GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/catalog.cc Lines: 358 392 91.3 %
Date: 2019-02-03 02:48:13 Branches: 126 197 64.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "catalog.h"
6
7
#include <alloca.h>
8
#include <errno.h>
9
10
#include <algorithm>
11
#include <cassert>
12
13
#include "catalog_mgr.h"
14
#include "logging.h"
15
#include "platform.h"
16
#include "smalloc.h"
17
#include "util_concurrency.h"
18
19
using namespace std;  // NOLINT
20
21
namespace catalog {
22
23
15
const shash::Md5 Catalog::kMd5PathEmpty("", 0);
24
25
26
/**
27
 * Open a catalog outside the framework of a catalog manager.
28
 */
29
13
Catalog* Catalog::AttachFreely(const string     &imaginary_mountpoint,
30
                               const string     &file,
31
                               const shash::Any &catalog_hash,
32
                                     Catalog    *parent,
33
                               const bool        is_nested) {
34
  Catalog *catalog =
35
    new Catalog(PathString(imaginary_mountpoint.data(),
36
                           imaginary_mountpoint.length()),
37
                catalog_hash,
38
                parent,
39
13
                is_nested);
40
13
  const bool successful_init = catalog->InitStandalone(file);
41
13
  if (!successful_init) {
42
1
    delete catalog;
43
1
    return NULL;
44
  }
45
12
  return catalog;
46
}
47
48
49
106
Catalog::Catalog(const PathString &mountpoint,
50
                 const shash::Any &catalog_hash,
51
                       Catalog    *parent,
52
                 const bool        is_nested) :
53
  catalog_hash_(catalog_hash),
54
  mountpoint_(mountpoint),
55
  is_regular_mountpoint_(mountpoint_ == root_prefix_),
56
  volatile_flag_(false),
57
  is_root_(parent == NULL && !is_nested),
58
  managed_database_(false),
59
  parent_(parent),
60
  nested_catalog_cache_dirty_(true),
61
  voms_authz_status_(kVomsUnknown),
62

106
  initialized_(false)
63
{
64
106
  max_row_id_ = 0;
65
106
  inode_annotation_ = NULL;
66
106
  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
67
106
  int retval = pthread_mutex_init(lock_, NULL);
68
106
  assert(retval == 0);
69
70
106
  database_ = NULL;
71
106
  uid_map_ = NULL;
72
106
  gid_map_ = NULL;
73
106
  sql_listing_ = NULL;
74
106
  sql_lookup_md5path_ = NULL;
75
106
  sql_lookup_nested_ = NULL;
76
106
  sql_list_nested_ = NULL;
77
106
  sql_own_list_nested_ = NULL;
78
106
  sql_all_chunks_ = NULL;
79
106
  sql_chunks_listing_ = NULL;
80
106
  sql_lookup_xattrs_ = NULL;
81
106
}
82
83
84
167
Catalog::~Catalog() {
85
106
  pthread_mutex_destroy(lock_);
86
106
  free(lock_);
87
106
  FinalizePreparedStatements();
88
106
  delete database_;
89

61
}
90
91
92
/**
93
 * InitPreparedStatement uses polymorphism in case of a r/w catalog.
94
 * FinalizePreparedStatements is called in the destructor where
95
 * polymorphism does not work any more and has to be called both in
96
 * the WritableCatalog and the Catalog destructor
97
 */
98
98
void Catalog::InitPreparedStatements() {
99
98
  sql_listing_          = new SqlListing(database());
100
98
  sql_lookup_md5path_   = new SqlLookupPathHash(database());
101
98
  sql_lookup_nested_    = new SqlNestedCatalogLookup(database());
102
98
  sql_list_nested_      = new SqlNestedCatalogListing(database());
103
98
  sql_own_list_nested_  = new SqlOwnNestedCatalogListing(database());
104
98
  sql_all_chunks_       = new SqlAllChunks(database());
105
98
  sql_chunks_listing_   = new SqlChunksListing(database());
106
98
  sql_lookup_xattrs_    = new SqlLookupXattrs(database());
107
98
}
108
109
110
106
void Catalog::FinalizePreparedStatements() {
111
106
  delete sql_lookup_xattrs_;
112
106
  delete sql_chunks_listing_;
113
106
  delete sql_all_chunks_;
114
106
  delete sql_listing_;
115
106
  delete sql_lookup_md5path_;
116
106
  delete sql_lookup_nested_;
117
106
  delete sql_list_nested_;
118
106
  delete sql_own_list_nested_;
119
106
}
120
121
122
29
bool Catalog::InitStandalone(const std::string &database_file) {
123
29
  bool retval = OpenDatabase(database_file);
124
29
  if (!retval) {
125
1
    return false;
126
  }
127
128
28
  InodeRange inode_range;
129
28
  inode_range.MakeDummy();
130
28
  set_inode_range(inode_range);
131
28
  return true;
132
}
133
134
135
120
bool Catalog::ReadCatalogCounters() {
136
120
  assert(database_ != NULL);
137
  bool statistics_loaded;
138
120
  if (database().schema_version() <
139
      CatalogDatabase::kLatestSupportedSchema - CatalogDatabase::kSchemaEpsilon)
140
  {
141
    statistics_loaded =
142
      counters_.ReadFromDatabase(database(), LegacyMode::kLegacy);
143
120
  } else if (database().schema_revision() < 2) {
144
    statistics_loaded =
145
      counters_.ReadFromDatabase(database(), LegacyMode::kNoXattrs);
146
120
  } else if (database().schema_revision() < 3) {
147
    statistics_loaded =
148
      counters_.ReadFromDatabase(database(), LegacyMode::kNoExternals);
149
120
  } else if (database().schema_revision() < 5) {
150
    statistics_loaded =
151
      counters_.ReadFromDatabase(database(), LegacyMode::kNoSpecials);
152
  } else {
153
120
    statistics_loaded = counters_.ReadFromDatabase(database());
154
  }
155
120
  return statistics_loaded;
156
}
157
158
159
/**
160
 * Establishes the database structures and opens the sqlite database file.
161
 * @param db_path the absolute path to the database file on local file system
162
 * @return true on successful initialization otherwise false
163
 */
164
100
bool Catalog::OpenDatabase(const string &db_path) {
165
100
  database_ = CatalogDatabase::Open(db_path, DatabaseOpenMode());
166
100
  if (NULL == database_) {
167
2
    return false;
168
  }
169
170
98
  InitPreparedStatements();
171
172
  // Set the database file ownership if requested
173
98
  if (managed_database_) {
174
3
    database_->TakeFileOwnership();
175
  }
176
177
  // Find out the maximum row id of this database file
178
98
  SqlCatalog sql_max_row_id(database(), "SELECT MAX(rowid) FROM catalog;");
179
98
  if (!sql_max_row_id.FetchRow()) {
180
    LogCvmfs(kLogCatalog, kLogDebug,
181
             "Cannot retrieve maximal row id for database file %s "
182
             "(SqliteErrorcode: %d)",
183
             db_path.c_str(), sql_max_row_id.GetLastError());
184
    return false;
185
  }
186
98
  max_row_id_ = sql_max_row_id.RetrieveInt64(0);
187
188
  // Get root prefix
189

98
  if (database_->HasProperty("root_prefix")) {
190
    const std::string root_prefix =
191
22
                           database_->GetProperty<std::string>("root_prefix");
192
22
    root_prefix_.Assign(root_prefix.data(), root_prefix.size());
193
    LogCvmfs(kLogCatalog, kLogDebug,
194
             "found root prefix %s in root catalog file %s",
195
22
             root_prefix_.c_str(), db_path.c_str());
196
22
    is_regular_mountpoint_ = (root_prefix_ == mountpoint_);
197
  } else {
198
    LogCvmfs(kLogCatalog, kLogDebug,
199
76
             "no root prefix for root catalog file %s", db_path.c_str());
200
  }
201
202
  // Get volatile content flag
203
  volatile_flag_ = database_->GetPropertyDefault<bool>("volatile",
204
98
                                                       volatile_flag_);
205
206
  // Read Catalog Counter Statistics
207
98
  if (!ReadCatalogCounters()) {
208
    LogCvmfs(kLogCatalog, kLogStderr,
209
             "failed to load statistics counters for catalog %s (file %s)",
210
             mountpoint_.c_str(), db_path.c_str());
211
    return false;
212
  }
213
214
98
  if (HasParent()) {
215
22
    parent_->AddChild(this);
216
  }
217
218
98
  initialized_ = true;
219
98
  return true;
220
}
221
222
223
/**
224
 * Removes the mountpoint and prepends the root prefix to path
225
 */
226
197
shash::Md5 Catalog::NormalizePath(const PathString &path) const {
227
197
  if (is_regular_mountpoint_)
228
193
    return shash::Md5(path.GetChars(), path.GetLength());
229
230
4
  assert(path.GetLength() >= mountpoint_.GetLength());
231
  // Piecewise hash calculation: root_prefix plus tail of path
232
4
  shash::Any result(shash::kMd5);
233
4
  shash::ContextPtr ctx(shash::kMd5);
234
4
  ctx.buffer = alloca(ctx.size);
235
4
  shash::Init(ctx);
236
  shash::Update(
237
    reinterpret_cast<const unsigned char *>(root_prefix_.GetChars()),
238
    root_prefix_.GetLength(),
239
4
    ctx);
240
  shash::Update(
241
    reinterpret_cast<const unsigned char *>(path.GetChars()) +
242
      mountpoint_.GetLength(),
243
    path.GetLength() - mountpoint_.GetLength(),
244
4
    ctx);
245
4
  shash::Final(ctx, &result);
246
4
  return result.CastToMd5();
247
}
248
249
250
/**
251
 * Same as NormalizePath but returns a PathString instead of an Md5 hash.
252
 */
253
18
PathString Catalog::NormalizePath2(const PathString &path) const {
254
18
  if (is_regular_mountpoint_)
255
14
    return path;
256
257
4
  assert(path.GetLength() >= mountpoint_.GetLength());
258
4
  PathString result = root_prefix_;
259
4
  PathString suffix = path.Suffix(mountpoint_.GetLength());
260
4
  result.Append(suffix.GetChars(), suffix.GetLength());
261
4
  return result;
262
}
263
264
265
/**
266
 * The opposite of NormalizePath: from a full path remove the root prefix and
267
 * add the catalogs current mountpoint.  Needed for normalized paths from the
268
 * SQlite tables, such as nested catalog entry points.
269
 */
270
23
PathString Catalog::PlantPath(const PathString &path) const {
271
23
  if (is_regular_mountpoint_)
272
19
    return path;
273
274
4
  assert(path.GetLength() >= root_prefix_.GetLength());
275
4
  PathString result = mountpoint_;
276
4
  PathString suffix = path.Suffix(root_prefix_.GetLength());
277
4
  result.Append(suffix.GetChars(), suffix.GetLength());
278
4
  return result;
279
}
280
281
282
/**
283
 * Performs a lookup on this Catalog for a given MD5 path hash.
284
 * @param md5path the MD5 hash of the searched path
285
 * @param expand_symlink indicates if variables in symlink should be resolved
286
 * @param dirent will be set to the found DirectoryEntry
287
 * @return true if DirectoryEntry was successfully found, false otherwise
288
 */
289
157
bool Catalog::LookupEntry(const shash::Md5 &md5path, const bool expand_symlink,
290
                          DirectoryEntry *dirent) const
291
{
292
157
  assert(IsInitialized());
293
294
157
  pthread_mutex_lock(lock_);
295
157
  sql_lookup_md5path_->BindPathHash(md5path);
296
157
  bool found = sql_lookup_md5path_->FetchRow();
297

157
  if (found && (dirent != NULL)) {
298
148
    *dirent = sql_lookup_md5path_->GetDirent(this, expand_symlink);
299
148
    FixTransitionPoint(md5path, dirent);
300
  }
301
157
  sql_lookup_md5path_->Reset();
302
157
  pthread_mutex_unlock(lock_);
303
304
157
  return found;
305
}
306
307
308
/**
309
 * Performs a lookup on this Catalog for a given MD5 path hash.
310
 * @param md5path the MD5 hash of the searched path
311
 * @param dirent will be set to the found DirectoryEntry
312
 * @return true if DirectoryEntry was successfully found, false otherwise
313
 */
314
156
bool Catalog::LookupMd5Path(const shash::Md5 &md5path,
315
                            DirectoryEntry *dirent) const
316
{
317
156
  return LookupEntry(md5path, true, dirent);
318
}
319
320
321
1
bool Catalog::LookupRawSymlink(const PathString &path,
322
                               LinkString *raw_symlink) const
323
{
324
1
  DirectoryEntry dirent;
325
1
  bool result = (LookupEntry(NormalizePath(path), false, &dirent));
326
1
  if (result)
327
1
    raw_symlink->Assign(dirent.symlink());
328
1
  return result;
329
}
330
331
332
1
bool Catalog::LookupXattrsMd5Path(
333
  const shash::Md5 &md5path,
334
  XattrList *xattrs) const
335
{
336
1
  assert(IsInitialized());
337
338
1
  pthread_mutex_lock(lock_);
339
1
  sql_lookup_xattrs_->BindPathHash(md5path);
340
1
  bool found = sql_lookup_xattrs_->FetchRow();
341

1
  if (found && (xattrs != NULL)) {
342
1
    *xattrs = sql_lookup_xattrs_->GetXattrs();
343
  }
344
1
  sql_lookup_xattrs_->Reset();
345
1
  pthread_mutex_unlock(lock_);
346
347
1
  return found;
348
}
349
350
351
/**
352
 * Perform a listing of the directory with the given MD5 path hash.
353
 * @param path_hash the MD5 hash of the path of the directory to list
354
 * @param listing will be set to the resulting DirectoryEntryList
355
 * @return true on successful listing, false otherwise
356
 */
357
4
bool Catalog::ListingMd5PathStat(const shash::Md5 &md5path,
358
                                 StatEntryList *listing) const
359
{
360
4
  assert(IsInitialized());
361
362
4
  DirectoryEntry dirent;
363
4
  StatEntry entry;
364
365
4
  pthread_mutex_lock(lock_);
366
4
  sql_listing_->BindPathHash(md5path);
367
16
  while (sql_listing_->FetchRow()) {
368
8
    dirent = sql_listing_->GetDirent(this);
369
8
    if (dirent.IsHidden())
370
1
      continue;
371
7
    FixTransitionPoint(md5path, &dirent);
372
7
    entry.name = dirent.name();
373
7
    entry.info = dirent.GetStatStructure();
374
7
    listing->PushBack(entry);
375
  }
376
4
  sql_listing_->Reset();
377
4
  pthread_mutex_unlock(lock_);
378
379
4
  return true;
380
}
381
382
383
/**
384
 * Perform a listing of the directory with the given MD5 path hash.
385
 * Returns only struct stat values
386
 * @param path_hash the MD5 hash of the path of the directory to list
387
 * @param listing will be set to the resulting DirectoryEntryList
388
 * @param expand_symlink defines if magic symlinks should be resolved
389
 * @return true on successful listing, false otherwise
390
 */
391
30
bool Catalog::ListingMd5Path(const shash::Md5 &md5path,
392
                             DirectoryEntryList *listing,
393
                             const bool expand_symlink) const
394
{
395
30
  assert(IsInitialized());
396
397
30
  pthread_mutex_lock(lock_);
398
30
  sql_listing_->BindPathHash(md5path);
399
30
  while (sql_listing_->FetchRow()) {
400
43
    DirectoryEntry dirent = sql_listing_->GetDirent(this, expand_symlink);
401
43
    FixTransitionPoint(md5path, &dirent);
402
43
    listing->push_back(dirent);
403
  }
404
30
  sql_listing_->Reset();
405
30
  pthread_mutex_unlock(lock_);
406
407
30
  return true;
408
}
409
410
411
1
bool Catalog::AllChunksBegin() {
412
1
  return sql_all_chunks_->Open();
413
}
414
415
416
5
bool Catalog::AllChunksNext(shash::Any *hash, zlib::Algorithms *compression_alg)
417
{
418
5
  return sql_all_chunks_->Next(hash, compression_alg);
419
}
420
421
422
1
bool Catalog::AllChunksEnd() {
423
1
  return sql_all_chunks_->Close();
424
}
425
426
427
/**
428
 * Hash algorithm is given by the unchunked file.
429
 * Could be figured out by a join but it is faster if the user of this
430
 * method tells us.
431
 */
432
1
bool Catalog::ListMd5PathChunks(const shash::Md5  &md5path,
433
                                const shash::Algorithms interpret_hashes_as,
434
                                FileChunkList    *chunks) const
435
{
436

1
  assert(IsInitialized() && chunks->IsEmpty());
437
438
1
  pthread_mutex_lock(lock_);
439
1
  sql_chunks_listing_->BindPathHash(md5path);
440
3
  while (sql_chunks_listing_->FetchRow()) {
441
1
    chunks->PushBack(sql_chunks_listing_->GetFileChunk(interpret_hashes_as));
442
  }
443
1
  sql_chunks_listing_->Reset();
444
1
  pthread_mutex_unlock(lock_);
445
446
1
  return true;
447
}
448
449
450
/**
451
 * Only used by the garbage collection
452
 */
453
1
const Catalog::HashVector& Catalog::GetReferencedObjects() const {
454
1
  if (!referenced_hashes_.empty()) {
455
    return referenced_hashes_;
456
  }
457
458
  // retrieve all referenced content hashes of both files and file chunks
459
1
  SqlListContentHashes list_content_hashes(database());
460
10
  while (list_content_hashes.FetchRow()) {
461
8
    referenced_hashes_.push_back(list_content_hashes.GetHash());
462
  }
463
464
1
  return referenced_hashes_;
465
}
466
467
468
33
void Catalog::TakeDatabaseFileOwnership() {
469
33
  managed_database_ = true;
470
33
  if (NULL != database_) {
471
30
    database_->TakeFileOwnership();
472
  }
473
33
}
474
475
476
1
void Catalog::DropDatabaseFileOwnership() {
477
1
  managed_database_ = false;
478
1
  if (NULL != database_) {
479
1
    database_->DropFileOwnership();
480
  }
481
1
}
482
483
484
9
uint64_t Catalog::GetTTL() const {
485
9
  pthread_mutex_lock(lock_);
486
  const uint64_t result =
487
9
    database().GetPropertyDefault<uint64_t>("TTL", kDefaultTTL);
488
9
  pthread_mutex_unlock(lock_);
489
9
  return result;
490
}
491
492
493
bool Catalog::HasExplicitTTL() const {
494
  pthread_mutex_lock(lock_);
495
  const bool result = database().HasProperty("TTL");
496
  pthread_mutex_unlock(lock_);
497
  return result;
498
}
499
500
501
56
bool Catalog::GetVOMSAuthz(string *authz) const {
502
  bool result;
503
56
  pthread_mutex_lock(lock_);
504
56
  if (voms_authz_status_ == kVomsPresent) {
505
    if (authz) {*authz = voms_authz_;}
506
    result = true;
507
56
  } else if (voms_authz_status_ == kVomsNone) {
508
    result = false;
509
  } else {
510

56
    if (database().HasProperty("voms_authz")) {
511
      voms_authz_ = database().GetProperty<string>("voms_authz");
512
      if (authz) {*authz = voms_authz_;}
513
      voms_authz_status_ = kVomsPresent;
514
    } else {
515
56
      voms_authz_status_ = kVomsNone;
516
    }
517
56
    result = (voms_authz_status_ == kVomsPresent);
518
  }
519
56
  pthread_mutex_unlock(lock_);
520
56
  return result;
521
}
522
523
84
uint64_t Catalog::GetRevision() const {
524
84
  pthread_mutex_lock(lock_);
525
  const uint64_t result =
526
84
    database().GetPropertyDefault<uint64_t>("revision", 0);
527
84
  pthread_mutex_unlock(lock_);
528
84
  return result;
529
}
530
531
532
1
uint64_t Catalog::GetLastModified() const {
533
1
  const std::string prop_name = "last_modified";
534
  return (database().HasProperty(prop_name))
535
    ? database().GetProperty<int>(prop_name)
536
1
    : 0u;
537
}
538
539
540
uint64_t Catalog::GetNumChunks() const {
541
  return counters_.Get("self_regular") + counters_.Get("self_chunks");
542
}
543
544
545
2
uint64_t Catalog::GetNumEntries() const {
546
2
  const string sql = "SELECT count(*) FROM catalog;";
547
548
2
  pthread_mutex_lock(lock_);
549
2
  SqlCatalog stmt(database(), sql);
550
2
  const uint64_t result = (stmt.FetchRow()) ? stmt.RetrieveInt64(0) : 0;
551
2
  pthread_mutex_unlock(lock_);
552
553
2
  return result;
554
}
555
556
557
1
shash::Any Catalog::GetPreviousRevision() const {
558
1
  pthread_mutex_lock(lock_);
559
  const std::string hash_string =
560
1
    database().GetPropertyDefault<std::string>("previous_revision", "");
561
1
  pthread_mutex_unlock(lock_);
562
  return (!hash_string.empty())
563
    ? shash::MkFromHexPtr(shash::HexPtr(hash_string), shash::kSuffixCatalog)
564
1
    : shash::Any();
565
}
566
567
568
1
string Catalog::PrintMemStatistics() const {
569
1
  sqlite::MemStatistics stats;
570
1
  pthread_mutex_lock(lock_);
571
1
  database().GetMemStatistics(&stats);
572
1
  pthread_mutex_unlock(lock_);
573
  return string(mountpoint().GetChars(), mountpoint().GetLength()) + ": " +
574
    StringifyInt(stats.lookaside_slots_used) + " / " +
575
      StringifyInt(stats.lookaside_slots_max) + " slots -- " +
576
      StringifyInt(stats.lookaside_hit) + " hits, " +
577
      StringifyInt(stats.lookaside_miss_size) + " misses-size, " +
578
      StringifyInt(stats.lookaside_miss_full) + " misses-full -- " +
579
    StringifyInt(stats.page_cache_used / 1024) + " kB pages -- " +
580
      StringifyInt(stats.page_cache_hit) + " hits, " +
581
      StringifyInt(stats.page_cache_miss) + " misses -- " +
582
    StringifyInt(stats.schema_used / 1024) + " kB schema -- " +
583
1
    StringifyInt(stats.stmt_used / 1024) + " kB statements";
584
}
585
586
587
/**
588
 * Determine the actual inode of a DirectoryEntry.
589
 * The first used entry from a hardlink group deterimines the inode of the
590
 * others.
591
 * @param row_id the row id of a read row in the sqlite database
592
 * @param hardlink_group the id of a possibly present hardlink group
593
 * @return the assigned inode number
594
 */
595
199
inode_t Catalog::GetMangledInode(const uint64_t row_id,
596
                                 const uint64_t hardlink_group) const {
597
199
  assert(IsInitialized());
598
599
199
  if (inode_range_.IsDummy()) {
600
16
    return DirectoryEntry::kInvalidInode;
601
  }
602
603
183
  inode_t inode = row_id + inode_range_.offset;
604
605
  // Hardlinks are encoded in catalog-wide unique hard link group ids.
606
  // These ids must be resolved to actual inode relationships at runtime.
607
183
  if (hardlink_group > 0) {
608
    HardlinkGroupMap::const_iterator inode_iter =
609
      hardlink_groups_.find(hardlink_group);
610
611
    // Use cached entry if possible
612
    if (inode_iter == hardlink_groups_.end()) {
613
      hardlink_groups_[hardlink_group] = inode;
614
    } else {
615
      inode = inode_iter->second;
616
    }
617
  }
618
619
183
  if (inode_annotation_) {
620
    inode = inode_annotation_->Annotate(inode);
621
  }
622
623
183
  return inode;
624
}
625
626
627
/**
628
 * Get a list of all registered nested catalogs and bind mountpoints in this
629
 * catalog.
630
 * @return  a list of all nested catalog and bind mountpoints.
631
 */
632
117
const Catalog::NestedCatalogList& Catalog::ListNestedCatalogs() const {
633
117
  pthread_mutex_lock(lock_);
634
635
117
  if (nested_catalog_cache_dirty_) {
636
    LogCvmfs(kLogCatalog, kLogDebug, "refreshing nested catalog cache of '%s'",
637
29
             mountpoint().c_str());
638
29
    while (sql_list_nested_->FetchRow()) {
639
13
      NestedCatalog nested;
640
13
      nested.mountpoint = PlantPath(sql_list_nested_->GetPath());
641
13
      nested.hash = sql_list_nested_->GetContentHash();
642
13
      nested.size = sql_list_nested_->GetSize();
643
13
      nested_catalog_cache_.push_back(nested);
644
    }
645
29
    sql_list_nested_->Reset();
646
29
    nested_catalog_cache_dirty_ = false;
647
  }
648
649
117
  pthread_mutex_unlock(lock_);
650
117
  return nested_catalog_cache_;
651
}
652
653
654
/**
655
 * Get a list of all registered nested catalogs without bind mountpoints.  Used
656
 * for replication and garbage collection.
657
 * @return  a list of all nested catalogs.
658
 */
659
9
const Catalog::NestedCatalogList Catalog::ListOwnNestedCatalogs() const {
660
9
  NestedCatalogList result;
661
662
9
  pthread_mutex_lock(lock_);
663
664
9
  while (sql_own_list_nested_->FetchRow()) {
665
4
    NestedCatalog nested;
666
4
    nested.mountpoint = PlantPath(sql_own_list_nested_->GetPath());
667
4
    nested.hash = sql_own_list_nested_->GetContentHash();
668
4
    nested.size = sql_own_list_nested_->GetSize();
669
4
    result.push_back(nested);
670
  }
671
9
  sql_own_list_nested_->Reset();
672
673
9
  pthread_mutex_unlock(lock_);
674
675
  return result;
676
}
677
678
679
/**
680
 * Drops the nested catalog cache. Usually this is only useful in subclasses
681
 * that implement writable catalogs.
682
 *
683
 * Note: this action is _not_ secured by the catalog's mutex. If serialisation
684
 *       is required the subclass needs to ensure that.
685
 */
686
22
void Catalog::ResetNestedCatalogCacheUnprotected() {
687
22
  nested_catalog_cache_.clear();
688
22
  nested_catalog_cache_dirty_ = true;
689
22
}
690
691
692
/**
693
 * Looks for a specific registered nested catalog based on a path.
694
 */
695
12
bool Catalog::FindNested(const PathString &mountpoint,
696
                         shash::Any *hash, uint64_t *size) const
697
{
698
12
  pthread_mutex_lock(lock_);
699
12
  PathString normalized_mountpoint = NormalizePath2(mountpoint);
700
12
  sql_lookup_nested_->BindSearchPath(normalized_mountpoint);
701
12
  bool found = sql_lookup_nested_->FetchRow();
702

12
  if (found && (hash != NULL)) {
703
12
    *hash = sql_lookup_nested_->GetContentHash();
704
12
    *size = sql_lookup_nested_->GetSize();
705
  }
706
12
  sql_lookup_nested_->Reset();
707
12
  pthread_mutex_unlock(lock_);
708
709
12
  return found;
710
}
711
712
713
/**
714
 * Sets a new object to do inode annotations (or set to NULL)
715
 * The annotation object is not owned by the catalog.
716
 */
717
69
void Catalog::SetInodeAnnotation(InodeAnnotation *new_annotation) {
718
69
  pthread_mutex_lock(lock_);
719
  // Since annotated inodes could come back to the catalog in order to
720
  // get stripped, exchanging the annotation is not allowed
721

69
  assert((inode_annotation_ == NULL) || (inode_annotation_ == new_annotation));
722
69
  inode_annotation_ = new_annotation;
723
69
  pthread_mutex_unlock(lock_);
724
69
}
725
726
727
69
void Catalog::SetOwnerMaps(const OwnerMap *uid_map, const OwnerMap *gid_map) {
728

69
  uid_map_ = (uid_map && uid_map->HasEffect()) ? uid_map : NULL;
729

69
  gid_map_ = (gid_map && gid_map->HasEffect()) ? gid_map : NULL;
730
69
}
731
732
733
/**
734
 * Add a Catalog as child to this Catalog.
735
 * @param child the Catalog to define as child
736
 */
737
22
void Catalog::AddChild(Catalog *child) {
738
22
  assert(NULL == FindChild(child->mountpoint()));
739
740
22
  pthread_mutex_lock(lock_);
741
22
  children_[child->mountpoint()] = child;
742
22
  child->set_parent(this);
743
22
  pthread_mutex_unlock(lock_);
744
22
}
745
746
747
/**
748
 * Removes a Catalog from the children list of this Catalog
749
 * @param child the Catalog to delete as child
750
 */
751
13
void Catalog::RemoveChild(Catalog *child) {
752
13
  assert(NULL != FindChild(child->mountpoint()));
753
754
13
  pthread_mutex_lock(lock_);
755
13
  child->set_parent(NULL);
756
13
  children_.erase(child->mountpoint());
757
13
  pthread_mutex_unlock(lock_);
758
13
}
759
760
761
84
CatalogList Catalog::GetChildren() const {
762
84
  CatalogList result;
763
764
84
  pthread_mutex_lock(lock_);
765
188
  for (NestedCatalogMap::const_iterator i = children_.begin(),
766
84
       iEnd = children_.end(); i != iEnd; ++i)
767
  {
768
20
    result.push_back(i->second);
769
  }
770
84
  pthread_mutex_unlock(lock_);
771
772
  return result;
773
}
774
775
776
/**
777
 * Find the nested catalog that serves the given path.
778
 * It might be possible that the path is in fact served by a child of the found
779
 * nested catalog.
780
 * @param path the path to find a best fitting catalog for
781
 * @return a pointer to the best fitting child or NULL if it does not fit at all
782
 */
783
134
Catalog* Catalog::FindSubtree(const PathString &path) const {
784
  // Check if this catalog fits the beginning of the path.
785
134
  if (!path.StartsWith(mountpoint_))
786
    return NULL;
787
788
134
  PathString remaining(path.Suffix(mountpoint_.GetLength()));
789
134
  remaining.Append("/", 1);
790
791
  // now we recombine the path elements successively
792
  // in order to find a child which serves a part of the path
793
134
  PathString path_prefix(mountpoint_);
794
134
  Catalog *result = NULL;
795
  // Skip the first '/'
796
134
  path_prefix.Append("/", 1);
797
134
  const char *c = remaining.GetChars() + 1;
798
1167
  for (unsigned i = 1; i < remaining.GetLength(); ++i, ++c) {
799
1051
    if (*c == '/') {
800
236
      result = FindChild(path_prefix);
801
802
      // If we found a child serving a part of the path we can stop searching.
803
      // Remaining sub path elements are possbily served by a grand child.
804
236
      if (result != NULL)
805
18
        break;
806
    }
807
1033
    path_prefix.Append(c, 1);
808
  }
809
810
134
  return result;
811
}
812
813
814
/**
815
 * Looks for a child catalog, which is a subset of all registered nested
816
 * catalogs.
817
 */
818
271
Catalog* Catalog::FindChild(const PathString &mountpoint) const {
819
271
  NestedCatalogMap::const_iterator nested_iter;
820
821
271
  pthread_mutex_lock(lock_);
822
271
  nested_iter = children_.find(mountpoint);
823
  Catalog* result =
824
271
    (nested_iter == children_.end()) ? NULL : nested_iter->second;
825
271
  pthread_mutex_unlock(lock_);
826
827
271
  return result;
828
}
829
830
831
/**
832
 * For the transtion points for nested catalogs and bind mountpoints, the inode
833
 * is ambiguous. It has to be set to the parent inode because nested catalogs
834
 * are lazily loaded.
835
 * @param md5path the MD5 hash of the entry to check
836
 * @param dirent the DirectoryEntry to perform coherence fixes on
837
 */
838
198
void Catalog::FixTransitionPoint(const shash::Md5 &md5path,
839
                                 DirectoryEntry *dirent) const
840
{
841
198
  if (!HasParent())
842
159
    return;
843
844
39
  if (dirent->IsNestedCatalogRoot()) {
845
    // Normal nested catalog
846
2
    DirectoryEntry parent_dirent;
847
2
    const bool retval = parent_->LookupMd5Path(md5path, &parent_dirent);
848
2
    assert(retval);
849
2
    dirent->set_inode(parent_dirent.inode());
850
37
  } else if (md5path == kMd5PathEmpty) {
851
    // Bind mountpoint
852
    DirectoryEntry parent_dirent;
853
    const bool retval = parent_->LookupPath(mountpoint_, &parent_dirent);
854
    assert(retval);
855
    dirent->set_inode(parent_dirent.inode());
856
  }
857
}
858
859

45
}  // namespace catalog