CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
catalog_mgr_rw.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "catalog_mgr_rw.h"
8 
9 #include <inttypes.h>
10 #include <unistd.h>
11 
12 #include <cassert>
13 #include <cstdio>
14 #include <cstdlib>
15 #include <string>
16 
17 #include "catalog_balancer.h"
18 #include "catalog_rw.h"
19 #include "manifest.h"
20 #include "statistics.h"
21 #include "upload.h"
22 #include "util/exception.h"
23 #include "util/logging.h"
24 #include "util/posix.h"
25 #include "util/smalloc.h"
26 
27 using namespace std; // NOLINT
28 
29 namespace catalog {
30 
31 WritableCatalogManager::WritableCatalogManager(
32  const shash::Any &base_hash,
33  const std::string &stratum0,
34  const string &dir_temp,
35  upload::Spooler *spooler,
36  download::DownloadManager *download_manager,
37  bool enforce_limits,
38  const unsigned nested_kcatalog_limit,
39  const unsigned root_kcatalog_limit,
40  const unsigned file_mbyte_limit,
41  perf::Statistics *statistics,
42  bool is_balanceable,
43  unsigned max_weight,
44  unsigned min_weight,
45  const std::string &dir_cache)
46  : SimpleCatalogManager(base_hash, stratum0, dir_temp, download_manager,
47  statistics, false, dir_cache,
48  true /* copy to tmpdir */)
49  , spooler_(spooler)
50  , enforce_limits_(enforce_limits)
51  , nested_kcatalog_limit_(nested_kcatalog_limit)
52  , root_kcatalog_limit_(root_kcatalog_limit)
53  , file_mbyte_limit_(file_mbyte_limit)
54  , is_balanceable_(is_balanceable)
55  , max_weight_(max_weight)
56  , min_weight_(min_weight)
57  , balance_weight_(max_weight / 2) {
58  sync_lock_ = reinterpret_cast<pthread_mutex_t *>(
59  smalloc(sizeof(pthread_mutex_t)));
60  int retval = pthread_mutex_init(sync_lock_, NULL);
61  assert(retval == 0);
62  catalog_processing_lock_ = reinterpret_cast<pthread_mutex_t *>(
63  smalloc(sizeof(pthread_mutex_t)));
64  retval = pthread_mutex_init(catalog_processing_lock_, NULL);
65  assert(retval == 0);
66 }
67 
68 
70  pthread_mutex_destroy(sync_lock_);
71  free(sync_lock_);
72  pthread_mutex_destroy(catalog_processing_lock_);
74 }
75 
76 
87  const shash::Any &catalog_hash,
88  Catalog *parent_catalog) {
89  return new WritableCatalog(
90  mountpoint.ToString(), catalog_hash, parent_catalog);
91 }
92 
93 
95  catalog->TakeDatabaseFileOwnership();
96 }
97 
98 
106  const string &dir_temp,
107  const bool volatile_content,
108  const std::string &voms_authz,
109  upload::Spooler *spooler) {
110  // Create a new root catalog at file_path
111  const string file_path = dir_temp + "/new_root_catalog";
112 
113  const shash::Algorithms hash_algorithm = spooler->GetHashAlgorithm();
114 
115  // A newly created catalog always needs a root entry
116  // we create and configure this here
117  DirectoryEntry root_entry;
119  root_entry.mode_ = 16877;
120  root_entry.size_ = 4096;
121  root_entry.mtime_ = time(NULL);
122  root_entry.uid_ = getuid();
123  root_entry.gid_ = getgid();
124  root_entry.checksum_ = shash::Any(hash_algorithm);
125  root_entry.linkcount_ = 2;
126  const string root_path = "";
127 
128  // Create the database schema and the initial root entry
129  {
130  const UniquePtr<CatalogDatabase> new_clg_db(
131  CatalogDatabase::Create(file_path));
132  if (!new_clg_db.IsValid()
133  || !new_clg_db->InsertInitialValues(
134  root_path, volatile_content, voms_authz, root_entry)) {
135  LogCvmfs(kLogCatalog, kLogStderr, "creation of catalog '%s' failed",
136  file_path.c_str());
137  return NULL;
138  }
139  }
140 
141  // Compress root catalog;
142  const int64_t catalog_size = GetFileSize(file_path);
143  if (catalog_size < 0) {
144  unlink(file_path.c_str());
145  return NULL;
146  }
147  const string file_path_compressed = file_path + ".compressed";
148  shash::Any hash_catalog(hash_algorithm, shash::kSuffixCatalog);
149  const bool retval =
150  zlib::CompressPath2Path(file_path, file_path_compressed, &hash_catalog);
151  if (!retval) {
152  LogCvmfs(kLogCatalog, kLogStderr, "compression of catalog '%s' failed",
153  file_path.c_str());
154  unlink(file_path.c_str());
155  return NULL;
156  }
157  unlink(file_path.c_str());
158 
159  // Create manifest
160  const string manifest_path = dir_temp + "/manifest";
161  manifest::Manifest *manifest = new manifest::Manifest(hash_catalog,
162  catalog_size, "");
163  if (!voms_authz.empty()) {
164  manifest->set_has_alt_catalog_path(true);
165  }
166 
167  // Upload catalog
168  spooler->Upload(file_path_compressed, "data/" + hash_catalog.MakePath());
169  spooler->WaitForUpload();
170  unlink(file_path_compressed.c_str());
171  if (spooler->GetNumberOfErrors() > 0) {
172  LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalog %s",
173  file_path_compressed.c_str());
174  delete manifest;
175  return NULL;
176  }
177 
178  return manifest;
179 }
180 
181 
/**
 * Finds the writable catalog that hosts the given path, mounting nested
 * catalogs on the way via MountSubtree().
 *
 * @param path    path to look up (wrapped into a PathString as-is)
 * @param result  receives the hosting catalog on success
 * @param dirent  optional; receives the directory entry found for path.
 *                If NULL, a local placeholder entry is used instead.
 * @return true if the path was found in a writable catalog; false if the
 *         subtree could not be mounted, the path does not exist, or the
 *         catalog is not writable
 */
193 bool WritableCatalogManager::FindCatalog(const string &path,
194  WritableCatalog **result,
195  DirectoryEntry *dirent) {
196  const PathString ps_path(path);
197 
 // NOTE(review): listing line 198 is absent from this extract; `best_fit`
 // is presumably declared and looked up there -- confirm against upstream.
199  assert(best_fit != NULL);
200  Catalog *catalog = NULL;
201  const bool retval =
202  MountSubtree(ps_path, best_fit, true /* is_listable */, &catalog);
203  if (!retval)
204  return false;
205 
 // NOTE(review): listing line 206 is absent from this extract; `dummy` is
 // presumably declared there -- confirm against upstream.
207  if (NULL == dirent) {
208  dirent = &dummy;
209  }
210  const bool found = catalog->LookupPath(ps_path, dirent);
211  if (!found || !catalog->IsWritable())
212  return false;
213 
 // The catalog reported itself as writable, so the downcast is expected
 // to be valid
214  *result = static_cast<WritableCatalog *>(catalog);
215  return true;
216 }
217 
218 
220  const std::string &path) {
221  WritableCatalog *result = NULL;
222  const bool retval = FindCatalog(MakeRelativePath(path), &result, NULL);
223  if (!retval)
224  return NULL;
225  return result;
226 }
227 
228 
234 void WritableCatalogManager::RemoveFile(const std::string &path) {
235  const string file_path = MakeRelativePath(path);
236  const string parent_path = GetParentPath(file_path);
237 
238  SyncLock();
239  WritableCatalog *catalog;
240  if (!FindCatalog(parent_path, &catalog)) {
241  PANIC(kLogStderr, "catalog for file '%s' cannot be found",
242  file_path.c_str());
243  }
244 
245  catalog->RemoveEntry(file_path);
246  SyncUnlock();
247 }
248 
249 
255 void WritableCatalogManager::RemoveDirectory(const std::string &path) {
256  const string directory_path = MakeRelativePath(path);
257  const string parent_path = GetParentPath(directory_path);
258 
259  SyncLock();
260  WritableCatalog *catalog;
261  DirectoryEntry parent_entry;
262  if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
263  PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
264  directory_path.c_str());
265  }
266 
267  parent_entry.set_linkcount(parent_entry.linkcount() - 1);
268 
269  catalog->RemoveEntry(directory_path);
270  catalog->UpdateEntry(parent_entry, parent_path);
271  if (parent_entry.IsNestedCatalogRoot()) {
272  LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
273  parent_path.c_str());
274  WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
275  catalog->parent());
276  parent_entry.set_is_nested_catalog_mountpoint(true);
277  parent_entry.set_is_nested_catalog_root(false);
278  parent_catalog->UpdateEntry(parent_entry, parent_path);
279  }
280  SyncUnlock();
281 }
282 
291 void WritableCatalogManager::Clone(const std::string destination,
292  const std::string source) {
293  const std::string relative_source = MakeRelativePath(source);
294 
295  DirectoryEntry source_dirent;
296  if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
297  PANIC(kLogStderr, "catalog for file '%s' cannot be found, aborting",
298  source.c_str());
299  }
300  if (source_dirent.IsDirectory()) {
301  PANIC(kLogStderr, "Trying to clone a directory: '%s', aborting",
302  source.c_str());
303  }
304 
305  // if the file is already there we remove it and we add it back
306  DirectoryEntry check_dirent;
307  const bool destination_already_present =
308  LookupPath(MakeRelativePath(destination), kLookupDefault, &check_dirent);
309  if (destination_already_present) {
310  this->RemoveFile(destination);
311  }
312 
313  DirectoryEntry destination_dirent(source_dirent);
314  std::string destination_dirname;
315  std::string destination_filename;
316  SplitPath(destination, &destination_dirname, &destination_filename);
317 
318  destination_dirent.name_.Assign(
319  NameString(destination_filename.c_str(), destination_filename.length()));
320 
321  // TODO(jblomer): clone is used by tarball engine and should eventually
322  // support extended attributes
323  this->AddFile(destination_dirent, empty_xattrs, destination_dirname);
324 }
325 
326 
/**
 * Validates the arguments and then clones the directory tree from_dir to
 * to_dir through CloneTreeImpl().
 *
 * PANICs if
 *   - from_dir or to_dir is empty,
 *   - both resolve to the same relative path,
 *   - to_dir lies below from_dir,
 *   - from_dir cannot be found or is not a directory,
 *   - to_dir already exists,
 *   - the parent directory of to_dir cannot be found.
 *
 * @param from_dir  existing source directory
 * @param to_dir    destination directory, must not exist yet
 */
