GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/catalog_traversal.h Lines: 195 210 92.9 %
Date: 2019-02-03 02:48:13 Branches: 105 318 33.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_CATALOG_TRAVERSAL_H_
6
#define CVMFS_CATALOG_TRAVERSAL_H_
7
8
#include <cassert>
9
#include <limits>
10
#include <set>
11
#include <stack>
12
#include <string>
13
#include <vector>
14
15
#include "catalog.h"
16
#include "compression.h"
17
#include "history_sqlite.h"
18
#include "logging.h"
19
#include "manifest.h"
20
#include "object_fetcher.h"
21
#include "signature.h"
22
#include "util_concurrency.h"
23
24
namespace catalog {
25
class Catalog;
26
class WritableCatalog;
27
}
28
29
namespace swissknife {
30
31
/**
32
 * Callback data which has to be implemented by the registered callback
33
 * functions/methods (see Observable<> for further details)
34
 * @param catalog             the catalog object which needs to be processed
35
 * @param catalog_hash        the content hash of the catalog
36
 * @param tree_level          the depth in the nested catalog tree
37
 *                            (starting at zero)
38
 * @param file_size           the size of the downloaded catalog database file
39
 * @param history_depth       the distance from the current HEAD revision
40
 *                            (current HEAD has history_depth 0)
41
 */
42
template <class CatalogT>
43
struct CatalogTraversalData {
44
6191
  CatalogTraversalData(const CatalogT     *catalog,
45
                       const shash::Any   &catalog_hash,
46
                       const unsigned      tree_level,
47
                       const size_t        file_size,
48
                       const unsigned int  history_depth)
49
  : catalog(catalog)
50
  , catalog_hash(catalog_hash)
51
  , tree_level(tree_level)
52
  , file_size(file_size)
53
6191
  , history_depth(history_depth) {}
54
55
  const CatalogT     *catalog;
56
  const shash::Any    catalog_hash;
57
  const unsigned int  tree_level;
58
  const size_t        file_size;
59
  const unsigned int  history_depth;
60
};
61
62
63
/**
64
 * A layer to extract information from a catalog.  Users of the catalog
65
 * traversal can provide derived classes to overwrite behavior.  Currently used
66
 * to get the last-modified timestamp in configurable manner: for the garbage
67
 * collection, the timestamp of the catalog hash in the reflog counts, which
68
 * is the same or newer than the one stored in the catalog.
69
 */
70
template <class CatalogT>
71
176
class CatalogTraversalInfoShim {
72
 public:
73
176
  virtual ~CatalogTraversalInfoShim() { }
74
75
  // Default implementation: use the catalog's timestamp
76
450
  virtual uint64_t GetLastModified(const CatalogT *catalog) {
77
450
    return catalog->GetLastModified();
78
  }
79
};
80
81
82
/**
83
 * This class traverses the catalog hierarchy of a CVMFS repository recursively.
84
 * Also historic catalog trees can be traversed. The user needs to specify a
85
 * callback which is called for each catalog on the way.
86
 *
87
 * CatalogTraversal<> can be configured and used in various ways:
88
 *   -> Historic catalog traversal
89
 *   -> Prune catalogs below a certain history level
90
 *   -> Prune catalogs older than a certain threshold timestamp
91
 *   -> Never traverse a certain catalog twice
92
 *   -> Breadth First Traversal or Depth First Traversal
93
 *   -> Optional catalog memory management (no_close)
94
 *   -> Use all Named Snapshots of a repository as traversal entry point
95
 *   -> Traverse starting from a provided catalog
96
 *   -> Traverse catalogs that were previously skipped
97
 *   -> Produce various flavours of catalogs (writable, mocked, ...)
98
 *
99
 * Breadth First Traversal Strategy
100
 *   Catalogs are handed out to the user identical as they are traversed.
101
 *   Say: From top to buttom. When you would simply print each received catalog
102
 *        the result would be a nice representation of the catalog tree.
103
 *   This method is more efficient, because catalogs are opened, processed and
104
 *   thrown away directly afterwards.
105
 *
106
 * Depth First Traversal Strategy
107
 *   The user gets the catalog tree starting from the leaf nodes.
108
 *   Say: From bottom to top. A user can assume that he got all children or
109
 *        historical ancestors of a catalog before.
110
 *   This method climbs down the full catalog tree and hands it out 'in reverse
111
 *   order'. Thus, catalogs on the way are opened, checked for their descendants
112
 *   and closed. Once all children and historical ancestors are processed, it is
113
 *   re-opened and handed out to the user.
114
 *   Note: This method needs more disk space to temporarily store downloaded but
115
 *         not yet processed catalogs.
116
 *
117
 * Note: Since all CVMFS catalog files together can grow to several gigabytes in
118
 *       file size, each catalog is loaded, processed and removed immediately
119
 *       afterwards. Except if no_close is specified, which allows the user to
120
 *       choose when a catalog should be closed. Keep in mind, that a user is
121
 *       responsible for both deletion of the delivered catalog objects as well
122
 *       as unlinking of the catalog database file.
123
 *
124
 *
125
 * @param ObjectFetcherT  Strategy Pattern implementation that defines how to
126
 *                        retrieve catalogs from various backend storage types.
127
 *                        Furthermore the ObjectFetcherT::CatalogTN is the type
128
 *                        of catalog to be instantiated by CatalogTraversal<>.
129
 *
130
 * CAUTION: the CatalogTN* pointer passed into the callback becomes invalid
131
 *          directly after the callback method returns, unless you create the
132
 *          CatalogTraversal object with no_close = true.
133
 */
134
template<class ObjectFetcherT>
135
class CatalogTraversal
136
  : public Observable<CatalogTraversalData<typename ObjectFetcherT::CatalogTN> >
137

154
{
138
 public:
139
  typedef ObjectFetcherT                      ObjectFetcherTN;
140
  typedef typename ObjectFetcherT::CatalogTN  CatalogTN;
141
  typedef typename ObjectFetcherT::HistoryTN  HistoryTN;
142
  typedef CatalogTraversalData<CatalogTN>     CallbackDataTN;
143
144
  /**
145
   * @param repo_url             the path to the repository to be traversed:
146
   *                             -> either absolute path to the local catalogs
147
   *                             -> or an URL to a remote repository
148
   * @param repo_name            fully qualified repository name (used for remote
149
   *                             repository signature check) (optional)
150
   * @param repo_keys            a comma separated list of public key file
151
   *                             locations to verify the repository manifest file
152
   * @param history              depth of the desired catalog history traversal
153
   *                             (default: 0 - only HEAD catalogs are traversed)
154
   * @param timestamp            timestamp of history traversal threshold
155
   *                             (default: 0 - no threshold, traverse everything)
156
   * @param no_repeat_history    keep track of visited catalogs and don't re-visit
157
   *                             them in previous revisions
158
   * @param no_close             do not close catalogs after they were attached
159
   *                             (catalogs retain their parent/child pointers)
160
   * @param ignore_load_failure  suppressed an error message if a catalog file
161
   *                             could not be loaded (i.e. was sweeped before by
162
   *                             a garbage collection run)
163
   * @param quiet                silence messages that would go to stderr
164
   * @param tmp_dir              path to the temporary directory to be used
165
   *                             (default: /tmp)
166
   */
167
  struct Parameters {
168
154
    Parameters()
169
      : object_fetcher(NULL)
170
      , history(kNoHistory)
171
      , timestamp(kNoTimestampThreshold)
172
      , no_repeat_history(false)
173
      , no_close(false)
174
      , ignore_load_failure(false)
175
154
      , quiet(false) {}
176
177
    static const unsigned int kFullHistory;
178
    static const unsigned int kNoHistory;
179
    static const time_t       kNoTimestampThreshold;
180
181
    ObjectFetcherT *object_fetcher;
182
183
    unsigned int    history;
184
    time_t          timestamp;
185
    bool            no_repeat_history;
186
    bool            no_close;
187
    bool            ignore_load_failure;
188
    bool            quiet;
189
  };
190
191
 public:
192
  enum TraversalType {
193
    kBreadthFirstTraversal,
194
    kDepthFirstTraversal
195
  };
196
197
 protected:
198
  typedef std::set<shash::Any> HashSet;
199
200
 protected:
201
  /**
202
   * This struct keeps information about a catalog that still needs to be
203
   * traversed by a currently running catalog traversal process.
204
   */
205
33654
  struct CatalogJob {
206
6512
    CatalogJob(const std::string  &path,
207
               const shash::Any   &hash,
208
               const unsigned      tree_level,
209
               const unsigned      history_depth,
210
                     CatalogTN    *parent = NULL) :
211
      path(path),
212
      hash(hash),
213
      tree_level(tree_level),
214
      history_depth(history_depth),
215
      parent(parent),
216
      catalog_file_size(0),
217
      ignore(false),
218
      catalog(NULL),
219
      referenced_catalogs(0),
220
6512
      postponed(false) {}
221
222
7220
    bool IsRootCatalog() const { return tree_level == 0; }
223
224
6191
    CallbackDataTN GetCallbackData() const {
225
      return CallbackDataTN(catalog, hash, tree_level,
226
6191
                            catalog_file_size, history_depth);
227
    }
228
229
    // initial state description
230
    const std::string   path;
231
    const shash::Any    hash;
232
    const unsigned      tree_level;
233
    const unsigned      history_depth;
234
          CatalogTN    *parent;
235
236
    // dynamic processing state (used internally)
237
    std::string   catalog_file_path;
238
    size_t        catalog_file_size;
239
    bool          ignore;
240
    CatalogTN    *catalog;
241
    unsigned int  referenced_catalogs;
242
    bool          postponed;
243
  };
244
245
  typedef std::stack<CatalogJob> CatalogJobStack;
246
247
  /**
248
   * This struct represents a catalog traversal context. It needs to be re-
249
   * created for each catalog traversal process and contains information to this
250
   * specific catalog traversal run.
251
   *
252
   * @param history_depth   the history traversal threshold
253
   * @param traversal_type  either breadth or depth first traversal strategy
254
   * @param catalog_stack   the call stack for catalogs to be traversed
255
   * @param callback_stack  used in depth first traversal for deferred yielding
256
   */
257
229
  struct TraversalContext {
258
229
    TraversalContext(const unsigned       history_depth,
259
                     const time_t         timestamp_threshold,
260
                     const TraversalType  traversal_type) :
261
      history_depth(history_depth),
262
      timestamp_threshold(timestamp_threshold),
263
229
      traversal_type(traversal_type) {}
264
265
    const unsigned       history_depth;
266
    const time_t         timestamp_threshold;
267
    const TraversalType  traversal_type;
268
    CatalogJobStack      catalog_stack;
269
    CatalogJobStack      callback_stack;
270
  };
271
272
 public:
273
  /**
274
   * Constructs a new catalog traversal engine based on the construction
275
   * parameters described in struct ConstructionParams.
276
   */
277
154
  explicit CatalogTraversal(const Parameters &params)
278
    : object_fetcher_(params.object_fetcher)
279
    , catalog_info_shim_(&catalog_info_default_shim_)
280
    , no_close_(params.no_close)
281
    , ignore_load_failure_(params.ignore_load_failure)
282
    , no_repeat_history_(params.no_repeat_history)
283
    , default_history_depth_(params.history)
284
    , default_timestamp_threshold_(params.timestamp)
285

154
    , error_sink_((params.quiet) ? kLogDebug : kLogStderr)
286
  {
287

154
    assert(object_fetcher_ != NULL);
288
154
  }
289
290
291
1
  void SetCatalogInfoShim(CatalogTraversalInfoShim<CatalogTN> *shim) {
292
1
    catalog_info_shim_ = shim;
293
1
  }
294
295
296
  /**
297
   * Starts the traversal process.
298
   * After calling this methods CatalogTraversal will go through all catalogs
299
   * and call the registered callback methods for each found catalog.
300
   * If something goes wrong in the process, the traversal will be cancelled.
301
   *
302
   * @param type   breadths or depth first traversal
303
   * @return       true, when all catalogs were successfully processed. On
304
   *               failure the traversal is cancelled and false is returned.
305
   */
306
97
  bool Traverse(const TraversalType type = kBreadthFirstTraversal) {
307
    TraversalContext ctx(default_history_depth_,
308
                         default_timestamp_threshold_,
309
97
                         type);
310
97
    const shash::Any root_catalog_hash = GetRepositoryRootCatalogHash();
311

97
    if (root_catalog_hash.IsNull()) {
312
      return false;
313
    }
314
97
    Push(root_catalog_hash, &ctx);
315
97
    return DoTraverse(&ctx);
316
  }
317
318
  /**
319
   * Starts the traversal process at the catalog pointed to by the given hash
320
   *
321
   * @param root_catalog_hash  the entry point into the catalog traversal
322
   * @param type               breadths or depth first traversal
323
   * @return                   true when catalogs were successfully traversed
324
   */
325
52
  bool Traverse(const shash::Any     &root_catalog_hash,
326
                const TraversalType   type = kBreadthFirstTraversal) {
327
    // add the root catalog of the repository as the first element on the job
328
    // stack
329
    TraversalContext ctx(default_history_depth_,
330
                         default_timestamp_threshold_,
331
52
                         type);
332
52
    Push(root_catalog_hash, &ctx);
333
52
    return DoTraverse(&ctx);
334
  }
335
336
  /**
337
   * Starts the traversal process at the catalog pointed to by the given hash
338
   * but doesn't traverse into predecessor catalog revisions. This overrides the
339
   * TraversalParameter settings provided at construction.
340
   *
341
   * @param root_catalog_hash  the entry point into the catalog traversal
342
   * @param type               breadths or depth first traversal
343
   * @return                   true when catalogs were successfully traversed
344
   */
345
27
  bool TraverseRevision(const shash::Any     &root_catalog_hash,
346
                        const TraversalType   type = kBreadthFirstTraversal) {
347
    // add the given root catalog as the first element on the job stack
348
    TraversalContext ctx(Parameters::kNoHistory,
349
                         Parameters::kNoTimestampThreshold,
350
27
                         type);
351
27
    Push(root_catalog_hash, &ctx);
352
27
    return DoTraverse(&ctx);
353
  }
354
355
  /**
356
   * Figures out all named tags in a repository and uses all of them as entry
357
   * points into the traversal process.
358
   *
359
   * @param type  breadths or depth first traversal
360
   * @return      true when catalog traversal successfully finished
361
   */
362
53
  bool TraverseNamedSnapshots(
363
    const TraversalType type = kBreadthFirstTraversal)
364
  {
365
    typedef std::vector<shash::Any> HashList;
366
367
    TraversalContext ctx(Parameters::kNoHistory,
368
                         Parameters::kNoTimestampThreshold,
369
53
                         type);
370
53
    UniquePtr<HistoryTN> tag_db;
371
    const typename ObjectFetcherT::Failures retval =
372
53
                                         object_fetcher_->FetchHistory(&tag_db);
373
53
    switch (retval) {
374
      case ObjectFetcherT::kFailOk:
375
49
        break;
376
377
      case ObjectFetcherT::kFailNotFound:
378
4
        LogCvmfs(kLogCatalogTraversal, kLogDebug,
379
                 "didn't find a history database to traverse");
380
4
        return true;
381
382
      default:
383
        LogCvmfs(kLogCatalogTraversal, kLogStderr,
384
                 "failed to download history database (%d - %s)",
385
                 retval, Code2Ascii(retval));
386
        return false;
387
    }
388
389
49
    HashList root_hashes;
390
49
    bool success = tag_db->GetHashes(&root_hashes);
391
49
    assert(success);
392
393
    // traversing referenced named root hashes in reverse chronological order
394
    // to make sure that overlapping history traversals don't leave out catalog
395
    // revisions accidentially
396
49
          HashList::const_reverse_iterator i    = root_hashes.rbegin();
397
49
    const HashList::const_reverse_iterator iend = root_hashes.rend();
398
185
    for (; i != iend; ++i) {
399
136
      Push(*i, &ctx);
400
    }
401
402
49
    return DoTraverse(&ctx);
403
  }
404
405
 protected:
406
  /**
407
   * This controls the actual traversal. Using a stack to traverse down the
408
   * catalog hierarchy. This method implements the traversal itself, but not
409
   * in which way catalogs are handed out to the user code.
410
   *
411
   * Each catalog is processed in these steps:
412
   *  1.) Pop the next catalog from the stack
413
   *        Catalogs are always traversed from latest to oldest revision and
414
   *        from root to leaf nested catalogs
415
   *  2.) Prepare the catalog for traversing
416
   *    2.1.) Check if it was visited before
417
   *    2.2.) Fetch the catalog database from the backend storage
418
   *            This might fail and produce an error. For root catalogs this
419
   *            error can be ignored (might be garbage collected before)
420
   *    2.3.) Open the catalog database
421
   *    2.4.) Check if the catalog is older than the timestamp threshold
422
   *        After these steps the catalog is opened either opened and ready for
423
   *        the traversal to continue, or it was marked for ignore (job.ignore)
424
   *  3.) Check if the catalog is marked to be ignored
425
   *        Catalog might not be loadable (sweeped root catalog) or is too old
426
   *        Note: ignored catalogs can still trigger postponed yields
427
   *  4.) Mark the catalog as visited to be able to skip it later on
428
   *  5.) Find and push referencing catalogs
429
   *        This pushes all descendents of the current catalog onto the stack.
430
   *        Note that this is dependent on the strategy (depth or breadth first)
431
   *        and on the history threshold (see history_depth).
432
   *  6.) Hand the catalog out to the user code
433
   *        Depending on the traversal strategy (depth of breadths first) this
434
   *        might immediately yield zero to N catalogs to the user code.
435
   *
436
   * Note: If anything unexpected goes wrong during the traversal process, it
437
   *       is aborted immediately.
438
   *
439
   * @param ctx   the traversal context that steers the whole traversal process
440
   * @return      true on successful traversal and false on abort
441
   */
442
225
  bool DoTraverse(TraversalContext *ctx) {
443

225
    assert(ctx->callback_stack.empty());
444
445


225
    while (!ctx->catalog_stack.empty()) {
446
      // Get the top most catalog for the next processing step
447
6504
      CatalogJob job = Pop(ctx);
448
449
      // download and open the catalog for processing
450

6504
      if (!PrepareCatalog(*ctx, &job)) {
451
4
        return false;
452
      }
453
454
      // ignored catalogs don't need to be processed anymore but they might
455
      // release postponed yields
456

6500
      if (job.ignore) {
457

309
        if (!HandlePostponedYields(job, ctx)) {
458
          return false;
459
        }
460
309
        continue;
461
      }
462
463
      // push catalogs referenced by the current catalog (onto stack)
464
6191
      MarkAsVisited(job);
465
6191
      PushReferencedCatalogs(&job, ctx);
466
467
      // notify listeners
468

6191
      if (!YieldToListeners(&job, ctx)) {
469
        return false;
470
      }
471
    }
472
473
    // invariant: after the traversal finshed, there should be no more catalogs
474
    //            to traverse or to yield!
475

221
    assert(ctx->catalog_stack.empty());
476

221
    assert(ctx->callback_stack.empty());
477
221
    return true;
478
  }
479
480
481
6504
  bool PrepareCatalog(const TraversalContext &ctx, CatalogJob *job) {
482
    // skipping duplicate catalogs might also yield postponed catalogs
483

6504
    if (ShouldBeSkipped(*job)) {
484
293
      job->ignore = true;
485
293
      return true;
486
    }
487
488
    const typename ObjectFetcherT::Failures retval =
489
      object_fetcher_->FetchCatalog(job->hash,
490
                                    job->path,
491
                                    &job->catalog,
492
                                    !job->IsRootCatalog(),
493
6211
                                    job->parent);
494

6211
    switch (retval) {
495
      case ObjectFetcherT::kFailOk:
496
6191
        break;
497
498
      case ObjectFetcherT::kFailNotFound:
499

20
        if (ignore_load_failure_) {
500
16
          LogCvmfs(kLogCatalogTraversal, kLogDebug, "ignoring missing catalog "
501
                                                    "%s (swept before?)",
502
                   job->hash.ToString().c_str());
503
16
          job->ignore = true;
504
16
          return true;
505
        }
506
507
      default:
508


4
        LogCvmfs(kLogCatalogTraversal, error_sink_, "failed to load catalog %s "
509
                                                    "(%d - %s)",
510
                 job->hash.ToStringWithSuffix().c_str(),
511
                 retval, Code2Ascii(retval));
512
4
        return false;
513
    }
514
515
    // catalogs returned by ObjectFetcher<> are managing their database files by
516
    // default... we need to manage this file manually here
517
6191
    job->catalog->DropDatabaseFileOwnership();
518
519
6191
    job->catalog_file_path = job->catalog->database_path();
520
6191
    job->catalog_file_size = GetFileSize(job->catalog->database_path());
521
522
6191
    return true;
523
  }
524
525
526
555
  bool ReopenCatalog(CatalogJob *job) {
527

555
    assert(!job->ignore);
528

555
    assert(job->catalog == NULL);
529
530
555
    job->catalog = CatalogTN::AttachFreely(job->path,
531
                                           job->catalog_file_path,
532
                                           job->hash,
533
                                           job->parent,
534
                                           !job->IsRootCatalog());
535
536

555
    if (job->catalog == NULL) {
537
      LogCvmfs(kLogCatalogTraversal, error_sink_,
538
               "failed to re-open catalog %s", job->hash.ToString().c_str());
539
      return false;
540
    }
541
542
555
    return true;
543
  }
544
545
546
6466
  bool CloseCatalog(const bool unlink_db, CatalogJob *job) {
547

6466
    delete job->catalog;
548
6466
    job->catalog = NULL;
549



6466
    if (!job->catalog_file_path.empty() && unlink_db) {
550
      const int retval = unlink(job->catalog_file_path.c_str());
551
      if (retval != 0) {
552
        LogCvmfs(kLogCatalogTraversal, error_sink_, "Failed to unlink %s - %d",
553
                 job->catalog_file_path.c_str(), errno);
554
        return false;
555
      }
556
    }
557
558
6466
    return true;
559
  }
560
561
562
  /**
563
   * Checks if a root catalog is below one of the pruning thresholds.
564
   * Pruning thesholds can be either the catalog's history depth or a timestamp
565
   * threshold applied to the last modified timestamp of the catalog.
566
   *
567
   * @param ctx  traversal context for traversal-specific state
568
   * @param job  the job defining the current catalog
569
   * @return     true if either history or timestamp threshold are satisfied
570
   */
571
454
  bool IsBelowPruningThresholds(
572
    const CatalogJob &job,
573
    const TraversalContext &ctx
574
  ) {
575

454
    assert(job.IsRootCatalog());
576

454
    assert(job.catalog != NULL);
577
578
454
    const bool h = job.history_depth >= ctx.history_depth;
579

454
    assert(ctx.timestamp_threshold >= 0);
580
    const bool t =
581
      catalog_info_shim_->GetLastModified(job.catalog) <
582
454
      unsigned(ctx.timestamp_threshold);
583
584


454
    return t || h;
585
  }
586
587
588
6191
  void PushReferencedCatalogs(CatalogJob *job, TraversalContext *ctx) {
589

6191
    assert(!job->ignore);
590

6191
    assert(job->catalog != NULL);
591


6191
    assert(ctx->traversal_type == kBreadthFirstTraversal ||
592
           ctx->traversal_type == kDepthFirstTraversal);
593
594
    // this differs, depending on the traversal strategy.
595
    //
596
    // Breadths First Traversal
597
    //   Catalogs are traversed from top (root catalog) to bottom (leaf
598
    //   catalogs) and from more recent (HEAD revision) to older (historic
599
    //   revisions)
600
    //
601
    // Depth First Traversal
602
    //   Catalogs are traversed from oldest revision (depends on the configured
603
    //   maximal history depth) to the HEAD revision and from bottom (leafs) to
604
    //   top (root catalogs)
605

6191
    job->referenced_catalogs = (ctx->traversal_type == kBreadthFirstTraversal)
606
      ? PushPreviousRevision(*job, ctx) + PushNestedCatalogs(*job, ctx)
607
      : PushNestedCatalogs(*job, ctx) + PushPreviousRevision(*job, ctx);
608
6191
  }
609
610
  /**
611
   * Pushes the previous revision of a (root) catalog.
612
   * @return  the number of catalogs pushed on the processing stack
613
   */
614
6191
  unsigned int PushPreviousRevision(
615
    const CatalogJob &job,
616
    TraversalContext *ctx
617
  ) {
618
    // only root catalogs are used for entering a previous revision (graph)
619

6191
    if (!job.catalog->IsRoot()) {
620
5697
      return 0;
621
    }
622
623
494
    const shash::Any previous_revision = job.catalog->GetPreviousRevision();
624

494
    if (previous_revision.IsNull()) {
625
40
      return 0;
626
    }
627
628
    // check if the next deeper history level is actually requested
629
    // Note: if the current catalog is below the timestamp threshold it will be
630
    //       traversed and only its ancestor revision will not be pushed anymore
631

454
    if (IsBelowPruningThresholds(job, *ctx)) {
632
224
      return 0;
633
    }
634
635
230
    Push(CatalogJob("", previous_revision, 0, job.history_depth + 1, NULL),
636
         ctx);
637
230
    return 1;
638
  }
639
640
  /**
641
   * Pushes all the referenced nested catalogs.
642
   * @return  the number of catalogs pushed on the processing stack
643
   */
644
6191
  unsigned int PushNestedCatalogs(
645
    const CatalogJob &job,
646
    TraversalContext *ctx
647
  ) {
648
    typedef typename CatalogTN::NestedCatalogList NestedCatalogList;
649
6191
    const NestedCatalogList nested = job.catalog->ListOwnNestedCatalogs();
650
6191
    typename NestedCatalogList::const_iterator i    = nested.begin();
651
6191
    typename NestedCatalogList::const_iterator iend = nested.end();
652

6191
    for (; i != iend; ++i) {
653

5970
      CatalogTN* parent = (no_close_) ? job.catalog : NULL;
654
      const CatalogJob new_job(i->mountpoint.ToString(),
655
                               i->hash,
656
                               job.tree_level + 1,
657
                               job.history_depth,
658
5970
                               parent);
659
5970
      Push(new_job, ctx);
660
    }
661
662
6191
    return nested.size();
663
  }
664
665
312
  void Push(const shash::Any &root_catalog_hash, TraversalContext *ctx) {
666
312
    Push(CatalogJob("", root_catalog_hash, 0, 0), ctx);
667
312
  }
668
669
670
6191
  bool YieldToListeners(CatalogJob *job, TraversalContext *ctx) {
671

6191
    assert(!job->ignore);
672

6191
    assert(job->catalog != NULL);
673


6191
    assert(ctx->traversal_type == kBreadthFirstTraversal ||
674
           ctx->traversal_type == kDepthFirstTraversal);
675
676
    // in breadth first search mode, every catalog is simply handed out once
677
    // it is visited. No extra magic required...
678

6191
    if (ctx->traversal_type == kBreadthFirstTraversal) {
679
4725
      return Yield(job);
680
    }
681
682
    // in depth first search mode, catalogs might need to wait until all of
683
    // their referenced catalogs are yielded (ctx.callback_stack)...
684

1466
    assert(ctx->traversal_type == kDepthFirstTraversal);
685

1466
    if (job->referenced_catalogs > 0) {
686
555
      PostponeYield(job, ctx);
687
555
      return true;
688
    }
689
690
    // this catalog can be yielded
691


911
    return Yield(job) && HandlePostponedYields(*job, ctx);
692
  }
693
694
695
 private:
696
  /**
697
   * This actually hands out a catalog to the user code
698
   * It is not called by DoTraversa() directly but by wrapper functions in order
699
   * to provide higher level yielding behaviour.
700
   */
701
6191
  bool Yield(CatalogJob *job) {
702

6191
    assert(!job->ignore);
703


6191
    assert(job->catalog != NULL || job->postponed);
704
705
    // catalog was pushed on ctx.callback_stack before, it might need to be re-
706
    // opened. If CatalogTraversal<> is configured with no_close, it was not
707
    // closed before, hence does not need a re-open.
708




6191
    if (job->postponed && !no_close_ && !ReopenCatalog(job)) {
709
      return false;
710
    }
711
712
    // hand the catalog out to the user code (see Observable<>)
713

6191
    assert(job->catalog != NULL);
714
6191
    this->NotifyListeners(job->GetCallbackData());
715
716
    // skip the catalog closing procedure if asked for
717
    // Note: In this case it is the user's responsibility to both delete the
718
    //       yielded catalog object and the underlying database temp file
719

6191
    if (no_close_) {
720
280
      return true;
721
    }
722
723
    // we can close the catalog here and delete the temporary file
724
5911
    const bool unlink_db = true;
725
5911
    return CloseCatalog(unlink_db, job);
726
  }
727
728
729
  /**
730
   * Pushes a catalog to the callback_stack for later yielding
731
   * Note: this is only used for the Depth First Traversal strategy!
732
   */
733
555
  void PostponeYield(CatalogJob *job, TraversalContext *ctx) {
734

555
    assert(job->referenced_catalogs > 0);
735
736
555
    job->postponed = true;
737

555
    if (!no_close_) {
738
555
      const bool unlink_db = false;  // will reopened just before yielding
739
555
      CloseCatalog(unlink_db, job);
740
    }
741
555
    ctx->callback_stack.push(*job);
742
555
  }
743
744
  /**
745
   * Determines if there are postponed yields that can be set free based on
746
   * the catalog currently being yielded
747
   *
748
   * Note: the CatalogJob being handed into this method does not necessarily
749
   *       have an open Catalog attached to it.
750
   *
751
   * @param ctx   the TraversalContext
752
   * @param job   the catalog job that was just yielded
753
   * @return      true on successful execution
754
   */
755
1220
  bool HandlePostponedYields(const CatalogJob &job, TraversalContext *ctx) {
756

1220
    if (ctx->traversal_type == kBreadthFirstTraversal) {
757
228
      return true;
758
    }
759
760

992
    assert(ctx->traversal_type == kDepthFirstTraversal);
761

992
    assert(job.referenced_catalogs == 0);
762
763
    // walk through the callback_stack and yield all catalogs that have no un-
764
    // yielded referenced_catalogs anymore. Every time a CatalogJob in the
765
    // callback_stack gets yielded it decrements the referenced_catalogs of the
766
    // next top of the stack (it's parent CatalogJob waiting for yielding)
767
992
    CatalogJobStack &clbs = ctx->callback_stack;
768

2539
    while (!clbs.empty()) {
769
1476
      CatalogJob &postponed_job = clbs.top();
770

1476
      if (--postponed_job.referenced_catalogs > 0) {
771
921
        break;
772
      }
773
774

555
      if (!Yield(&postponed_job)) {
775
        return false;
776
      }
777
555
      clbs.pop();
778
    }
779
780
992
    return true;
781
  }
782
783
6512
  void Push(const CatalogJob &job, TraversalContext *ctx) {
784
6512
    ctx->catalog_stack.push(job);
785
6512
  }
786
787
6504
  CatalogJob Pop(TraversalContext *ctx) {
788
6504
    CatalogJob job = ctx->catalog_stack.top();
789
6504
    ctx->catalog_stack.pop();
790
6504
    return job;
791
  }
792
793
6191
  void MarkAsVisited(const CatalogJob &job) {
794

6191
    if (no_repeat_history_) {
795
1879
      visited_catalogs_.insert(job.hash);
796
    }
797
6191
  }
798
799
  /**
800
   * Checks the traversal history if the given catalog was traversed or at least
801
   * seen before. If 'no_repeat_history' is not set this is always 'false'.
802
   *
803
   * @param job   the job to be checked against the traversal history
804
   * @return      true if the specified catalog was hit before
805
   */
806
6504
  bool ShouldBeSkipped(const CatalogJob &job) {
807


6504
    return no_repeat_history_ && (visited_catalogs_.count(job.hash) > 0);
808
  }
809
810
97
  shash::Any GetRepositoryRootCatalogHash() {
811
    // get the manifest of the repository to learn about the entry point or the
812
    // root catalog of the repository to be traversed
813
97
    UniquePtr<manifest::Manifest> manifest;
814
    const typename ObjectFetcherT::Failures retval =
815
97
                                      object_fetcher_->FetchManifest(&manifest);
816

97
    if (retval != ObjectFetcherT::kFailOk) {
817
      LogCvmfs(kLogCatalogTraversal, kLogStderr, "failed to load manifest "
818
                                                 "(%d - %s)",
819
                                                 retval, Code2Ascii(retval));
820
      return shash::Any();
821
    }
822
823

97
    assert(manifest.IsValid());
824
97
    return manifest->catalog_hash();
825
  }
826
827
 private:
828
  ObjectFetcherT         *object_fetcher_;
829
  CatalogTraversalInfoShim<CatalogTN> catalog_info_default_shim_;
830
  CatalogTraversalInfoShim<CatalogTN> *catalog_info_shim_;
831
  const bool              no_close_;
832
  const bool              ignore_load_failure_;
833
  const bool              no_repeat_history_;
834
  const unsigned int      default_history_depth_;
835
  const time_t            default_timestamp_threshold_;
836
  HashSet                 visited_catalogs_;
837
  LogFacilities           error_sink_;
838
};
839
840
template <class ObjectFetcherT>
841
const unsigned int
842
30
  CatalogTraversal<ObjectFetcherT>::Parameters::kFullHistory =
843
    std::numeric_limits<unsigned int>::max();
844
845
template <class ObjectFetcherT>
846
const unsigned int
847
  CatalogTraversal<ObjectFetcherT>::Parameters::kNoHistory = 0;
848
849
template <class ObjectFetcherT>
850
const time_t
851
  CatalogTraversal<ObjectFetcherT>::Parameters::kNoTimestampThreshold = 0;
852
853
}  // namespace swissknife
854
855
#endif  // CVMFS_CATALOG_TRAVERSAL_H_