GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/catalog_mgr_impl.h Lines: 331 456 72.6 %
Date: 2019-02-03 02:48:13 Branches: 116 197 58.9 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System
3
 */
4
5
#ifndef CVMFS_CATALOG_MGR_IMPL_H_
6
#define CVMFS_CATALOG_MGR_IMPL_H_
7
8
#ifndef __STDC_FORMAT_MACROS
9
#define __STDC_FORMAT_MACROS
10
#endif
11
12
#include "cvmfs_config.h"

#include <inttypes.h>

#include <cassert>
#include <string>
#include <vector>

#include "logging.h"
#include "shortstring.h"
#include "statistics.h"
#include "xattr.h"
22
23
using namespace std;  // NOLINT
24
25
namespace catalog {
26
27
template <class CatalogT>
28
84
AbstractCatalogManager<CatalogT>::AbstractCatalogManager(
29
    perf::Statistics *statistics) :
30
84
  statistics_(statistics) {
31
84
  inode_watermark_status_ = 0;
32
84
  inode_gauge_ = AbstractCatalogManager<CatalogT>::kInodeOffset;
33
84
  revision_cache_ = 0;
34
84
  catalog_watermark_ = 0;
35
84
  volatile_flag_ = false;
36
84
  has_authz_cache_ = false;
37
84
  inode_annotation_ = NULL;
38
84
  incarnation_ = 0;
39
84
  rwlock_ =
40
    reinterpret_cast<pthread_rwlock_t *>(smalloc(sizeof(pthread_rwlock_t)));
41
84
  int retval = pthread_rwlock_init(rwlock_, NULL);
42
84
  assert(retval == 0);
43
84
  retval = pthread_key_create(&pkey_sqlitemem_, NULL);
44
84
  assert(retval == 0);
45
84
}
46
47
template <class CatalogT>
48
84
AbstractCatalogManager<CatalogT>::~AbstractCatalogManager() {
49
84
  DetachAll();
50
84
  pthread_key_delete(pkey_sqlitemem_);
51
84
  pthread_rwlock_destroy(rwlock_);
52

84
  free(rwlock_);
53
168
}
54
55
template <class CatalogT>
56
50
void AbstractCatalogManager<CatalogT>::SetInodeAnnotation(
57
    InodeAnnotation *new_annotation)
58
{
59

50
  assert(catalogs_.empty() || (new_annotation == inode_annotation_));
60
50
  inode_annotation_ = new_annotation;
61
50
}
62
63
template <class CatalogT>
64
55
void AbstractCatalogManager<CatalogT>::SetOwnerMaps(const OwnerMap &uid_map,
65
                                          const OwnerMap &gid_map)
66
{
67
55
  uid_map_ = uid_map;
68
55
  gid_map_ = gid_map;
69
55
}
70
71
template <class CatalogT>
72
1
void AbstractCatalogManager<CatalogT>::SetCatalogWatermark(unsigned limit) {
73
1
  catalog_watermark_ = limit;
74
1
}
75
76
template <class CatalogT>
77
99
void AbstractCatalogManager<CatalogT>::CheckInodeWatermark() {
78
99
  if (inode_watermark_status_ > 0)
79
    return;
80
81
99
  uint64_t highest_inode = inode_gauge_;
82
99
  if (inode_annotation_)
83
36
    highest_inode += inode_annotation_->GetGeneration();
84
99
  uint64_t uint32_border = 1;
85
99
  uint32_border = uint32_border << 32;
86
99
  if (highest_inode >= uint32_border) {
87
    LogCvmfs(kLogCatalog, kLogDebug | kLogSyslogWarn, "inodes exceed 32bit");
88
    inode_watermark_status_++;
89
  }
90
}
91
92
93
/**
94
 * Initializes the CatalogManager and loads and attaches the root entry.
95
 * @return true on successful init, otherwise false
96
 */
97
template <class CatalogT>
98
51
bool AbstractCatalogManager<CatalogT>::Init() {
99
51
  LogCvmfs(kLogCatalog, kLogDebug, "Initialize catalog");
100
51
  WriteLock();
101
51
  bool attached = MountCatalog(PathString("", 0), shash::Any(), NULL);
102
51
  Unlock();
103
104
51
  if (!attached) {
105
11
    LogCvmfs(kLogCatalog, kLogDebug, "failed to initialize root catalog");
106
  }
107
108
51
  return attached;
109
}
110
111
112
/**
113
 * Remounts the root catalog if necessary.  If a newer root catalog exists,
114
 * it is mounted and replaces the currently mounted tree (all existing catalogs
115
 * are detached)
116
 */
117
template <class CatalogT>
118
2
LoadError AbstractCatalogManager<CatalogT>::Remount(const bool dry_run) {
119
2
  LogCvmfs(kLogCatalog, kLogDebug,
120
           "remounting repositories (dry run %d)", dry_run);
121
2
  if (dry_run)
122
1
    return LoadCatalog(PathString("", 0), shash::Any(), NULL, NULL);
123
124
1
  WriteLock();
125
126
1
  string     catalog_path;
127
1
  shash::Any catalog_hash;
128
  const LoadError load_error = LoadCatalog(PathString("", 0),
129
                                           shash::Any(),
130
                                           &catalog_path,
131
1
                                           &catalog_hash);
132
1
  if (load_error == kLoadNew) {
133
1
    inode_t old_inode_gauge = inode_gauge_;
134
1
    DetachAll();
135
1
    inode_gauge_ = AbstractCatalogManager<CatalogT>::kInodeOffset;
136
137
1
    CatalogT *new_root = CreateCatalog(PathString("", 0), catalog_hash, NULL);
138
1
    assert(new_root);
139
1
    bool retval = AttachCatalog(catalog_path, new_root);
140
1
    assert(retval);
141
142
1
    if (inode_annotation_) {
143
      inode_annotation_->IncGeneration(old_inode_gauge);
144
    }
145
  }
146
1
  CheckInodeWatermark();
147
1
  Unlock();
148
149
1
  return load_error;
150
}
151
152
153
/**
154
 * Detaches everything except the root catalog
155
 */
156
template <class CatalogT>
157
void AbstractCatalogManager<CatalogT>::DetachNested() {
158
  WriteLock();
159
  if (catalogs_.empty()) {
160
    Unlock();
161
    return;
162
  }
163
164
  typename CatalogList::const_iterator i;
165
  typename CatalogList::const_iterator iend;
166
  CatalogList catalogs_to_detach = GetRootCatalog()->GetChildren();
167
  for (i = catalogs_to_detach.begin(), iend = catalogs_to_detach.end();
168
       i != iend; ++i)
169
  {
170
    DetachSubtree(*i);
171
  }
172
173
  Unlock();
174
}
175
176
177
/**
178
 * Returns the NULL hash if the nested catalog is not found.
179
 */
180
template <class CatalogT>
181
5
shash::Any AbstractCatalogManager<CatalogT>::GetNestedCatalogHash(
182
  const PathString &mountpoint)
183
{
184
5
  assert(!mountpoint.IsEmpty());
185
5
  CatalogT *catalog = FindCatalog(mountpoint);
186
5
  assert(catalog != NULL);
187
5
  if (catalog->mountpoint() == mountpoint) {
188
2
    catalog = catalog->parent();
189
2
    assert(catalog != NULL);
190
  }
191
5
  shash::Any result;
192
  uint64_t size;
193
5
  catalog->FindNested(mountpoint, &result, &size);
194
5
  return result;
195
}
196
197
198
/**
199
 * Perform a lookup for a specific DirectoryEntry in the catalogs.
200
 * @param path      the path to find in the catalogs
201
 * @param options   whether to perform another lookup to get the parent entry,
202
 *                  too
203
 * @param dirent    the resulting DirectoryEntry, or special Negative entry
204
 *                  Note: can be set to zero if the result is not important
205
 * @return true if lookup succeeded otherwise false
206
 */
207
template <class CatalogT>
208
32
bool AbstractCatalogManager<CatalogT>::LookupPath(const PathString &path,
209
                                        const LookupOptions options,
210
                                        DirectoryEntry *dirent)
211
{
212
  // initialize as non-negative
213
32
  assert(dirent);
214
32
  *dirent = DirectoryEntry();
215
216
  // create a dummy negative directory entry
217
  const DirectoryEntry dirent_negative =
218
32
    DirectoryEntry(catalog::kDirentNegative);
219
220
32
  EnforceSqliteMemLimit();
221
32
  ReadLock();
222
223
32
  CatalogT *best_fit = FindCatalog(path);
224
32
  assert(best_fit != NULL);
225
226
32
  perf::Inc(statistics_.n_lookup_path);
227
32
  LogCvmfs(kLogCatalog, kLogDebug, "looking up '%s' in catalog: '%s'",
228
           path.c_str(), best_fit->mountpoint().c_str());
229
32
  bool found = best_fit->LookupPath(path, dirent);
230
231
  // Possibly in a nested catalog
232

32
  if (!found && MountSubtree(path, best_fit, NULL)) {
233
8
    LogCvmfs(kLogCatalog, kLogDebug, "looking up '%s' in a nested catalog",
234
             path.c_str());
235
8
    Unlock();
236
8
    WriteLock();
237
    // Check again to avoid race
238
8
    best_fit = FindCatalog(path);
239
8
    assert(best_fit != NULL);
240
8
    perf::Inc(statistics_.n_lookup_path);
241
8
    found = best_fit->LookupPath(path, dirent);
242
243
8
    if (!found) {
244
8
      LogCvmfs(kLogCatalog, kLogDebug,
245
               "entry not found, we may have to load nested catalogs");
246
247
      CatalogT *nested_catalog;
248
8
      found = MountSubtree(path, best_fit, &nested_catalog);
249
250
8
      if (!found) {
251
        LogCvmfs(kLogCatalog, kLogDebug,
252
                 "failed to load nested catalog for '%s'", path.c_str());
253
        goto lookup_path_notfound;
254
      }
255
256
8
      if (nested_catalog != best_fit) {
257
8
        perf::Inc(statistics_.n_lookup_path);
258
8
        found = nested_catalog->LookupPath(path, dirent);
259
8
        if (!found) {
260
          LogCvmfs(kLogCatalog, kLogDebug,
261
                   "nested catalogs loaded but entry '%s' was still not found",
262
                   path.c_str());
263
          if (dirent != NULL) *dirent = dirent_negative;
264
          goto lookup_path_notfound;
265
        } else {
266
8
          best_fit = nested_catalog;
267
        }
268
      } else {
269
        LogCvmfs(kLogCatalog, kLogDebug, "no nested catalog fits");
270
        if (dirent != NULL) *dirent = dirent_negative;
271
        goto lookup_path_notfound;
272
      }
273
    }
274
8
    assert(found);
275
  }
276
  // Not in a nested catalog (because no nested cataog fits), ENOENT
277
32
  if (!found) {
278
3
    LogCvmfs(kLogCatalog, kLogDebug, "ENOENT: '%s'", path.c_str());
279
3
    if (dirent != NULL) *dirent = dirent_negative;
280
3
    goto lookup_path_notfound;
281
  }
282
283
29
  LogCvmfs(kLogCatalog, kLogDebug, "found entry '%s' in catalog '%s'",
284
           path.c_str(), best_fit->mountpoint().c_str());
285
286
29
  if ((options & kLookupRawSymlink) == kLookupRawSymlink) {
287
    LinkString raw_symlink;
288
    bool retval = best_fit->LookupRawSymlink(path, &raw_symlink);
289
    assert(retval);  // Must be true, we have just found the entry
290
    dirent->set_symlink(raw_symlink);
291
  }
292
293
29
  Unlock();
294
29
  return true;
295
296
 lookup_path_notfound:
297
3
  Unlock();
298
  // Includes both: ENOENT and not found due to I/O error
299
3
  perf::Inc(statistics_.n_lookup_path_negative);
300
3
  return false;
301
}
302
303
304
/**
305
 * Perform a lookup for Nested Catalog that serves this path.
306
 *  If the path specified is a catalog mountpoint the catalog at that point is
307
 *  mounted and returned.
308
 * @param path       the path to find in the catalogs
309
 * @param mountpoint the path to the nested catalog found
310
 * @param hash       the hash of the nested catalog found
311
 * @param size       the size of the nested catalog, 0 for root. Root is not a
312
 *                   nested catalog in the database.
313
 * @return true if lookup succeeded otherwise false (available catalog failed
314
 *         to mount)
315
 */
316
template <class CatalogT>
317
3
bool AbstractCatalogManager<CatalogT>::LookupNested(
318
  const PathString &path,
319
  PathString *mountpoint,
320
  shash::Any *hash,
321
  uint64_t *size
322
) {
323
3
  EnforceSqliteMemLimit();
324
3
  bool result = false;
325
3
  ReadLock();
326
327
  // Look past current path to mount up to intended location
328
3
  PathString catalog_path(path);
329
3
  catalog_path.Append("/.cvmfscatalog", 14);
330
331
  // Find catalog, possibly load nested
332
3
  CatalogT *best_fit = FindCatalog(catalog_path);
333
3
  CatalogT *catalog = best_fit;
334
3
  if (MountSubtree(catalog_path, best_fit, NULL)) {
335
1
    Unlock();
336
1
    WriteLock();
337
    // Check again to avoid race
338
1
    best_fit = FindCatalog(catalog_path);
339
1
    result = MountSubtree(catalog_path, best_fit, &catalog);
340
    // Result is false if an available catalog failed to load (error happened)
341
1
    if (!result) {
342
      Unlock();
343
      return false;
344
    }
345
  }
346
347
  // If the found catalog is the Root there is no parent to lookup
348
3
  if (catalog->HasParent()) {
349
2
    result = catalog->parent()->FindNested(catalog->root_prefix(), hash, size);
350
  }
351
352
  // Mountpoint now points to the found catalog
353
3
  mountpoint->Assign(catalog->root_prefix());
354
355
  // If the result is false, it means that no nested catalog was found for
356
  // this path. As the root catalog does not have a Nested Catalog of
357
  // itself, we manually set the values and leave the size as 0.
358
  // TODO(nhazekam) Allow for Root Catalog to be returned
359
3
  if (!result) {
360
1
    *hash =  GetRootCatalog()->hash();
361
1
    *size = 0;
362
1
    result = true;
363
  }
364
365
3
  Unlock();
366
3
  return result;
367
}
368
369
370
/**
371
 * Create a listing of the parents, catalog, and children of the catalog
372
 *  that serves the specified path.
373
 *  If the path specified is a catalog mountpoint the catalog at that point is
374
 *  mounted and returned.
375
 * @param path        the path to find in the catalogs
376
 * @param result_list the list where the results will be added.
377
 * @return true if the list could be created, false if catalog fails to mount.
378
 */
379
template <class CatalogT>
380
2
bool AbstractCatalogManager<CatalogT>::ListCatalogSkein(
381
  const PathString &path,
382
  std::vector<PathString> *result_list
383
) {
384
2
  EnforceSqliteMemLimit();
385
  bool result;
386
2
  ReadLock();
387
388
  // Look past current path to mount up to intended location
389
2
  PathString test(path);
390
2
  test.Append("/.cvmfscatalog", 14);
391
392
  // Find catalog, possibly load nested
393
2
  CatalogT *best_fit = FindCatalog(test);
394
2
  CatalogT *catalog = best_fit;
395
  // True if there is an available nested catalog
396
2
  if (MountSubtree(test, best_fit, NULL)) {
397
    Unlock();
398
    WriteLock();
399
    // Check again to avoid race
400
    best_fit = FindCatalog(test);
401
    result = MountSubtree(test, best_fit, &catalog);
402
    // result is false if an available catalog failed to load
403
    if (!result) {
404
      Unlock();
405
      return false;
406
    }
407
  }
408
409
  // Build listing
410
2
  CatalogT *cur_parent = catalog->parent();
411
2
  if (cur_parent) {
412
    // Walk up parent tree to find base
413
2
    std::vector<catalog::Catalog*> parents;
414
5
    while (cur_parent->HasParent()) {
415
1
      parents.push_back(cur_parent);
416
1
      cur_parent = cur_parent->parent();
417
    }
418
2
    parents.push_back(cur_parent);
419
7
    while (!parents.empty()) {
420
      // Add to list in order starting at root
421
3
      result_list->push_back(parents.back()->root_prefix());
422
3
      parents.pop_back();
423
    }
424
  }
425
  // Add the current catalog
426
2
  result_list->push_back(catalog->root_prefix());
427
428
2
  Catalog::NestedCatalogList children = catalog->ListOwnNestedCatalogs();
429
430
  // Add all children nested catalogs
431
6
  for (unsigned i = 0; i < children.size(); i++) {
432
4
    result_list->push_back(children.at(i).mountpoint);
433
  }
434
435
2
  Unlock();
436
2
  return true;
437
}
438
439
440
template <class CatalogT>
441
bool AbstractCatalogManager<CatalogT>::LookupXattrs(
442
  const PathString &path,
443
  XattrList *xattrs)
444
{
445
  EnforceSqliteMemLimit();
446
  bool result;
447
  ReadLock();
448
449
  // Find catalog, possibly load nested
450
  CatalogT *best_fit = FindCatalog(path);
451
  CatalogT *catalog = best_fit;
452
  if (MountSubtree(path, best_fit, NULL)) {
453
    Unlock();
454
    WriteLock();
455
    // Check again to avoid race
456
    best_fit = FindCatalog(path);
457
    result = MountSubtree(path, best_fit, &catalog);
458
    if (!result) {
459
      Unlock();
460
      return false;
461
    }
462
  }
463
464
  perf::Inc(statistics_.n_lookup_xattrs);
465
  result = catalog->LookupXattrsPath(path, xattrs);
466
467
  Unlock();
468
  return result;
469
}
470
471
472
/**
473
 * Do a listing of the specified directory.
474
 * @param path the path of the directory to list
475
 * @param listing the resulting DirectoryEntryList
476
 * @return true if listing succeeded otherwise false
477
 */
478
template <class CatalogT>
479
25
bool AbstractCatalogManager<CatalogT>::Listing(const PathString &path,
480
                                     DirectoryEntryList *listing)
481
{
482
25
  EnforceSqliteMemLimit();
483
  bool result;
484
25
  ReadLock();
485
486
  // Find catalog, possibly load nested
487
25
  CatalogT *best_fit = FindCatalog(path);
488
25
  CatalogT *catalog = best_fit;
489
25
  if (MountSubtree(path, best_fit, NULL)) {
490
9
    Unlock();
491
9
    WriteLock();
492
    // Check again to avoid race
493
9
    best_fit = FindCatalog(path);
494
9
    result = MountSubtree(path, best_fit, &catalog);
495
9
    if (!result) {
496
      Unlock();
497
      return false;
498
    }
499
  }
500
501
25
  perf::Inc(statistics_.n_listing);
502
25
  result = catalog->ListingPath(path, listing);
503
504
25
  Unlock();
505
25
  return result;
506
}
507
508
509
/**
510
 * Do a listing of the specified directory, return only struct stat values.
511
 * @param path the path of the directory to list
512
 * @param listing the resulting StatEntryList
513
 * @return true if listing succeeded otherwise false
514
 */
515
template <class CatalogT>
516
1
bool AbstractCatalogManager<CatalogT>::ListingStat(const PathString &path,
517
                                        StatEntryList *listing)
518
{
519
1
  EnforceSqliteMemLimit();
520
  bool result;
521
1
  ReadLock();
522
523
  // Find catalog, possibly load nested
524
1
  CatalogT *best_fit = FindCatalog(path);
525
1
  CatalogT *catalog = best_fit;
526
1
  if (MountSubtree(path, best_fit, NULL)) {
527
    Unlock();
528
    WriteLock();
529
    // Check again to avoid race
530
    best_fit = FindCatalog(path);
531
    result = MountSubtree(path, best_fit, &catalog);
532
    if (!result) {
533
      Unlock();
534
      return false;
535
    }
536
  }
537
538
1
  perf::Inc(statistics_.n_listing);
539
1
  result = catalog->ListingPathStat(path, listing);
540
541
1
  Unlock();
542
1
  return result;
543
}
544
545
546
/**
547
 * Collect file chunks (if exist)
548
 * @param path the path of the directory to list
549
 * @param interpret_hashes_as hash of the directory entry (by convention the
550
 *        same than the chunk hashes)
551
 * @return true if listing succeeded otherwise false
552
 */
553
template <class CatalogT>
554
bool AbstractCatalogManager<CatalogT>::ListFileChunks(
555
  const PathString &path,
556
  const shash::Algorithms interpret_hashes_as,
557
  FileChunkList *chunks)
558
{
559
  EnforceSqliteMemLimit();
560
  bool result;
561
  ReadLock();
562
563
  // Find catalog, possibly load nested
564
  CatalogT *best_fit = FindCatalog(path);
565
  CatalogT *catalog = best_fit;
566
  if (MountSubtree(path, best_fit, NULL)) {
567
    Unlock();
568
    WriteLock();
569
    // Check again to avoid race
570
    best_fit = FindCatalog(path);
571
    result = MountSubtree(path, best_fit, &catalog);
572
    if (!result) {
573
      Unlock();
574
      return false;
575
    }
576
  }
577
578
  result = catalog->ListPathChunks(path, interpret_hashes_as, chunks);
579
580
  Unlock();
581
  return result;
582
}
583
584
585
template <class CatalogT>
586
41
uint64_t AbstractCatalogManager<CatalogT>::GetRevision() const {
587
41
  ReadLock();
588
41
  const uint64_t revision = revision_cache_;
589
41
  Unlock();
590
41
  return revision;
591
}
592
593
594
template <class CatalogT>
595
37
bool AbstractCatalogManager<CatalogT>::GetVOMSAuthz(std::string *authz) const {
596
37
  ReadLock();
597
37
  const bool has_authz = has_authz_cache_;
598

37
  if (has_authz && authz)
599
    *authz = authz_cache_;
600
37
  Unlock();
601
37
  return has_authz;
602
}
603
604
605
template <class CatalogT>
606
bool AbstractCatalogManager<CatalogT>::HasExplicitTTL() const {
607
  ReadLock();
608
  const bool result = GetRootCatalog()->HasExplicitTTL();
609
  Unlock();
610
  return result;
611
}
612
613
614
template <class CatalogT>
615
1
uint64_t AbstractCatalogManager<CatalogT>::GetTTL() const {
616
1
  ReadLock();
617
1
  const uint64_t ttl = GetRootCatalog()->GetTTL();
618
1
  Unlock();
619
1
  return ttl;
620
}
621
622
623
template <class CatalogT>
624
32
int AbstractCatalogManager<CatalogT>::GetNumCatalogs() const {
625
32
  ReadLock();
626
32
  int result = catalogs_.size();
627
32
  Unlock();
628
32
  return result;
629
}
630
631
632
/**
633
 * Gets a formatted tree of the currently attached catalogs
634
 */
635
template <class CatalogT>
636
string AbstractCatalogManager<CatalogT>::PrintHierarchy() const {
637
  ReadLock();
638
  const string output = PrintHierarchyRecursively(GetRootCatalog(), 0);
639
  Unlock();
640
  return output;
641
}
642
643
644
/**
645
 * Assigns the next free numbers in the 64 bit space
646
 */
647
template <class CatalogT>
648
98
InodeRange AbstractCatalogManager<CatalogT>::AcquireInodes(uint64_t size) {
649
98
  InodeRange result;
650
98
  result.offset = inode_gauge_;
651
98
  result.size = size;
652
653
98
  inode_gauge_ += size;
654
98
  LogCvmfs(kLogCatalog, kLogDebug, "allocating inodes from %d to %d.",
655
           result.offset + 1, inode_gauge_);
656
657
98
  return result;
658
}
659
660
661
/**
662
 * Called if a catalog is detached which renders the associated InodeChunk
663
 * invalid.
664
 * @param chunk the InodeChunk to be freed
665
 */
666
template <class CatalogT>
667
98
void AbstractCatalogManager<CatalogT>::ReleaseInodes(const InodeRange chunk) {
668
  // TODO(jblomer) currently inodes are only released on remount
669
98
}
670
671
672
/**
673
 * Find the catalog leaf in the tree that fits the path.
674
 * The path might be served by a not yet loaded nested catalog.
675
 * @param path the path a catalog is searched for
676
 * @return the catalog which is best fitting at the given path
677
 */
678
template <class CatalogT>
679
197
CatalogT* AbstractCatalogManager<CatalogT>::FindCatalog(
680
    const PathString &path) const {
681
197
  assert(catalogs_.size() > 0);
682
683
  // Start at the root catalog and successively go down the catalog tree
684
197
  CatalogT *best_fit = GetRootCatalog();
685
197
  CatalogT *next_fit = NULL;
686
417
  while (best_fit->mountpoint() != path) {
687
195
    next_fit = best_fit->FindSubtree(path);
688
195
    if (next_fit == NULL)
689
172
      break;
690
23
    best_fit = next_fit;
691
  }
692
693
197
  return best_fit;
694
}
695
696
697
/**
698
 * Checks if a searched catalog is already mounted to this CatalogManager
699
 * @param root_path the root path of the searched catalog
700
 * @param attached_catalog is set to the searched catalog, if not NULL
701
 * @return true if catalog is already present, false otherwise
702
 */
703
template <class CatalogT>
704
105
bool AbstractCatalogManager<CatalogT>::IsAttached(const PathString &root_path,
705
                                        CatalogT **attached_catalog) const
706
{
707
105
  if (catalogs_.size() == 0)
708
78
    return false;
709
710
27
  CatalogT *best_fit = FindCatalog(root_path);
711
27
  if (best_fit->mountpoint() != root_path)
712
23
    return false;
713
714
4
  if (attached_catalog != NULL) *attached_catalog = best_fit;
715
4
  return true;
716
}
717
718
719
/**
720
 * Recursively mounts all nested catalogs required to serve a path.
721
 * If leaf_catalog is NULL, just indicate if it is necessary to load a
722
 * nested catalog for the given path.
723
 * The final leaf nested catalog is returned.
724
 */
725
template <class CatalogT>
726
171
bool AbstractCatalogManager<CatalogT>::MountSubtree(const PathString &path,
727
                                          const CatalogT *entry_point,
728
                                          CatalogT **leaf_catalog)
729
{
730
171
  bool result = true;
731
  CatalogT *parent = (entry_point == NULL) ?
732
171
                    GetRootCatalog() : const_cast<CatalogT *>(entry_point);
733
171
  assert(path.StartsWith(parent->mountpoint()));
734
735
  // Try to find path as a super string of nested catalog mount points
736
171
  PathString path_slash(path);
737
171
  path_slash.Append("/", 1);
738
171
  perf::Inc(statistics_.n_nested_listing);
739
  typedef typename CatalogT::NestedCatalogList NestedCatalogList;
740
  const NestedCatalogList& nested_catalogs =
741
171
    parent->ListNestedCatalogs();
742

342
  for (typename NestedCatalogList::const_iterator i = nested_catalogs.begin(),
743
171
       iEnd = nested_catalogs.end(); i != iEnd; ++i)
744
  {
745
    // Next nesting level
746
78
    PathString nested_path_slash(i->mountpoint);
747
78
    nested_path_slash.Append("/", 1);
748
78
    if (path_slash.StartsWith(nested_path_slash)) {
749
45
      if (leaf_catalog == NULL)
750
18
        return true;
751
      CatalogT *new_nested;
752
27
      LogCvmfs(kLogCatalog, kLogDebug, "load nested catalog at %s",
753
               i->mountpoint.c_str());
754
      // prevent endless recursion with corrupted catalogs
755
      // (due to reloading root)
756
27
      if (i->hash.IsNull())
757
        return false;
758
27
      new_nested = MountCatalog(i->mountpoint, i->hash, parent);
759
27
      if (!new_nested)
760
        return false;
761
762
27
      result = MountSubtree(path, new_nested, &parent);
763
      break;
764
    }
765
  }
766
767
153
  if (leaf_catalog == NULL)
768
24
    return false;
769
129
  *leaf_catalog = parent;
770
129
  return result;
771
}
772
773
774
/**
775
 * Load a catalog file and attach it to the tree of Catalog objects.
776
 * Loading of catalogs is implemented by derived classes.
777
 */
778
template <class CatalogT>
779
105
CatalogT *AbstractCatalogManager<CatalogT>::MountCatalog(
780
                                              const PathString &mountpoint,
781
                                              const shash::Any &hash,
782
                                              CatalogT *parent_catalog)
783
{
784
105
  CatalogT *attached_catalog = NULL;
785
105
  if (IsAttached(mountpoint, &attached_catalog))
786
4
    return attached_catalog;
787
788
101
  string     catalog_path;
789
101
  shash::Any catalog_hash;
790
  const LoadError retval =
791
101
    LoadCatalog(mountpoint, hash, &catalog_path, &catalog_hash);
792

101
  if ((retval == kLoadFail) || (retval == kLoadNoSpace)) {
793
11
    LogCvmfs(kLogCatalog, kLogDebug, "failed to load catalog '%s' (%d - %s)",
794
             mountpoint.c_str(), retval, Code2Ascii(retval));
795
11
    return NULL;
796
  }
797
798
90
  attached_catalog = CreateCatalog(mountpoint, catalog_hash, parent_catalog);
799
800
  // Attach loaded catalog
801
90
  if (!AttachCatalog(catalog_path, attached_catalog)) {
802
    LogCvmfs(kLogCatalog, kLogDebug, "failed to attach catalog '%s'",
803
             mountpoint.c_str());
804
    UnloadCatalog(attached_catalog);
805
    return NULL;
806
  }
807
808

90
  if ((catalog_watermark_ > 0) && (catalogs_.size() >= catalog_watermark_)) {
809
3
    DetachSiblings(mountpoint);
810
  }
811
812
90
  return attached_catalog;
813
}
814
815
816
/**
817
 * Attaches a newly created catalog.
818
 * @param db_path the file on a local file system containing the database
819
 * @param new_catalog the catalog to attach to this CatalogManager
820
 * @return true on success, false otherwise
821
 */
822
template <class CatalogT>
823
98
bool AbstractCatalogManager<CatalogT>::AttachCatalog(const string &db_path,
824
                                           CatalogT *new_catalog)
825
{
826
98
  LogCvmfs(kLogCatalog, kLogDebug, "attaching catalog file %s",
827
           db_path.c_str());
828
829
  // Initialize the new catalog
830
98
  if (!new_catalog->OpenDatabase(db_path)) {
831
    LogCvmfs(kLogCatalog, kLogDebug, "initialization of catalog %s failed",
832
             db_path.c_str());
833
    return false;
834
  }
835
836
  // Determine the inode offset of this catalog
837
98
  uint64_t inode_chunk_size = new_catalog->max_row_id();
838
98
  InodeRange range = AcquireInodes(inode_chunk_size);
839
98
  new_catalog->set_inode_range(range);
840
98
  new_catalog->SetInodeAnnotation(inode_annotation_);
841
98
  new_catalog->SetOwnerMaps(&uid_map_, &gid_map_);
842
843
  // Add catalog to the manager
844
98
  if (!new_catalog->IsInitialized()) {
845
    LogCvmfs(kLogCatalog, kLogDebug,
846
             "catalog initialization failed (obscure data)");
847
    inode_gauge_ -= inode_chunk_size;
848
    return false;
849
  }
850
98
  CheckInodeWatermark();
851
852
  // The revision of the catalog tree is given by the root catalog revision
853
98
  if (catalogs_.empty()) {
854
68
    revision_cache_ = new_catalog->GetRevision();
855
68
    has_authz_cache_ = new_catalog->GetVOMSAuthz(&authz_cache_);
856
68
    volatile_flag_ = new_catalog->volatile_flag();
857
  }
858
859
98
  catalogs_.push_back(new_catalog);
860
98
  ActivateCatalog(new_catalog);
861
98
  return true;
862
}
863
864
865
/**
866
 * Removes a catalog from this CatalogManager, the catalog pointer is
867
 * freed if the call succeeds.
868
 * This method can create dangling children if a catalog in the middle of
869
 * a tree is removed.
870
 * @param catalog the catalog to detach
871
 * @return true on success, false otherwise
872
 */
873
template <class CatalogT>
874
98
void AbstractCatalogManager<CatalogT>::DetachCatalog(CatalogT *catalog) {
875
98
  if (catalog->HasParent())
876
30
    catalog->parent()->RemoveChild(catalog);
877
878
98
  ReleaseInodes(catalog->inode_range());
879
98
  UnloadCatalog(catalog);
880
881
  // Delete catalog from internal lists
882
98
  typename CatalogList::iterator i;
883
98
  typename CatalogList::const_iterator iend;
884
146
  for (i = catalogs_.begin(), iend = catalogs_.end(); i != iend; ++i) {
885
146
    if (*i == catalog) {
886
98
      catalogs_.erase(i);
887
98
      delete catalog;
888
98
      return;
889
    }
890
  }
891
892
  assert(false);
893
}
894
895
896
/**
897
 * Removes a catalog (and all of it's children) from this CatalogManager.
898
 * The given catalog and all children are freed, if this call succeeds.
899
 * @param catalog the catalog to detach
900
 * @return true on success, false otherwise
901
 */
902
template <class CatalogT>
903
98
void AbstractCatalogManager<CatalogT>::DetachSubtree(CatalogT *catalog) {
904
  // Detach all child catalogs recursively
905
98
  typename CatalogList::const_iterator i;
906
98
  typename CatalogList::const_iterator iend;
907
98
  CatalogList catalogs_to_detach = catalog->GetChildren();
908
127
  for (i = catalogs_to_detach.begin(), iend = catalogs_to_detach.end();
909
       i != iend; ++i)
910
  {
911
29
    DetachSubtree(*i);
912
  }
913
914
98
  DetachCatalog(catalog);
915
98
}
916
917
918
/**
919
 * Detaches all nested catalogs that are not on a prefix of the given tree.
920
 * Used when the catalog_watermark_ is surpassed.
921
 */
922
template <class CatalogT>
923
4
void AbstractCatalogManager<CatalogT>::DetachSiblings(
924
  const PathString &current_tree)
925
{
926
  bool again;
927
4
  do {
928
4
    again = false;
929
4
    unsigned N = catalogs_.size();
930
12
    for (unsigned i = 0; i < N; ++i) {
931

9
      if (!HasPrefix(current_tree.ToString(),
932
                     catalogs_[i]->mountpoint().ToString(),
933
                     false /* ignore_case */))
934
      {
935
1
        DetachSubtree(catalogs_[i]);
936
1
        again = true;
937
1
        break;
938
      }
939
    }
940
  } while (again);
941
3
}
942
943
944
/**
945
 * Formats the catalog hierarchy
946
 */
947
template <class CatalogT>
948
string AbstractCatalogManager<CatalogT>::PrintHierarchyRecursively(
949
                                                      const CatalogT *catalog,
950
                                                      const int level) const
951
{
952
  string output;
953
954
  // Indent according to level
955
  for (int i = 0; i < level; ++i)
956
    output += "    ";
957
958
  output += "-> " + string(catalog->mountpoint().GetChars(),
959
                           catalog->mountpoint().GetLength())
960
            + "\n";
961
962
  CatalogList children = catalog->GetChildren();
963
  typename CatalogList::const_iterator i = children.begin();
964
  typename CatalogList::const_iterator iend = children.end();
965
  for (; i != iend; ++i) {
966
    output += PrintHierarchyRecursively(*i, level + 1);
967
  }
968
969
  return output;
970
}
971
972
973
template <class CatalogT>
974
std::string AbstractCatalogManager<CatalogT>::PrintMemStatsRecursively(
975
  const CatalogT *catalog) const
976
{
977
  string result = catalog->PrintMemStatistics() + "\n";
978
979
  CatalogList children = catalog->GetChildren();
980
  typename CatalogList::const_iterator i = children.begin();
981
  typename CatalogList::const_iterator iend = children.end();
982
  for (; i != iend; ++i) {
983
    result += PrintMemStatsRecursively(*i);
984
  }
985
  return result;
986
}
987
988
989
/**
990
 * Statistics from all catalogs
991
 */
992
template <class CatalogT>
993
std::string AbstractCatalogManager<CatalogT>::PrintAllMemStatistics() const {
994
  string result;
995
  ReadLock();
996
  result = PrintMemStatsRecursively(GetRootCatalog());
997
  Unlock();
998
  return result;
999
}
1000
1001
1002
template <class CatalogT>
1003
55
void AbstractCatalogManager<CatalogT>::EnforceSqliteMemLimit() {
1004
  char *mem_enforced =
1005
55
    static_cast<char *>(pthread_getspecific(pkey_sqlitemem_));
1006
55
  if (mem_enforced == NULL) {
1007
14
    sqlite3_soft_heap_limit(kSqliteMemPerThread);
1008
14
    pthread_setspecific(pkey_sqlitemem_, reinterpret_cast<char *>(1));
1009
  }
1010
55
}
1011
1012
}  // namespace catalog
1013
1014
1015
1016
1017
#endif  // CVMFS_CATALOG_MGR_IMPL_H_