333 void WritableCatalogManager::CloneTree(const std::string &from_dir,
334  const std::string &to_dir) {
335  // Sanitize input paths
336  if (from_dir.empty() || to_dir.empty())
337  PANIC(kLogStderr, "clone tree from or to root impossible");
338 
339  const std::string relative_source = MakeRelativePath(from_dir);
340  const std::string relative_dest = MakeRelativePath(to_dir);
341 
342  if (relative_source == relative_dest) {
343  PANIC(kLogStderr, "cannot clone tree into itself ('%s')", to_dir.c_str());
344  }
 // The trailing "/" makes sure only real sub directories of the source
 // match, not siblings that merely share a name prefix
345  if (HasPrefix(relative_dest, relative_source + "/", false /*ignore_case*/)) {
 // NOTE(review): listing line 346 is absent from this extract; presumably
 // the opening of the PANIC call -- confirm against upstream.
347  "cannot clone tree into sub directory of source '%s' --> '%s'",
348  from_dir.c_str(), to_dir.c_str());
349  }
350 
351  DirectoryEntry source_dirent;
352  if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
353  PANIC(kLogStderr, "path '%s' cannot be found, aborting", from_dir.c_str());
354  }
355  if (!source_dirent.IsDirectory()) {
356  PANIC(kLogStderr, "CloneTree: source '%s' not a directory, aborting",
357  from_dir.c_str());
358  }
359 
360  DirectoryEntry dest_dirent;
361  if (LookupPath(relative_dest, kLookupDefault, &dest_dirent)) {
362  PANIC(kLogStderr, "destination '%s' exists, aborting", to_dir.c_str());
363  }
364 
365  const std::string dest_parent = GetParentPath(relative_dest);
366  DirectoryEntry dest_parent_dirent;
367  if (!LookupPath(dest_parent, kLookupDefault, &dest_parent_dirent)) {
368  PANIC(kLogStderr, "destination '%s' not on a known path, aborting",
369  to_dir.c_str());
370  }
371 
 // Hand over the paths as passed in; the destination is split into its
 // parent directory and its name
372  CloneTreeImpl(PathString(from_dir),
373  GetParentPath(to_dir),
374  NameString(GetFileName(to_dir)));
375 }
376 
377 
383  const std::string &dest_parent_dir,
384  const NameString &dest_name) {
385  LogCvmfs(kLogCatalog, kLogDebug, "cloning %s --> %s/%s", source_dir.c_str(),
386  dest_parent_dir.c_str(), dest_name.ToString().c_str());
387  const PathString relative_source(MakeRelativePath(source_dir.ToString()));
388 
389  DirectoryEntry source_dirent;
390  bool retval = LookupPath(relative_source, kLookupDefault, &source_dirent);
391  assert(retval);
392  assert(!source_dirent.IsBindMountpoint());
393 
394  DirectoryEntry dest_dirent(source_dirent);
395  dest_dirent.name_.Assign(dest_name);
396  // Just in case, reset the nested catalog markers
397  dest_dirent.set_is_nested_catalog_mountpoint(false);
398  dest_dirent.set_is_nested_catalog_root(false);
399 
400  XattrList xattrs;
401  if (source_dirent.HasXattrs()) {
402  retval = LookupXattrs(relative_source, &xattrs);
403  assert(retval);
404  }
405  AddDirectory(dest_dirent, xattrs, dest_parent_dir);
406 
407  std::string dest_dir = dest_parent_dir;
408  if (!dest_dir.empty())
409  dest_dir.push_back('/');
410  dest_dir += dest_name.ToString();
411  if (source_dirent.IsNestedCatalogRoot()
412  || source_dirent.IsNestedCatalogMountpoint()) {
413  CreateNestedCatalog(dest_dir);
414  }
415 
417  retval = Listing(relative_source, &ls, false /* expand_symlink */);
418  assert(retval);
419  for (unsigned i = 0; i < ls.size(); ++i) {
420  PathString sub_path(source_dir);
421  assert(!sub_path.IsEmpty());
422  sub_path.Append("/", 1);
423  sub_path.Append(ls[i].name().GetChars(), ls[i].name().GetLength());
424 
425  if (ls[i].IsDirectory()) {
426  CloneTreeImpl(sub_path, dest_dir, ls[i].name());
427  continue;
428  }
429 
430  // We break hard-links during cloning
431  ls[i].set_hardlink_group(0);
432  ls[i].set_linkcount(1);
433 
434  xattrs.Clear();
435  if (ls[i].HasXattrs()) {
436  retval = LookupXattrs(sub_path, &xattrs);
437  assert(retval);
438  }
439 
440  if (ls[i].IsChunkedFile()) {
441  FileChunkList chunks;
442  const std::string relative_sub_path =
443  MakeRelativePath(sub_path.ToString());
444  retval = ListFileChunks(PathString(relative_sub_path),
445  ls[i].hash_algorithm(), &chunks);
446  assert(retval);
447  AddChunkedFile(ls[i], xattrs, dest_dir, chunks);
448  } else {
449  AddFile(ls[i], xattrs, dest_dir);
450  }
451  }
452 }
453 
454 
463  const XattrList &xattrs,
464  const std::string &parent_directory) {
465  const string parent_path = MakeRelativePath(parent_directory);
466  string directory_path = parent_path + "/";
467  directory_path.append(entry.name().GetChars(), entry.name().GetLength());
468 
469  SyncLock();
470  WritableCatalog *catalog;
471  DirectoryEntry parent_entry;
472  if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
473  PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
474  directory_path.c_str());
475  }
476 
477  DirectoryEntry fixed_hardlink_count(entry);
478  fixed_hardlink_count.set_linkcount(2);
479  catalog->AddEntry(fixed_hardlink_count, xattrs, directory_path, parent_path);
480 
481  parent_entry.set_linkcount(parent_entry.linkcount() + 1);
482  catalog->UpdateEntry(parent_entry, parent_path);
483  if (parent_entry.IsNestedCatalogRoot()) {
484  LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
485  parent_path.c_str());
486  WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
487  catalog->parent());
488  parent_entry.set_is_nested_catalog_mountpoint(true);
489  parent_entry.set_is_nested_catalog_root(false);
490  parent_catalog->UpdateEntry(parent_entry, parent_path);
491  }
492  SyncUnlock();
493 }
494 
503  const XattrList &xattrs,
504  const std::string &parent_directory) {
505  const string parent_path = MakeRelativePath(parent_directory);
506  const string file_path = entry.GetFullPath(parent_path);
507 
508  SyncLock();
509  WritableCatalog *catalog;
510  if (!FindCatalog(parent_path, &catalog)) {
511  PANIC(kLogStderr, "catalog for file '%s' cannot be found",
512  file_path.c_str());
513  }
514 
515  assert(!entry.IsRegular() || entry.IsChunkedFile()
516  || !entry.checksum().IsNull());
517  assert(entry.IsRegular() || !entry.IsExternalFile());
518 
519  // check if file is too big
520  const unsigned mbytes = entry.size() / (1024 * 1024);
521  if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
523  "%s: file at %s is larger than %u megabytes (%u). "
524  "CernVM-FS works best with small files. "
525  "Please remove the file or increase the limit.",
526  enforce_limits_ ? "FATAL" : "WARNING", file_path.c_str(),
527  file_mbyte_limit_, mbytes);
528  if (enforce_limits_)
529  PANIC(kLogStderr, "file at %s is larger than %u megabytes (%u).",
530  file_path.c_str(), file_mbyte_limit_, mbytes);
531  }
532 
533  catalog->AddEntry(entry, xattrs, file_path, parent_path);
534  SyncUnlock();
535 }
536 
537 
539  const XattrList &xattrs,
540  const std::string &parent_directory,
541  const FileChunkList &file_chunks) {
542  assert(file_chunks.size() > 0);
543 
544  DirectoryEntry full_entry(entry);
545  full_entry.set_is_chunked_file(true);
546 
547  AddFile(full_entry, xattrs, parent_directory);
548 
549  const string parent_path = MakeRelativePath(parent_directory);
550  const string file_path = entry.GetFullPath(parent_path);
551 
552  SyncLock();
553  WritableCatalog *catalog;
554  if (!FindCatalog(parent_path, &catalog)) {
555  PANIC(kLogStderr, "catalog for file '%s' cannot be found",
556  file_path.c_str());
557  }
558 
559  for (unsigned i = 0; i < file_chunks.size(); ++i) {
560  catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
561  }
562  SyncUnlock();
563 }
564 
565 
574  const DirectoryEntryBaseList &entries,
575  const XattrList &xattrs,
576  const std::string &parent_directory,
577  const FileChunkList &file_chunks) {
578  assert(entries.size() >= 1);
579  assert(file_chunks.IsEmpty() || entries[0].IsRegular());
580  if (entries.size() == 1) {
581  DirectoryEntry fix_linkcount(entries[0]);
582  fix_linkcount.set_linkcount(1);
583  if (file_chunks.IsEmpty())
584  return AddFile(fix_linkcount, xattrs, parent_directory);
585  return AddChunkedFile(fix_linkcount, xattrs, parent_directory, file_chunks);
586  }
587 
588  LogCvmfs(kLogCatalog, kLogVerboseMsg, "adding hardlink group %s/%s",
589  parent_directory.c_str(), entries[0].name().c_str());
590 
591  // Hardlink groups have to reside in the same directory.
592  // Therefore we only have one parent directory here
593  const string parent_path = MakeRelativePath(parent_directory);
594 
595  // check if hard link is too big
596  const unsigned mbytes = entries[0].size() / (1024 * 1024);
597  if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
599  "%s: hard link at %s is larger than %u megabytes (%u). "
600  "CernVM-FS works best with small files. "
601  "Please remove the file or increase the limit.",
602  enforce_limits_ ? "FATAL" : "WARNING",
603  (parent_path + entries[0].name().ToString()).c_str(),
604  file_mbyte_limit_, mbytes);
605  if (enforce_limits_)
606  PANIC(kLogStderr, "hard link at %s is larger than %u megabytes (%u)",
607  (parent_path + entries[0].name().ToString()).c_str(),
608  file_mbyte_limit_, mbytes);
609  }
610 
611  SyncLock();
612  WritableCatalog *catalog;
613  if (!FindCatalog(parent_path, &catalog)) {
615  "catalog for hardlink group containing '%s' cannot be found",
616  parent_path.c_str());
617  }
618 
619  // Get a valid hardlink group id for the catalog the group will end up in
620  // TODO(unknown): Compaction
621  const uint32_t new_group_id = catalog->GetMaxLinkId() + 1;
622  LogCvmfs(kLogCatalog, kLogVerboseMsg, "hardlink group id %u issued",
623  new_group_id);
624  assert(new_group_id > 0);
625 
626  // Add the file entries to the catalog
627  for (DirectoryEntryBaseList::const_iterator i = entries.begin(),
628  iEnd = entries.end();
629  i != iEnd;
630  ++i) {
631  string file_path = parent_path + "/";
632  file_path.append(i->name().GetChars(), i->name().GetLength());
633 
634  // create a fully fledged DirectoryEntry to add the hardlink group to it
635  // which is CVMFS specific meta data.
636  DirectoryEntry hardlink(*i);
637  hardlink.set_hardlink_group(new_group_id);
638  hardlink.set_linkcount(entries.size());
639  hardlink.set_is_chunked_file(!file_chunks.IsEmpty());
640 
641  catalog->AddEntry(hardlink, xattrs, file_path, parent_path);
642  if (hardlink.IsChunkedFile()) {
643  for (unsigned i = 0; i < file_chunks.size(); ++i) {
644  catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
645  }
646  }
647  }
648  SyncUnlock();
649 }
650 
651 
/**
 * Decrements the link count stored for remove_path by one, using
 * IncLinkcount() of the catalog found for that path.
 *
 * Note that the catalog lookup is done with the path itself, not with its
 * parent directory.
 *
 * @param remove_path  path of the hardlink group member
 */
652 void WritableCatalogManager::ShrinkHardlinkGroup(const string &remove_path) {
653  const string relative_path = MakeRelativePath(remove_path);
654 
655  SyncLock();
656  WritableCatalog *catalog;
657  if (!FindCatalog(relative_path, &catalog)) {
 // NOTE(review): listing line 658 is absent from this extract; presumably
 // the opening of a PANIC call -- confirm against upstream.
659  "catalog for hardlink group containing '%s' cannot be found",
660  remove_path.c_str());
661  }
662 
663  catalog->IncLinkcount(relative_path, -1);
664  SyncUnlock();
665 }
666 
667 
677  const XattrList &xattrs,
678  const std::string &directory_path) {
679  assert(entry.IsDirectory());
680 
681  const string entry_path = MakeRelativePath(directory_path);
682  const string parent_path = GetParentPath(entry_path);
683 
684  SyncLock();
685  // find the catalog to be updated
686  WritableCatalog *catalog;
687  if (!FindCatalog(parent_path, &catalog)) {
688  PANIC(kLogStderr, "catalog for entry '%s' cannot be found",
689  entry_path.c_str());
690  }
691 
692  catalog->TouchEntry(entry, xattrs, entry_path);
693 
694  // since we deal with a directory here, we might just touch a
695  // nested catalog transition point. If this is the case we would need to
696  // update two catalog entries:
697  // * the nested catalog MOUNTPOINT in the parent catalog
698  // * the nested catalog ROOT in the nested catalog
699 
700  // first check if we really have a nested catalog transition point
701  catalog::DirectoryEntry potential_transition_point;
702  const PathString transition_path(entry_path.data(), entry_path.length());
703  bool retval = catalog->LookupPath(transition_path,
704  &potential_transition_point);
705  assert(retval);
706  if (potential_transition_point.IsNestedCatalogMountpoint()) {
707  LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point at %s",
708  entry_path.c_str());
709 
710  // find and mount nested catalog associated to this transition point
711  shash::Any nested_hash;
712  uint64_t nested_size;
713  retval = catalog->FindNested(transition_path, &nested_hash, &nested_size);
714  assert(retval);
715  Catalog *nested_catalog;
716  nested_catalog = MountCatalog(transition_path, nested_hash, catalog);
717  assert(nested_catalog != NULL);
718 
719  // update nested catalog root in the child catalog
720  reinterpret_cast<WritableCatalog *>(nested_catalog)
721  ->TouchEntry(entry, xattrs, entry_path);
722  }
723 
724  SyncUnlock();
725 }
726 
727 
735  const std::string &mountpoint) {
736  const string nested_root_path = MakeRelativePath(mountpoint);
737  const PathString ps_nested_root_path(nested_root_path);
738 
739  SyncLock();
740  // Find the catalog currently containing the directory structure, which
741  // will be represented as a new nested catalog and its root-entry/mountpoint
742  // along the way
743  WritableCatalog *old_catalog = NULL;
744  DirectoryEntry new_root_entry;
745  if (!FindCatalog(nested_root_path, &old_catalog, &new_root_entry)) {
747  "failed to create nested catalog '%s': "
748  "mountpoint was not found in current catalog structure",
749  nested_root_path.c_str());
750  }
751 
752  // Create the database schema and the initial root entry
753  // for the new nested catalog
754  const string database_file_path = CreateTempPath(dir_temp() + "/catalog",
755  0666);
756  const bool volatile_content = false;
757  CatalogDatabase *new_catalog_db = CatalogDatabase::Create(database_file_path);
758  assert(NULL != new_catalog_db);
759  // Note we do not set the external_data bit for nested catalogs
760  bool retval = new_catalog_db->InsertInitialValues(
761  nested_root_path,
762  volatile_content,
763  "", // At this point, only root
764  // catalog gets VOMS authz
765  new_root_entry);
766  assert(retval);
767  // TODO(rmeusel): we need a way to attach a catalog directly from an open
768  // database to remove this indirection
769  delete new_catalog_db;
770  new_catalog_db = NULL;
771 
772  // Attach the just created nested catalog
773  Catalog *new_catalog = CreateCatalog(ps_nested_root_path, shash::Any(),
774  old_catalog);
775  retval = AttachCatalog(database_file_path, new_catalog);
776  assert(retval);
777 
778  assert(new_catalog->IsWritable());
779  WritableCatalog *wr_new_catalog = static_cast<WritableCatalog *>(new_catalog);
780 
781  if (new_root_entry.HasXattrs()) {
782  XattrList xattrs;
783  retval = old_catalog->LookupXattrsPath(ps_nested_root_path, &xattrs);
784  assert(retval);
785  wr_new_catalog->TouchEntry(new_root_entry, xattrs, nested_root_path);
786  }
787 
788  // From now on, there are two catalogs, spanning the same directory structure
789  // we have to split the overlapping directory entries from the old catalog
790  // to the new catalog to re-gain a valid catalog structure
791  old_catalog->Partition(wr_new_catalog);
792 
793  // Add the newly created nested catalog to the references of the containing
794  // catalog
795  old_catalog->InsertNestedCatalog(new_catalog->mountpoint().ToString(), NULL,
796  shash::Any(spooler_->GetHashAlgorithm()), 0);
797 
798  // Fix subtree counters in new nested catalogs: subtree is the sum of all
799  // entries of all "grand-nested" catalogs
800  // Note: taking a copy of the nested catalog list here
802  &grand_nested = wr_new_catalog->ListOwnNestedCatalogs();
803  DeltaCounters fix_subtree_counters;
804  for (Catalog::NestedCatalogList::const_iterator i = grand_nested.begin(),
805  iEnd = grand_nested.end();
806  i != iEnd;
807  ++i) {
808  WritableCatalog *grand_catalog;
809  retval = FindCatalog(i->mountpoint.ToString(), &grand_catalog);
810  assert(retval);
811  const Counters &grand_counters = grand_catalog->GetCounters();
812  grand_counters.AddAsSubtree(&fix_subtree_counters);
813  }
814  const DeltaCounters save_counters = wr_new_catalog->delta_counters_;
815  wr_new_catalog->delta_counters_ = fix_subtree_counters;
816  wr_new_catalog->UpdateCounters();
817  wr_new_catalog->delta_counters_ = save_counters;
818 
819  SyncUnlock();
820 }
821 
822 
/**
 * Removes the nested catalog mounted at the given mountpoint.
 *
 * Depending on merge, the nested catalog is either merged into its parent
 * (MergeIntoParent()) or only removed from it (RemoveFromParent()).
 * Afterwards its database file is unlinked and the catalog is detached
 * from the manager.
 *
 * @param mountpoint  mountpoint of the nested catalog; converted with
 *                    MakeRelativePath() before use
 * @param merge       true: merge the content into the parent catalog,
 *                    false: remove the catalog from the parent
 */
834 void WritableCatalogManager::RemoveNestedCatalog(const string &mountpoint,
835  const bool merge) {
836  const string nested_root_path = MakeRelativePath(mountpoint);
837 
838  SyncLock();
839  // Find the catalog which should be removed
840  WritableCatalog *nested_catalog = NULL;
841  if (!FindCatalog(nested_root_path, &nested_catalog)) {
 // NOTE(review): listing line 842 is absent from this extract; presumably
 // the opening of a PANIC call -- confirm against upstream.
843  "failed to remove nested catalog '%s': "
844  "mountpoint was not found in current catalog structure",
845  nested_root_path.c_str());
846  }
847 
848  // Check if the found catalog is really the nested catalog to be deleted
849  assert(!nested_catalog->IsRoot()
850  && (nested_catalog->mountpoint().ToString() == nested_root_path));
851 
852  if (merge) {
853  // Merge all data from the nested catalog into its parent
854  nested_catalog->MergeIntoParent();
855  } else {
856  nested_catalog->RemoveFromParent();
857  }
858 
859  // Delete the catalog database file from the working copy
860  if (unlink(nested_catalog->database_path().c_str()) != 0) {
 // NOTE(review): listing line 861 is absent from this extract; presumably
 // the opening of a PANIC or logging call -- confirm against upstream.
862  "unable to delete the removed nested catalog database file '%s'",
863  nested_catalog->database_path().c_str());
864  }
865 
866  // Remove the catalog from internal data structures
867  DetachCatalog(nested_catalog);
868  SyncUnlock();
869 }
870 
871 
/**
 * Replaces the nested catalog registered at mountpoint by the catalog
 * identified by new_hash / new_size.
 *
 * Steps visible in the code:
 *   1. find the catalog hosting the parent directory of the mountpoint,
 *   2. obtain the counters of the old nested catalog, either from the
 *      already attached child (which must be unmodified and gets detached)
 *      or from a freely loaded copy,
 *   3. load the new catalog freely and read its root entry and xattrs,
 *   4. swap the nested catalog reference in the parent and update the
 *      directory entry stored there,
 *   5. propagate the counter difference to the parent's delta counters.
 *
 * Each failure path releases the sync lock before reporting the error.
 *
 * @param mountpoint  path of the nested catalog to swap; converted with
 *                    MakeRelativePath() before use
 * @param new_hash    content hash of the new nested catalog
 * @param new_size    size registered for the new nested catalog
 */
883 void WritableCatalogManager::SwapNestedCatalog(const string &mountpoint,
884  const shash::Any &new_hash,
885  const uint64_t new_size) {
886  const string nested_root_path = MakeRelativePath(mountpoint);
887  const string parent_path = GetParentPath(nested_root_path);
888  const PathString nested_root_ps = PathString(nested_root_path);
889 
890  SyncLock();
891 
892  // Find the immediate parent catalog
893  WritableCatalog *parent = NULL;
894  if (!FindCatalog(parent_path, &parent)) {
895  SyncUnlock(); // release the lock first; the unit tests get stuck otherwise
 // NOTE(review): listing line 896 is absent from this extract; presumably
 // the opening of a PANIC call. The same applies to the other gaps below
 // (listing lines 911, 927, 935, 947, 958, 967) -- confirm against upstream.
897  "failed to swap nested catalog '%s': could not find parent '%s'",
898  nested_root_path.c_str(), parent_path.c_str());
899  }
900 
901  // Get old nested catalog counters
902  Catalog *old_attached_catalog = parent->FindChild(nested_root_ps);
903  Counters old_counters;
904  if (old_attached_catalog) {
905  // Old catalog was already attached (e.g. as a child catalog
906  // attached by a prior call to CreateNestedCatalog()). Ensure
907  // that it has not been modified, get counters, and detach it.
908  WritableCatalogList list;
909  if (GetModifiedCatalogLeafsRecursively(old_attached_catalog, &list)) {
910  SyncUnlock();
912  "failed to swap nested catalog '%s': already modified",
913  nested_root_path.c_str());
914  }
915  old_counters = old_attached_catalog->GetCounters();
916  DetachSubtree(old_attached_catalog);
917 
918  } else {
919  // Old catalog was not attached. Download a freely attached
920  // version and get counters.
921  shash::Any old_hash;
922  uint64_t old_size;
923  const bool old_found = parent->FindNested(nested_root_ps, &old_hash,
924  &old_size);
925  if (!old_found) {
926  SyncUnlock();
928  "failed to swap nested catalog '%s': not found in parent",
929  nested_root_path.c_str());
930  }
931  const UniquePtr<Catalog> old_free_catalog(
932  LoadFreeCatalog(nested_root_ps, old_hash));
933  if (!old_free_catalog.IsValid()) {
934  SyncUnlock();
936  "failed to swap nested catalog '%s': failed to load old catalog",
937  nested_root_path.c_str());
938  }
939  old_counters = old_free_catalog->GetCounters();
940  }
941 
942  // Load freely attached new catalog
943  const UniquePtr<Catalog> new_catalog(
944  LoadFreeCatalog(nested_root_ps, new_hash));
945  if (!new_catalog.IsValid()) {
946  SyncUnlock();
948  "failed to swap nested catalog '%s': failed to load new catalog",
949  nested_root_path.c_str());
950  }
951 
952  // Get new catalog root directory entry
953  DirectoryEntry dirent;
954  XattrList xattrs;
955  const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
956  if (!dirent_found) {
957  SyncUnlock();
959  "failed to swap nested catalog '%s': missing dirent in new catalog",
960  nested_root_path.c_str());
961  }
962  if (dirent.HasXattrs()) {
963  const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
964  &xattrs);
965  if (!xattrs_found) {
966  SyncUnlock();
968  "failed to swap nested catalog '%s': missing xattrs in new catalog",
969  nested_root_path.c_str());
970  }
971  }
972 
973  // Swap catalogs
974  parent->RemoveNestedCatalog(nested_root_path, NULL);
975  parent->InsertNestedCatalog(nested_root_path, NULL, new_hash, new_size);
976 
977  // Update parent directory entry
 // NOTE(review): listing line 978 is absent from this extract; presumably
 // it flags dirent as nested catalog mountpoint, mirroring
 // GraftNestedCatalog() -- confirm against upstream.
979  dirent.set_is_nested_catalog_root(false);
980  parent->UpdateEntry(dirent, nested_root_path);
981  parent->TouchEntry(dirent, xattrs, nested_root_path);
982 
983  // Update counters
984  const DeltaCounters delta =
985  Counters::Diff(old_counters, new_catalog->GetCounters());
986  delta.PopulateToParent(&parent->delta_counters_);
987 
988  SyncUnlock();
989 }
990 
1003 void WritableCatalogManager::GraftNestedCatalog(const string &mountpoint,
1004  const shash::Any &new_hash,
1005  const uint64_t new_size) {
1006  const string nested_root_path = MakeRelativePath(mountpoint);
1007  const string parent_path = GetParentPath(nested_root_path);
1008  const PathString nested_root_ps = PathString(nested_root_path);
1009 
1010  assert(!nested_root_path.empty());
1011 
1012  // Load freely attached new catalog
1013  const UniquePtr<Catalog> new_catalog(
1014  LoadFreeCatalog(nested_root_ps, new_hash));
1015  if (!new_catalog.IsValid()) {
1016  PANIC(kLogStderr,
1017  "failed to graft nested catalog '%s': failed to load new catalog",
1018  nested_root_path.c_str());
1019  }
1020  if (new_catalog->root_prefix() != nested_root_ps) {
1021  PANIC(kLogStderr,
1022  "invalid nested catalog for grafting at '%s': catalog rooted at '%s'",
1023  nested_root_path.c_str(),
1024  new_catalog->root_prefix().ToString().c_str());
1025  }
1026 
1027  // Get new catalog root directory entry
1028  DirectoryEntry dirent;
1029  XattrList xattrs;
1030  const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
1031  if (!dirent_found) {
1032  PANIC(kLogStderr,
1033  "failed to swap nested catalog '%s': missing dirent in new catalog",
1034  nested_root_path.c_str());
1035  }
1036  if (dirent.HasXattrs()) {
1037  const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
1038  &xattrs);
1039  if (!xattrs_found) {
1040  PANIC(kLogStderr,
1041  "failed to swap nested catalog '%s': missing xattrs in new catalog",
1042  nested_root_path.c_str());
1043  }
1044  }
1045  // Transform the nested catalog root into a transition point to be inserted
1046  // in the parent catalog
1047  dirent.set_is_nested_catalog_root(false);
1048  dirent.set_is_nested_catalog_mountpoint(true);
1049 
1050  // Add directory and nested catalog
1051 
1052  SyncLock();
1053  WritableCatalog *parent_catalog;
1054  DirectoryEntry parent_entry;
1055  if (!FindCatalog(parent_path, &parent_catalog, &parent_entry)) {
1056  SyncUnlock();
1057  PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1058  parent_path.c_str());
1059  }
1060  if (parent_catalog->LookupPath(nested_root_ps, NULL)) {
1061  SyncUnlock();
1062  PANIC(kLogStderr,
1063  "invalid attempt to graft nested catalog into existing "
1064  "directory '%s'",
1065  nested_root_path.c_str());
1066  }
1067  parent_catalog->AddEntry(dirent, xattrs, nested_root_path, parent_path);
1068  parent_entry.set_linkcount(parent_entry.linkcount() + 1);
1069  parent_catalog->UpdateEntry(parent_entry, parent_path);
1070  if (parent_entry.IsNestedCatalogRoot()) {
1071  WritableCatalog *grand_parent_catalog = reinterpret_cast<WritableCatalog *>(
1072  parent_catalog->parent());
1073  parent_entry.set_is_nested_catalog_root(false);
1074  parent_entry.set_is_nested_catalog_mountpoint(true);
1075  grand_parent_catalog->UpdateEntry(parent_entry, parent_path);
1076  }
1077 
1078  parent_catalog->InsertNestedCatalog(nested_root_path, NULL, new_hash,
1079  new_size);
1080 
1081  // Fix-up counters
1082  const Counters counters;
1083  const DeltaCounters delta =
1084  Counters::Diff(counters, new_catalog->GetCounters());
1085  delta.PopulateToParent(&parent_catalog->delta_counters_);
1086 
1087  SyncUnlock();
1088 }
1089 
1093 bool WritableCatalogManager::IsTransitionPoint(const string &mountpoint) {
1094  const string path = MakeRelativePath(mountpoint);
1095 
1096  SyncLock();
1097  WritableCatalog *catalog;
1098  DirectoryEntry entry;
1099  if (!FindCatalog(path, &catalog, &entry)) {
1100  PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1101  path.c_str());
1102  }
1103  const bool result = entry.IsNestedCatalogRoot();
1104  SyncUnlock();
1105  return result;
1106 }
1107 
1108 
// NOTE(review): the opening line of this definition is absent from this
// listing (the numbering jumps from 1108 to 1110), so its name and signature
// cannot be confirmed here. The visible body contains only the TODO below.
1110  // TODO(jblomer): meant for micro catalogs
1111 }
1112 
1113 
1114 void WritableCatalogManager::SetTTL(const uint64_t new_ttl) {
1115  SyncLock();
1116  reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetTTL(new_ttl);
1117  SyncUnlock();
1118 }
1119 
1120 
1121 bool WritableCatalogManager::SetVOMSAuthz(const std::string &voms_authz) {
1122  bool result;
1123  SyncLock();
1124  result = reinterpret_cast<WritableCatalog *>(GetRootCatalog())
1125  ->SetVOMSAuthz(voms_authz);
1126  SyncUnlock();
1127  return result;
1128 }
1129 
1130 
// Snapshots and uploads the modified catalogs and records the resulting root
// catalog (hash, size, TTL, revision) in the manifest.
//
// stop_for_tweaks is handed through to the snapshot routine; manual_revision,
// when greater than zero, requests a specific revision for the root catalog.
// Returns false if the spooler reports errors after the snapshot, true
// otherwise.
1131 bool WritableCatalogManager::Commit(const bool stop_for_tweaks,
1132  const uint64_t manual_revision,
      // NOTE(review): listing line 1133 is missing here; the body dereferences
      // a `manifest` pointer, so presumably this is the third parameter and
      // the opening brace -- confirm against the header.
1134  WritableCatalog *root_catalog = reinterpret_cast<WritableCatalog *>(
1135  GetRootCatalog());
      // the root catalog is always marked dirty on commit
1136  root_catalog->SetDirty();
1137 
1138  // set root catalog revision to manually provided number if available
1139  if (manual_revision > 0) {
1140  const uint64_t revision = root_catalog->GetRevision();
1141  if (revision >= manual_revision) {
      // NOTE(review): listing line 1142 is missing here; the string and
      // arguments below look like the tail of a logging call -- confirm.
1143  "Manual revision (%" PRIu64 ") must not be "
1144  "smaller than the current root catalog's (%" PRIu64
1145  "). Skipped!",
1146  manual_revision, revision);
1147  } else {
1148  // Gets incremented by FinalizeCatalog() afterwards!
1149  root_catalog->SetRevision(manual_revision - 1);
1150  }
1151  }
1152 
1153  // do the actual catalog snapshotting and upload
1154  CatalogInfo root_catalog_info;
      // the serialized code path is selected by the mere presence of this
      // environment variable; its value is not inspected
1155  if (getenv("_CVMFS_SERIALIZED_CATALOG_PROCESSING_") == NULL)
1156  root_catalog_info = SnapshotCatalogs(stop_for_tweaks);
1157  else
1158  root_catalog_info = SnapshotCatalogsSerialized(stop_for_tweaks);
1159  if (spooler_->GetNumberOfErrors() > 0) {
1160  LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalogs");
1161  return false;
1162  }
1163 
1164  // .cvmfspublished export
1165  LogCvmfs(kLogCatalog, kLogVerboseMsg, "Committing repository manifest");
      // the freshly uploaded root catalog becomes the manager's base hash
1166  set_base_hash(root_catalog_info.content_hash);
1167 
1168  manifest->set_catalog_hash(root_catalog_info.content_hash);
1169  manifest->set_catalog_size(root_catalog_info.size);
1170  manifest->set_root_path("");
1171  manifest->set_ttl(root_catalog_info.ttl);
1172  manifest->set_revision(root_catalog_info.revision);
1173 
1174  return true;
1175 }
1176 
1177 
// Snapshot routine used by Commit() when serialized processing is not
// requested. It finalizes the dirty leaf catalogs, waits for the root catalog
// information delivered through a Future, waits for the spooler uploads and
// returns that root catalog information.
// NOTE(review): the opening line(s) of the signature are absent from this
// listing; only the trailing parameter is visible.
1205  const bool stop_for_tweaks) {
1206  // prepare environment for parallel processing
1207  Future<CatalogInfo> root_catalog_info_future;
1208  CatalogUploadContext upload_context;
1209  upload_context.root_catalog_info = &root_catalog_info_future;
1210  upload_context.stop_for_tweaks = stop_for_tweaks;
1211 
      // NOTE(review): listing line 1212 is missing; the arguments below look
      // like the tail of a listener registration on the spooler (listeners are
      // unregistered at the end of this function) -- confirm.
1213  this, upload_context);
1214 
1215  // find dirty leaf catalogs and annotate non-leaf catalogs (dirty child count)
1216  // post-condition: the entire catalog tree is ready for concurrent processing
1217  WritableCatalogList leafs_to_snapshot;
1218  GetModifiedCatalogLeafs(&leafs_to_snapshot);
1219 
1220  // finalize and schedule the catalog processing
1221  WritableCatalogList::const_iterator i = leafs_to_snapshot.begin();
1222  const WritableCatalogList::const_iterator iend = leafs_to_snapshot.end();
1223  for (; i != iend; ++i) {
1224  FinalizeCatalog(*i, stop_for_tweaks);
      // NOTE(review): listing line 1225 is missing; going by the comment
      // above, presumably the call that schedules *i for processing -- confirm.
1226  }
1227 
1228  LogCvmfs(kLogCatalog, kLogVerboseMsg, "waiting for upload of catalogs");
      // Get() yields the value that the upload callback stores in the future
      // once the root catalog has been handled
1229  const CatalogInfo &root_catalog_info = root_catalog_info_future.Get();
1230  spooler_->WaitForUpload();
1231 
1232  spooler_->UnregisterListeners();
1233  return root_catalog_info;
1234 }
1235 
1236 
// Prepares one catalog for upload: updates counters, modification time and
// revision, records the previous revision hash, commits the catalog, checks
// the entry-count limit, optionally pauses for manual tweaks and finally
// vacuums the database if necessary.
// NOTE(review): the opening line of the signature is absent from this listing;
// callers in this file invoke FinalizeCatalog(catalog, stop_for_tweaks), which
// presumably is this function -- confirm.
1238  const bool stop_for_tweaks) {
1239  // update meta information of this catalog
1240  LogCvmfs(kLogCatalog, kLogVerboseMsg, "creating snapshot of catalog '%s'",
1241  catalog->mountpoint().c_str());
1242 
1243  catalog->UpdateCounters();
1244  catalog->UpdateLastModified();
1245  catalog->IncrementRevision();
1246 
1247  // update the previous catalog revision pointer
1248  if (catalog->IsRoot()) {
      // NOTE(review): listing line 1249 is missing; the strings below look
      // like the tail of a logging call -- confirm.
1250  "setting '%s' as previous revision "
1251  "for root catalog",
1252  base_hash().ToStringWithSuffix().c_str());
1253  catalog->SetPreviousRevision(base_hash());
1254  } else {
1255  // Multiple catalogs might query the parent concurrently
1256  SyncLock();
1257  shash::Any hash_previous;
1258  uint64_t size_previous;
      // the parent's current record for this mountpoint supplies the previous
      // revision hash; size_previous is filled in but not used afterwards
1259  const bool retval = catalog->parent()->FindNested(
1260  catalog->mountpoint(), &hash_previous, &size_previous);
1261  assert(retval);
1262  SyncUnlock();
1263 
      // NOTE(review): listing line 1264 is missing; the strings below look
      // like the tail of a logging call -- confirm.
1265  "found '%s' as previous revision "
1266  "for nested catalog '%s'",
1267  hash_previous.ToStringWithSuffix().c_str(),
1268  catalog->mountpoint().c_str());
1269  catalog->SetPreviousRevision(hash_previous);
1270  }
1271  catalog->Commit();
1272 
1273  // check if catalog has too many entries
      // the configured limits are multiplied by 1000 to obtain an entry count
1274  const uint64_t catalog_limit =
1275  uint64_t(1000) * uint64_t((catalog->IsRoot() ? root_kcatalog_limit_
      // NOTE(review): listing line 1276 is missing; presumably the else-arm of
      // the conditional (the nested catalog limit) and the closing
      // parentheses -- confirm.
1277  if ((catalog_limit > 0)
1278  && (catalog->GetCounters().GetSelfEntries() > catalog_limit)) {
      // NOTE(review): listing line 1279 is missing; the strings below look
      // like the tail of a logging call -- confirm.
1280  "%s: catalog at %s has more than %lu entries (%lu). "
1281  "Large catalogs stress the CernVM-FS transport infrastructure. "
1282  "Please split it into nested catalogs or increase the limit.",
1283  enforce_limits_ ? "FATAL" : "WARNING",
1284  (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1285  catalog_limit, catalog->GetCounters().GetSelfEntries());
      // NOTE(review): catalog_limit is declared uint64_t above but is
      // formatted with %lu there and with %u in the PANIC below; PRIu64 (used
      // elsewhere in this file) would match the type.
1286  if (enforce_limits_)
1287  PANIC(kLogStderr, "catalog at %s has more than %u entries (%u). ",
1288  (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1289  catalog_limit, catalog->GetCounters().GetSelfEntries());
1290  }
1291 
1292  // allow for manual adjustments in the catalog
1293  if (stop_for_tweaks) {
      // NOTE(review): listing line 1294 is missing; the strings below look
      // like the tail of a logging call -- confirm.
1295  "Allowing for tweaks in %s at %s "
1296  "(hit return to continue)",
1297  catalog->database_path().c_str(), catalog->mountpoint().c_str());
      // blocks until a character can be read from stdin
1298  const int read_char = getchar();
1299  assert(read_char != EOF);
1300  }
1301 
1302  // compaction of bloated catalogs (usually after high database churn)
1303  catalog->VacuumDatabaseIfNecessary();
1304 }
1305 
1306 
// Registers the catalog under its database path in catalog_processing_map_ and
// hands that path to the spooler for processing. The upload callbacks in this
// file look the catalog up again by the same path.
// NOTE(review): the opening line of the signature is absent from this listing.
1308  WritableCatalog *catalog) {
1309  {
      // NOTE(review): listing line 1310 is missing; the dedicated scope
      // suggests a scoped lock protecting the map -- confirm.
1311  // register catalog object for WritableCatalogManager::CatalogUploadCallback
1312  catalog_processing_map_[catalog->database_path()] = catalog;
1313  }
1314  spooler_->ProcessCatalog(catalog->database_path());
1315 }
1316 
// Copies the catalog file named by result.local_path into the local cache
// directory: the data is first written to a temporary file under
// dir_cache_ + "/txn" and then renamed to its final cache path. Returns true;
// a failed rename ends in PANIC.
// NOTE(review): the opening line of the signature is absent from this listing.
1326  const upload::SpoolerResult &result) {
1327  std::string tmp_catalog_path;
1328  const std::string cache_catalog_path = dir_cache_ + "/"
1329  + result.content_hash
      // NOTE(review): listing line 1330 is missing; presumably the call that
      // turns the content hash into a relative path and ends the statement --
      // confirm.
1331  FILE *fcatalog = CreateTempFile(dir_cache_ + "/txn/catalog", 0666, "w",
1332  &tmp_catalog_path);
1333  if (!fcatalog) {
      // NOTE(review): listing line 1334 is missing; the string below looks
      // like the tail of an error-reporting call -- confirm which one.
1335  "Creating file for temporary catalog failed: %s",
1336  tmp_catalog_path.c_str());
1337  }
      // NOTE(review): the result of CopyPath2File is not checked here
1338  CopyPath2File(result.local_path.c_str(), fcatalog);
1339  (void)fclose(fcatalog);
1340 
      // move the finished copy to its final cache path
1341  if (rename(tmp_catalog_path.c_str(), cache_catalog_path.c_str()) != 0) {
1342  PANIC(kLogDebug | kLogStderr, "Failed to copy catalog from %s to cache %s",
1343  result.local_path.c_str(), cache_catalog_path.c_str());
1344  }
1345  return true;
1346 }
1347 
// Spooler callback for a processed catalog. For a nested catalog it updates
// the parent's reference (hash, size, counters) and, once the parent has no
// dirty children left, finalizes and schedules the parent. For the root
// catalog it publishes the final catalog information through the Future in
// the upload context.
// NOTE(review): the opening line of the signature is absent from this listing.
1349  const upload::SpoolerResult &result,
1350  const CatalogUploadContext catalog_upload_context) {
1351  if (result.return_code != 0) {
1352  PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1353  result.local_path.c_str(), result.return_code);
1354  }
1355 
1356  // retrieve the catalog object based on the callback information
1357  // see WritableCatalogManager::ScheduleCatalogProcessing()
1358  WritableCatalog *catalog = NULL;
1359  {
      // NOTE(review): listing line 1360 is missing; the dedicated scope
      // suggests a scoped lock protecting the map -- confirm.
1361  const std::map<std::string, WritableCatalog *>::iterator c =
1362  catalog_processing_map_.find(result.local_path);
1363  assert(c != catalog_processing_map_.end());
1364  catalog = c->second;
1365  }
1366 
1367  const uint64_t catalog_size = GetFileSize(result.local_path);
1368  assert(catalog_size > 0);
1369 
1370  if (UseLocalCache()) {
1371  CopyCatalogToLocalCache(result);
1372  }
1373 
1374  SyncLock();
1375  if (catalog->HasParent()) {
1376  // finalized nested catalogs will update their parent's pointer and schedule
1377  // them for processing (continuation) if the 'dirty children count' == 0
1378  LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1379  WritableCatalog *parent = catalog->GetWritableParent();
1380 
1381  parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1382  result.content_hash,
1383  catalog_size,
1384  catalog->delta_counters_);
      // the deltas were just handed to the parent, so start again from zero
1385  catalog->delta_counters_.SetZero();
1386 
1387  const int remaining_dirty_children = catalog->GetWritableParent()
      // NOTE(review): listing line 1388 is missing; presumably the call that
      // decrements the parent's dirty-children count and returns the
      // remainder -- confirm.
1389 
1390  SyncUnlock();
1391 
1392  // continuation of the dirty catalog tree traversal
1393  // see WritableCatalogManager::SnapshotCatalogs()
1394  if (remaining_dirty_children == 0) {
1395  FinalizeCatalog(parent, catalog_upload_context.stop_for_tweaks);
1396  ScheduleCatalogProcessing(parent);
1397  }
1398 
1399  } else if (catalog->IsRoot()) {
1400  // once the root catalog is reached, we are done with processing and report
1401  // back to the main via a Future<> and provide the necessary information
1402  CatalogInfo root_catalog_info;
1403  root_catalog_info.size = catalog_size;
1404  root_catalog_info.ttl = catalog->GetTTL();
1405  root_catalog_info.content_hash = result.content_hash;
1406  root_catalog_info.revision = catalog->GetRevision();
1407  catalog_upload_context.root_catalog_info->Set(root_catalog_info);
1408  SyncUnlock();
1409  } else {
      // NOTE(review): this branch is reached with the sync lock still held
      // and no SyncUnlock() before the PANIC
1410  PANIC(kLogStderr, "inconsistent state detected");
1411  }
1412 }
1413 
1414 
1426  Catalog *catalog, WritableCatalogList *result) const {
1427  WritableCatalog *wr_catalog = static_cast<WritableCatalog *>(catalog);
1428 
1429  // Look for dirty catalogs in the descendants of *catalog
1430  int dirty_children = 0;
1431  CatalogList children = wr_catalog->GetChildren();
1432  CatalogList::const_iterator i = children.begin();
1433  const CatalogList::const_iterator iend = children.end();
1434  for (; i != iend; ++i) {
1435  if (GetModifiedCatalogLeafsRecursively(*i, result)) {
1436  ++dirty_children;
1437  }
1438  }
1439 
1440  // a catalog is dirty if itself or one of its children has changed
1441  // a leaf catalog doesn't have any dirty children
1442  wr_catalog->set_dirty_children(dirty_children);
1443  const bool is_dirty = wr_catalog->IsDirty() || dirty_children > 0;
1444  const bool is_leaf = dirty_children == 0;
1445  if (is_dirty && is_leaf) {
1446  result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1447  }
1448 
1449  return is_dirty;
1450 }
1451 
1452 
// Applies FixWeight() to every catalog returned by GetCatalogs(), iterating
// over that list in reverse order.
// NOTE(review): the opening line of this definition is absent from this
// listing (the numbering jumps from 1452 to 1454), so its name and signature
// cannot be confirmed here.
1454  CatalogList catalog_list = GetCatalogs();
1455  reverse(catalog_list.begin(), catalog_list.end());
1456  for (unsigned i = 0; i < catalog_list.size(); ++i) {
1457  FixWeight(static_cast<WritableCatalog *>(catalog_list[i]));
1458  }
1459 }
1460 
// Adjusts a single catalog according to the configured weights: an
// autogenerated, non-root catalog with fewer than min_weight_ entries is
// removed again, a catalog with more than max_weight_ entries is handed to
// the CatalogBalancer.
// NOTE(review): the opening line of this definition is absent from this
// listing; FixWeight(WritableCatalog *catalog) from the member index matches
// how `catalog` is used below -- confirm.
1462  // firstly check underflow because they can provoke overflows
1463  if (catalog->GetNumEntries() < min_weight_ && !catalog->IsRoot()
1464  && catalog->IsAutogenerated()) {
      // NOTE(review): listing line 1465 is missing; the string below looks
      // like the tail of a logging call -- confirm.
1466  "Deleting an autogenerated catalog in '%s'",
1467  catalog->mountpoint().c_str());
1468  // Remove the .cvmfscatalog and .cvmfsautocatalog files first
1469  const string path = catalog->mountpoint().ToString();
1470  catalog->RemoveEntry(path + "/.cvmfscatalog");
1471  catalog->RemoveEntry(path + "/.cvmfsautocatalog");
1472  // Remove the actual catalog
      // substr(1) drops the first character of the mountpoint before the
      // path is passed on
1473  const string catalog_path = catalog->mountpoint().ToString().substr(1);
1474  RemoveNestedCatalog(catalog_path);
1475  } else if (catalog->GetNumEntries() > max_weight_) {
1476  CatalogBalancer<WritableCatalogManager> catalog_balancer(this);
1477  catalog_balancer.Balance(catalog);
1478  }
1479 }
1480 
1481 
1482 //****************************************************************************
1483 // Workaround -- Serialized Catalog Committing
1484 
1486  const Catalog *catalog, WritableCatalogList *result) const {
1487  // A catalog must be snapshot, if itself or one of it's descendants is dirty.
1488  // So we traverse the catalog tree recursively and look for dirty catalogs
1489  // on the way.
1490  const WritableCatalog *wr_catalog = static_cast<const WritableCatalog *>(
1491  catalog);
1492  // This variable will contain the number of dirty catalogs in the sub tree
1493  // with *catalog as it's root.
1494  int dirty_catalogs = (wr_catalog->IsDirty()) ? 1 : 0;
1495 
1496  // Look for dirty catalogs in the descendants of *catalog
1497  CatalogList children = wr_catalog->GetChildren();
1498  for (CatalogList::const_iterator i = children.begin(), iEnd = children.end();
1499  i != iEnd; ++i) {
1500  dirty_catalogs += GetModifiedCatalogsRecursively(*i, result);
1501  }
1502 
1503  // If we found a dirty catalog in the checked sub tree, the root (*catalog)
1504  // must be snapshot and ends up in the result list
1505  if (dirty_catalogs > 0)
1506  result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1507 
1508  // tell the upper layer about number of catalogs
1509  return dirty_catalogs;
1510 }
1511 
1512 
1514  const upload::SpoolerResult &result, const CatalogUploadContext unused) {
1515  if (result.return_code != 0) {
1516  PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1517  result.local_path.c_str(), result.return_code);
1518  }
1519 
1520  if (UseLocalCache()) {
1521  CopyCatalogToLocalCache(result);
1522  }
1523 
1524  unlink(result.local_path.c_str());
1525 }
1526 
1527 
// Serialized variant of the catalog snapshot: finalizes the modified catalogs
// one after another, hashes each one locally, updates the parent's reference
// right away and only then hands the catalog to the spooler. Returns the
// information collected for the root catalog.
// NOTE(review): the opening lines of this definition are absent from this
// listing; the name is taken from the call in Commit() and the log message
// below -- confirm.
1530  LogCvmfs(kLogCvmfs, kLogStdout, "Serialized committing of file catalogs...");
1531  reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetDirty();
1532  WritableCatalogList catalogs_to_snapshot;
1533  GetModifiedCatalogs(&catalogs_to_snapshot);
      // the callback registered below does not evaluate the context, it is
      // only filled with neutral values
1534  CatalogUploadContext unused;
1535  unused.root_catalog_info = NULL;
1536  unused.stop_for_tweaks = false;
1537  spooler_->RegisterListener(
      // NOTE(review): listing line 1538 is missing; presumably the callback,
      // delegate and context arguments of RegisterListener -- confirm.
1539 
1540  CatalogInfo root_catalog_info;
1541  WritableCatalogList::const_iterator i = catalogs_to_snapshot.begin();
1542  const WritableCatalogList::const_iterator iend = catalogs_to_snapshot.end();
1543  for (; i != iend; ++i) {
1544  FinalizeCatalog(*i, stop_for_tweaks);
1545 
1546  // Compress and upload catalog
1547  shash::Any hash_catalog(spooler_->GetHashAlgorithm(),
      // NOTE(review): listing line 1548 is missing; presumably the second
      // constructor argument (a hash suffix) -- confirm.
1549  if (!zlib::CompressPath2Null((*i)->database_path(), &hash_catalog)) {
1550  PANIC(kLogStderr, "could not compress catalog %s",
1551  (*i)->mountpoint().ToString().c_str());
1552  }
1553 
1554  const int64_t catalog_size = GetFileSize((*i)->database_path());
1555  assert(catalog_size > 0);
1556 
1557  if ((*i)->HasParent()) {
1558  LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1559  WritableCatalog *parent = (*i)->GetWritableParent();
1560  parent->UpdateNestedCatalog((*i)->mountpoint().ToString(), hash_catalog,
1561  catalog_size, (*i)->delta_counters_);
      // the deltas were just handed to the parent, so start again from zero
1562  (*i)->delta_counters_.SetZero();
1563  } else if ((*i)->IsRoot()) {
1564  root_catalog_info.size = catalog_size;
1565  root_catalog_info.ttl = (*i)->GetTTL();
1566  root_catalog_info.content_hash = hash_catalog;
1567  root_catalog_info.revision = (*i)->GetRevision();
1568  } else {
1569  PANIC(kLogStderr, "inconsistent state detected");
1570  }
1571 
1572  spooler_->ProcessCatalog((*i)->database_path());
1573  }
1574  spooler_->WaitForUpload();
1575 
1576  spooler_->UnregisterListeners();
1577  return root_catalog_info;
1578 }
1579 
// NOTE(review): both the signature (listing line 1580) and the only body line
// (listing line 1582) are absent from this listing, so nothing about this
// definition can be confirmed here; check the original file.
1581 {
1583 }
1584 
// Waits for the spooler to finish its uploads, unregisters the spooler
// listeners and empties the list of pending catalogs.
// NOTE(review): the signature line (listing line 1585) is absent from this
// listing, so the function name cannot be confirmed here.
1586 {
1587  spooler_->WaitForUpload(); // wait for all outstanding jobs to finish before tearing it down
1588  spooler_->UnregisterListeners();
1589  pending_catalogs_ = {}; // whatever we couldn't process, leave it to the Commit
1590 }
1591 
1592 void WritableCatalogManager::AddCatalogToQueue(const std::string &path)
1593 {
1594  SyncLock();
1595  WritableCatalog *catalog = NULL;
1596  bool const retval = FindCatalog(MakeRelativePath(path), &catalog, NULL);
1597  assert(retval);
1598  assert(catalog);
1599  catalog->SetDirty(); // ensure it's dirty so its parent will wait for it
1600  SyncUnlock();
1601  pending_catalogs_.push_back(catalog);
1602 }
1603 
// Goes through the pending catalogs and finalizes every catalog that has no
// dirty children left; such catalogs are removed from the pending list, all
// others stay queued.
// NOTE(review): the signature line (listing line 1604) is absent from this
// listing, so the function name cannot be confirmed here.
1605 {
1606  // best effort to schedule as many catalogs for upload as possible
1607  for (auto it = pending_catalogs_.begin(); it != pending_catalogs_.end(); ) {
1608  if ((*it)->dirty_children() == 0) {
1609  FinalizeCatalog(*it, false /* stop_for_tweaks */);
      // NOTE(review): listing line 1610 is missing; going by the log message
      // below, presumably the call that schedules *it for processing --
      // confirm.
1611  LogCvmfs(kLogCatalog, kLogVerboseMsg, "scheduled %s for processing", (*it)->mountpoint().c_str());
      // erase() returns the iterator following the removed element
1612  it = pending_catalogs_.erase(it);
1613  } else {
1614  ++it;
1615  }
1616  }
1617 }
1618 
1619 // Callback for uploading a single catalog, similar to CatalogUploadCallback. The main difference
1620 // is that this callback would not trigger processing of the parent
// NOTE(review): the opening line of the signature (listing line 1621) is
// absent from this listing.
1622  const upload::SpoolerResult &result) {
1623  if (result.return_code != 0) {
1624  PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1625  result.local_path.c_str(), result.return_code);
1626  }
1627 
1628  // retrieve the catalog object based on the callback information
1629  // see WritableCatalogManager::ScheduleCatalogProcessing()
1630  WritableCatalog *catalog = NULL;
1631  {
      // NOTE(review): listing line 1632 is missing; the dedicated scope
      // suggests a scoped lock protecting the map -- confirm.
1633  std::map<std::string, WritableCatalog*>::iterator const c =
1634  catalog_processing_map_.find(result.local_path);
1635  assert(c != catalog_processing_map_.end());
1636  catalog = c->second;
1637  }
1638 
1639  uint64_t const catalog_size = GetFileSize(result.local_path);
1640  assert(catalog_size > 0);
1641 
1642  SyncLock();
1643  if (catalog->HasParent()) {
1644  // finalized nested catalogs will update their parent's pointer
1645  LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1646  WritableCatalog *parent = catalog->GetWritableParent();
1647 
1648  parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1649  result.content_hash,
1650  catalog_size,
1651  catalog->delta_counters_);
      // one dirty child less for the parent; the parent itself is not
      // finalized or scheduled from here
1652  parent->DecrementDirtyChildren();
1653  catalog->delta_counters_.SetZero();
1654  }
1655  // JUMP: detach the catalog after uploading to free sqlite related resources
1656  DetachCatalog(catalog);
1657  SyncUnlock();
1658 }
1659 // using the given list of dirs, fetch all relevant catalogs
// base_path is mounted first; afterwards every nested catalog of the catalog
// mounted there whose mountpoint is contained in dirs gets a Catalog object
// that is registered in catalog_download_map_ under its hash string.
1660 void WritableCatalogManager::LoadCatalogs(const std::string &base_path, const std::unordered_set<std::string> &dirs)
1661 {
1662  // mount everything up to "base_path" first (this would be our lease_path typically)
1663  Catalog *base_catalog;
      // NOTE(review): the boolean result of MountSubtree is not checked, so
      // base_catalog is used below regardless of the outcome
1664  MountSubtree(PathString(base_path), NULL /* entry_point */, true /* is_listable */, &base_catalog);
1665 
1666  // start up the downloader
1667  CatalogDownloadContext context;
1668  context.dirs = &dirs;
1669  catalog_download_pipeline_ = new CatalogDownloadPipeline(static_cast<SimpleCatalogManager*>(this));
      // NOTE(review): listing lines 1670-1671 are missing; presumably the
      // download callback is registered and the pipeline started here --
      // confirm.
1672 
1673  Catalog::NestedCatalogList nested_catalogs = base_catalog->ListNestedCatalogs();
1674  for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1675  // schedule relevant child nested catalogs for download
1676  std::string const mountpoint = it->mountpoint.ToString();
1677  if (dirs.find(mountpoint) != dirs.end()) {
1678  Catalog *catalog = CreateCatalog(it->mountpoint, it->hash, NULL /* parent */);
1679  {
      // NOTE(review): listing line 1680 is missing; the dedicated scope
      // suggests a scoped lock protecting the map -- confirm.
1681  catalog_download_map_.insert(std::make_pair(it->hash.ToString(), catalog));
1682  }
      // NOTE(review): listing line 1683 is missing; presumably the catalog
      // hash is handed to the download pipeline here -- confirm.
1684  }
1685  }
1686 
      // NOTE(review): listing line 1687 is missing; presumably a wait for the
      // pipeline to finish before it is deleted -- confirm.
1688  delete catalog_download_pipeline_; // terminate all the threads
1689 }
1690 
// Wrapper around LookupPath() that holds the sync lock for the duration of
// the lookup and returns LookupPath()'s result.
// NOTE(review): the opening line of the signature (listing line 1691) is
// absent from this listing, so the function name and the declaration of
// `path` cannot be confirmed here.
1692  const LookupOptions options,
1693  DirectoryEntry *dirent) {
1694  SyncLock();
1695  bool const exists = LookupPath(path, options, dirent);
1696  SyncUnlock();
1697  return exists;
1698 }
1699 
1701  const CompressHashResult &result)
1702 {
1703  MutexLockGuard const guard(catalog_hash_lock_);
1704  catalog_hash_map_[result.path] = result.hash;
1705 }
1706 
// Callback for a downloaded catalog: looks up the Catalog object registered
// for result.hash, opens its database from result.db_path and registers a new
// Catalog object for every nested catalog whose mountpoint is contained in
// context.dirs. The downloaded catalog object is deleted before returning.
// NOTE(review): the opening line of the signature (listing line 1707) is
// absent from this listing.
1708  const CatalogDownloadResult &result,
1709  CatalogDownloadContext context)
1710 {
1711  Catalog *downloaded_catalog;
1712  {
      // NOTE(review): listing line 1713 is missing; the dedicated scope
      // suggests a scoped lock protecting the map -- confirm.
1714  auto it = catalog_download_map_.find(result.hash);
1715  assert(it != catalog_download_map_.end());
1716  downloaded_catalog = it->second;
1717  }
1718 
1719  if (!downloaded_catalog->OpenDatabase(result.db_path)) {
1720  LogCvmfs(kLogCvmfs, kLogDebug, "failed to initialize catalog");
      // NOTE(review): the pointer stays in catalog_download_map_ after the
      // object has been deleted (here and at the end of the function)
1721  delete downloaded_catalog;
1722  return;
1723  }
1724 
1725  Catalog::NestedCatalogList nested_catalogs = downloaded_catalog->ListNestedCatalogs();
1726  for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1727  // schedule relevant child nested catalogs for download
1728  if (context.dirs->find(it->mountpoint.ToString()) != context.dirs->end()) {
1729  Catalog *child_catalog = CreateCatalog(it->mountpoint, it->hash, NULL /* parent */);
1730  {
      // NOTE(review): listing line 1731 is missing; the dedicated scope
      // suggests a scoped lock protecting the map -- confirm.
1732  catalog_download_map_.insert(std::make_pair(it->hash.ToString(), child_catalog));
1733  }
      // NOTE(review): listing line 1734 is missing; presumably the child
      // catalog hash is handed to the download pipeline here -- confirm.
1735  }
1736  }
1737  delete downloaded_catalog;
1738 }
1739 
1740 
1741 
1742 } // namespace catalog
bool CompressPath2Null(const string &src, shash::Any *compressed_hash)
Definition: compression.cc:526
uint32_t linkcount() const
int return_code
the return value of the spooler operation
bool IsExternalFile() const
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
const Counters & GetCounters() const
Definition: catalog.h:171
void AddChunkedFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
bool IsNull() const
Definition: hash.h:371
const manifest::Manifest * manifest() const
Definition: repository.h:125
void set_base_hash(const shash::Any &hash)
bool IsRoot() const
Definition: catalog.h:189
ShortString< kDefaultMaxName, 1 > NameString
Definition: shortstring.h:214
void CloneTree(const std::string &from_dir, const std::string &to_dir)
void AddHardlinkGroup(const DirectoryEntryBaseList &entries, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
void set_dirty_children(const int count)
Definition: catalog_rw.h:137
void set_is_chunked_file(const bool val)
void AddDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
std::string database_path() const
Definition: catalog.h:180
bool IsDirectory() const
void set_catalog_hash(const shash::Any &catalog_hash)
Definition: manifest.h:107
std::unordered_map< std::string, Catalog * > catalog_download_map_
NameString GetFileName(const PathString &path)
Definition: shortstring.cc:28
Catalog * LoadFreeCatalog(const PathString &mountpoint, const shash::Any &hash)
bool IsChunkedFile() const
inode_t inode_
void Clear()
Definition: xattr.h:41
bool HasParent() const
Definition: catalog.h:196
void SingleCatalogUploadCallback(const upload::SpoolerResult &result)
bool MountSubtree(const PathString &path, const Catalog *entry_point, bool can_listing, Catalog **leaf_catalog)
void RemoveDirectory(const std::string &directory_path)
uint32_t GetMaxLinkId() const
Definition: catalog_rw.cc:125
void UpdateNestedCatalog(const std::string &path, const shash::Any &hash, const uint64_t size, const DeltaCounters &child_counters)
Definition: catalog_rw.cc:590
bool OpenDatabase(const std::string &db_path)
Definition: catalog.cc:161
#define PANIC(...)
Definition: exception.h:29
Definition: future.h:32
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
FILE * CreateTempFile(const std::string &path_prefix, const int mode, const char *open_flags, std::string *final_path)
Definition: posix.cc:1014
uint64_t size() const
WritableCatalog * GetHostingCatalog(const std::string &path)
void Assign(const char *chars, const unsigned length)
Definition: shortstring.h:61
void set_linkcount(const uint32_t linkcount)
void set_is_nested_catalog_root(const bool val)
bool IsDirty() const
Definition: catalog_rw.h:57
void FixWeight(WritableCatalog *catalog)
std::string ToStringWithSuffix() const
Definition: hash.h:296
gid_t gid_
void Balance(catalog_t *catalog)
bool LookupPath(const PathString &path, DirectoryEntry *dirent) const
Definition: catalog.h:124
unsigned LookupOptions
Definition: catalog_mgr.h:42
void ScheduleCatalogProcessing(WritableCatalog *catalog)
pthread_mutex_t * catalog_download_lock_
std::vector< Catalog * > CatalogList
Definition: catalog_mgr.h:242
std::string CreateTempPath(const std::string &path_prefix, const int mode)
Definition: posix.cc:1042
void SetTTL(const uint64_t new_ttl)
static DeltaCounters Diff(const Counters &from, const Counters &to)
int GetModifiedCatalogsRecursively(const Catalog *catalog, WritableCatalogList *result) const
Counters_t GetSelfEntries() const
void set_revision(const uint64_t revision)
Definition: manifest.h:93
void InsertNestedCatalog(const std::string &mountpoint, Catalog *attached_reference, const shash::Any content_hash, const uint64_t size)
Definition: catalog_rw.cc:486
void TouchEntry(const DirectoryEntryBase &entry, const XattrList &xattrs, const shash::Md5 &path_hash)
Definition: catalog_rw.cc:217
uint64_t GetTTL() const
Definition: catalog.cc:480
assert((mem||(size==0))&&"Out Of Memory")
void Process(const shash::Any &catalog_hash)
bool FindCatalog(const std::string &path, WritableCatalog **result, DirectoryEntry *dirent=NULL)
bool Commit(const bool stop_for_tweaks, const uint64_t manual_revision, manifest::Manifest *manifest)
CatalogList GetChildren() const
Definition: catalog.cc:741
bool LookupPath(const PathString &path, const LookupOptions options, DirectoryEntry *entry)
void SetPreviousRevision(const shash::Any &hash)
Definition: catalog_rw.cc:337
Catalog * parent() const
Definition: catalog.h:176
pthread_mutex_t * catalog_processing_lock_
std::map< std::string, WritableCatalog * > catalog_processing_map_
Catalog * FindChild(const PathString &mountpoint) const
Definition: catalog.cc:798
std::vector< WritableCatalog * > WritableCatalogList
Definition: catalog_rw.h:194
shash::Any checksum() const
const NestedCatalogList & ListNestedCatalogs() const
Definition: catalog.cc:620
void CloneTreeImpl(const PathString &source_dir, const std::string &dest_parent_dir, const NameString &dest_name)
const unsigned kLookupDefault
Definition: catalog_mgr.h:43
uint64_t size_
bool CopyPath2File(const std::string &src, FILE *fdest)
Definition: compression.cc:46
T & Get()
Definition: future.h:66
void AddAsSubtree(DeltaCounters *delta) const
bool IsNestedCatalogMountpoint() const
bool IsTransitionPoint(const std::string &mountpoint)
pthread_mutex_t * catalog_hash_lock_
Algorithms
Definition: hash.h:41
bool Listing(const PathString &path, DirectoryEntryList *listing, const bool expand_symlink)
void CatalogUploadCallback(const upload::SpoolerResult &result, const CatalogUploadContext clg_upload_context)
bool IsNestedCatalogRoot() const
NameString name_
void TouchDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &directory_path)
uint64_t GetNumEntries() const
Definition: catalog.cc:535
uint64_t GetRevision() const
Definition: catalog.cc:517
void AddFileChunk(const std::string &entry_path, const FileChunk &chunk)
Definition: catalog_rw.cc:255
bool IsAutogenerated() const
Definition: catalog.h:190
std::vector< DirectoryEntry > DirectoryEntryList
CatalogDownloadPipeline * catalog_download_pipeline_
NameString name() const
static manifest::Manifest * CreateRepository(const std::string &dir_temp, const bool volatile_content, const std::string &voms_authz, upload::Spooler *spooler)
bool AttachCatalog(const std::string &db_path, Catalog *new_catalog)
void SplitPath(const std::string &path, std::string *dirname, std::string *filename)
Definition: posix.cc:114
bool HasXattrs() const
void ActivateCatalog(Catalog *catalog)
std::string local_path
the local_path previously given as input
bool LookupXattrsPath(const PathString &path, XattrList *xattrs) const
Definition: catalog.h:128
void ShrinkHardlinkGroup(const std::string &remove_path)
bool IsRegular() const
bool CopyCatalogToLocalCache(const upload::SpoolerResult &result)
bool GetModifiedCatalogLeafsRecursively(Catalog *catalog, WritableCatalogList *result) const
const char kSuffixCatalog
Definition: hash.h:54
uint32_t linkcount_
const std::string & dir_temp() const
void PopulateToParent(DeltaCounters *parent) const
bool InsertInitialValues(const std::string &root_path, const bool volatile_content, const std::string &voms_authz, const DirectoryEntry &root_entry=DirectoryEntry(kDirentNegative))
Definition: catalog_sql.cc:299
void AddCatalogToQueue(const std::string &path)
CatalogInfo SnapshotCatalogsSerialized(const bool stop_for_tweaks)
CatalogInfo SnapshotCatalogs(const bool stop_for_tweaks)
PathString mountpoint() const
Definition: catalog.h:175
time_t mtime_
static const inode_t kInvalidInode
void Append(const char *chars, const unsigned length)
Definition: shortstring.h:80
std::list< WritableCatalog * > pending_catalogs_
void TakeDatabaseFileOwnership()
Definition: catalog.cc:464
bool SetVOMSAuthz(const std::string &voms_authz)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:279
void Clone(const std::string from, const std::string to)
void RemoveFile(const std::string &file_path)
bool IsBindMountpoint() const
std::string MakeRelativePath(const std::string &relative_path) const
void CatalogUploadSerializedCallback(const upload::SpoolerResult &result, const CatalogUploadContext unused)
void RemoveNestedCatalog(const std::string &mountpoint, Catalog **attached_reference)
Definition: catalog_rw.cc:538
bool IsEmpty() const
Definition: bigvector.h:70
void AddEntry(const DirectoryEntry &entry, const XattrList &xattr, const std::string &entry_path, const std::string &parent_path)
void RemoveNestedCatalog(const std::string &mountpoint, const bool merge=true)
void set_ttl(const uint32_t ttl)
Definition: manifest.h:92
shash::Algorithms GetHashAlgorithm() const
Definition: manifest.h:90
std::string MakePathWithoutSuffix() const
Definition: hash.h:323
std::string ToString() const
Definition: shortstring.h:139
std::vector< NestedCatalog > NestedCatalogList
Definition: catalog.h:204
bool IsEmpty() const
Definition: shortstring.h:137
shash::Any hash
Definition: pipeline.h:162
unsigned int mode_
bool ListFileChunks(const PathString &path, const shash::Algorithms interpret_hashes_as, FileChunkList *chunks)
void Partition(WritableCatalog *new_nested_catalog)
Definition: catalog_rw.cc:345
ShortString< kDefaultMaxPath, 0 > PathString
Definition: shortstring.h:213
void SetRevision(const uint64_t new_revision)
Definition: catalog_rw.cc:314
std::string path
Definition: pipeline.h:161
void LoadCatalogs(const std::string &base_path, const std::unordered_set< std::string > &dirs)
const std::vector< Catalog * > & GetCatalogs() const
Definition: catalog_mgr.h:350
Definition: mutex.h:42
void CatalogDownloadCallback(const CatalogDownloadResult &result, const CatalogDownloadContext context)
void set_hardlink_group(const uint32_t group)
std::map< std::string, shash::Any > catalog_hash_map_
PathString GetParentPath(const PathString &path)
Definition: shortstring.cc:14
WritableCatalog * GetWritableParent() const
Definition: catalog_rw.h:130
void set_catalog_size(const uint64_t catalog_size)
Definition: manifest.h:104
DeltaCounters delta_counters_
Definition: catalog_rw.h:157
bool FindNested(const PathString &mountpoint, shash::Any *hash, uint64_t *size) const
Definition: catalog.cc:680
std::string GetFullPath(const std::string &parent_directory) const
bool LookupXattrs(const PathString &path, XattrList *xattrs)
Catalog * MountCatalog(const PathString &mountpoint, const shash::Any &hash, Catalog *parent_catalog)
int64_t GetFileSize(const std::string &path)
Definition: posix.cc:812
std::vector< DirectoryEntryBase > DirectoryEntryBaseList
const int kLogVerboseMsg
void set_has_alt_catalog_path(const bool &has_alt_path)
Definition: manifest.h:113
unsigned GetLength() const
Definition: shortstring.h:131
void CatalogHashSerializedCallback(const CompressHashResult &result)
shash::Any checksum_
CatalogT * FindCatalog(const PathString &path) const
std::string MakePath() const
Definition: hash.h:306
void AddFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
const char * c_str() const
Definition: shortstring.h:143
void SwapNestedCatalog(const string &mountpoint, const shash::Any &new_hash, const uint64_t new_size)
void RemoveEntry(const std::string &entry_path)
Definition: catalog_rw.cc:181
const char * GetChars() const
Definition: shortstring.h:123
virtual bool IsWritable() const
Definition: catalog.h:197
void GetModifiedCatalogLeafs(WritableCatalogList *result) const
Catalog * CreateCatalog(const PathString &mountpoint, const shash::Any &catalog_hash, Catalog *parent_catalog)
void FinalizeCatalog(WritableCatalog *catalog, const bool stop_for_tweaks)
void IncLinkcount(const std::string &path_within_group, const int delta)
Definition: catalog_rw.cc:203
void GraftNestedCatalog(const string &mountpoint, const shash::Any &new_hash, const uint64_t new_size)
bool LookupDirEntry(const std::string &path, const LookupOptions options, DirectoryEntry *dirent)
const Item * AtPtr(const size_t index) const
Definition: bigvector.h:53
void set_is_nested_catalog_mountpoint(const bool val)
bool CompressPath2Path(const string &src, const string &dest)
Definition: compression.cc:315
void CreateNestedCatalog(const std::string &mountpoint)
void UpdateEntry(const DirectoryEntry &entry, const shash::Md5 &path_hash)
Definition: catalog_rw.cc:245
static DerivedT * Create(const std::string &filename)
Definition: sql_impl.h:30
void GetModifiedCatalogs(WritableCatalogList *result) const
size_t size() const
Definition: bigvector.h:117
const std::unordered_set< std::string > * dirs
uid_t uid_
void set_root_path(const std::string &root_path)
Definition: manifest.h:117
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545
const shash::Any & base_hash() const