CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
catalog_traversal.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_CATALOG_TRAVERSAL_H_
6 #define CVMFS_CATALOG_TRAVERSAL_H_
7 
8 #include <cassert>
9 #include <limits>
10 #include <set>
11 #include <stack>
12 #include <string>
13 #include <vector>
14 
15 #include "catalog.h"
17 #include "crypto/signature.h"
18 #include "history_sqlite.h"
19 #include "manifest.h"
20 #include "object_fetcher.h"
21 #include "util/concurrency.h"
22 #include "util/logging.h"
23 
24 namespace catalog {
25 class Catalog;
26 class WritableCatalog;
27 }
28 
29 namespace swissknife {
30 
42 template <class CatalogT>
// Builds the data record that is passed to traversal listeners.
// Each argument is stored unchanged in the member of the same name; for
// `catalog` only the pointer is stored, the pointee is not copied here.
44  CatalogTraversalData(const CatalogT *catalog,
45  const shash::Any &catalog_hash,
46  const unsigned tree_level,
47  const size_t file_size,
48  const uint64_t history_depth)
49  : catalog(catalog)
50  , catalog_hash(catalog_hash)
51  , tree_level(tree_level)
52  , file_size(file_size)
53  , history_depth(history_depth) {}
54 
55  const CatalogT *catalog;
57  const unsigned int tree_level;
58  const size_t file_size;
59  const uint64_t history_depth;
60 };
61 
62 
70 template <class CatalogT>
72  public:
74 
75  // Default implementation: use the catalog's timestamp
76  virtual uint64_t GetLastModified(const CatalogT *catalog) {
77  return catalog->GetLastModified();
78  }
79 };
80 
81 
87 template <class ObjectFetcherT>
89  : public Observable<CatalogTraversalData<typename ObjectFetcherT::CatalogTN> >
90 {
91  public:
92  typedef ObjectFetcherT ObjectFetcherTN;
93  typedef typename ObjectFetcherT::CatalogTN CatalogTN;
94  typedef typename ObjectFetcherT::HistoryTN HistoryTN;
96 
133  struct Parameters {
135  : object_fetcher(NULL)
138  , no_repeat_history(false)
139  , no_close(false)
140  , ignore_load_failure(false)
141  , quiet(false)
142  , num_threads(8)
143  , serialize_callbacks(true) {}
144 
145  static const uint64_t kFullHistory;
146  static const uint64_t kNoHistory;
147  static const time_t kNoTimestampThreshold;
148 
149  ObjectFetcherT *object_fetcher;
150 
151  uint64_t history;
152  time_t timestamp;
154  bool no_close;
156  bool quiet;
157  // These parameters only apply for CatalogTraversalParallel
158  unsigned int num_threads;
160  };
161 
165  };
166 
167  explicit CatalogTraversalBase(const Parameters &params)
168  : object_fetcher_(params.object_fetcher)
171  , default_timestamp_threshold_(params.timestamp)
172  , no_close_(params.no_close)
173  , ignore_load_failure_(params.ignore_load_failure)
174  , no_repeat_history_(params.no_repeat_history)
175  , error_sink_((params.quiet) ? kLogDebug : kLogStderr)
176  {
177  assert(object_fetcher_ != NULL);
178  }
179 
// Pure virtual entry point that takes no explicit root catalog hash.
// The CatalogTraversal implementation in this file obtains the hash from
// GetRepositoryRootCatalogHash(), returns false if that hash is null and
// otherwise forwards to Traverse(root_catalog_hash, type).
// `type` defaults to kBreadthFirst.
190  virtual bool Traverse(const TraversalType type = kBreadthFirst) = 0;
191 
// Pure virtual: start a traversal at the given root catalog hash.
// The CatalogTraversal implementation in this file pushes `root_catalog_hash`
// as the first job of a local traversal context and returns the result of
// DoTraverse() on that context. `type` defaults to kBreadthFirst.
199  virtual bool Traverse(const shash::Any &root_catalog_hash,
200  const TraversalType type = kBreadthFirst) = 0;
201 
// Pure virtual: traverse all root catalog hashes contained in `catalog_list`.
// The CatalogTraversal implementation in this file calls TraverseRevision()
// for the hashes in list order; because the results are combined with a
// short-circuiting &&, no further hash is traversed after the first failure
// and false is returned. `type` defaults to kBreadthFirst.
211  virtual bool TraverseList(const std::vector<shash::Any> &catalog_list,
212  const TraversalType type = kBreadthFirst) = 0;
213 
// Pure virtual: traverse the revision rooted at `root_catalog_hash`.
// The CatalogTraversal implementation in this file pushes the hash as the
// first job of a local traversal context and returns the result of
// DoTraverse(). `type` defaults to kBreadthFirst.
// NOTE(review): how that context differs from the one built by
// Traverse(root_catalog_hash, type) is not visible in this listing - confirm
// against the full header.
223  virtual bool TraverseRevision(const shash::Any &root_catalog_hash,
224  const TraversalType type = kBreadthFirst) = 0;
225 
235  {
236  typedef std::vector<shash::Any> HashList;
237 
238  UniquePtr<HistoryTN> tag_db;
239  const typename ObjectFetcherT::Failures retval =
240  object_fetcher_->FetchHistory(&tag_db);
241  switch (retval) {
243  break;
244 
247  "didn't find a history database to traverse");
248  return true;
249 
250  default:
252  "failed to download history database (%d - %s)",
253  retval, Code2Ascii(retval));
254  return false;
255  }
256 
257  HashList root_hashes;
258  bool success = tag_db->GetHashes(&root_hashes);
259  assert(success);
260  return TraverseList(root_hashes, type);
261  }
262 
264  catalog_info_shim_ = shim;
265  }
266 
267  protected:
268  typedef std::set<shash::Any> HashSet;
269 
274  struct CatalogJob {
275  CatalogJob(const std::string &path,
276  const shash::Any &hash,
277  const unsigned tree_level,
278  const uint64_t history_depth,
279  CatalogTN *parent = NULL) :
280  path(path),
281  hash(hash),
282  tree_level(tree_level),
283  history_depth(history_depth),
284  parent(parent),
286  ignore(false),
287  catalog(NULL),
289  postponed(false) {}
290 
291  bool IsRootCatalog() const { return tree_level == 0; }
292 
296  }
297 
298  // initial state description
299  const std::string path;
301  const unsigned tree_level;
302  const uint64_t history_depth;
304 
305  // dynamic processing state (used internally)
306  std::string catalog_file_path;
308  bool ignore;
311  bool postponed;
312  };
313 
315  const typename ObjectFetcherT::Failures retval =
316  object_fetcher_->FetchCatalog(job->hash,
317  job->path,
318  &job->catalog,
319  !job->IsRootCatalog(),
320  job->parent);
321  switch (retval) {
323  break;
324 
326  if (ignore_load_failure_) {
327  LogCvmfs(kLogCatalogTraversal, kLogDebug, "ignoring missing catalog "
328  "%s (swept before?)",
329  job->hash.ToString().c_str());
330  job->ignore = true;
331  return true;
332  }
333 
334  default:
335  LogCvmfs(kLogCatalogTraversal, error_sink_, "failed to load catalog %s "
336  "(%d - %s)",
337  job->hash.ToStringWithSuffix().c_str(),
338  retval, Code2Ascii(retval));
339  return false;
340  }
341 
342  // catalogs returned by ObjectFetcher<> are managing their database files by
343  // default... we need to manage this file manually here
344  job->catalog->DropDatabaseFileOwnership();
345 
346  job->catalog_file_path = job->catalog->database_path();
347  job->catalog_file_size = GetFileSize(job->catalog->database_path());
348 
349  return true;
350  }
351 
352 
354  assert(!job->ignore);
355  assert(job->catalog == NULL);
356 
357  job->catalog = CatalogTN::AttachFreely(job->path,
358  job->catalog_file_path,
359  job->hash,
360  job->parent,
361  !job->IsRootCatalog());
362 
363  if (job->catalog == NULL) {
365  "failed to re-open catalog %s", job->hash.ToString().c_str());
366  return false;
367  }
368 
369  return true;
370  }
371 
372 
373  bool CloseCatalog(const bool unlink_db, CatalogJob *job) {
374  delete job->catalog;
375  job->catalog = NULL;
376  if (!job->catalog_file_path.empty() && unlink_db) {
377  const int retval = unlink(job->catalog_file_path.c_str());
378  if (retval != 0) {
379  LogCvmfs(kLogCatalogTraversal, error_sink_, "Failed to unlink %s - %d",
380  job->catalog_file_path.c_str(), errno);
381  return false;
382  }
383  }
384 
385  return true;
386  }
387 
389  // get the manifest of the repository to learn about the entry point or the
390  // root catalog of the repository to be traversed
392  const typename ObjectFetcherT::Failures retval =
393  object_fetcher_->FetchManifest(&manifest);
394  if (retval != ObjectFetcherT::kFailOk) {
395  LogCvmfs(kLogCatalogTraversal, kLogStderr, "failed to load manifest "
396  "(%d - %s)",
397  retval, Code2Ascii(retval));
398  return shash::Any();
399  }
400 
401  assert(manifest.IsValid());
402  return manifest->catalog_hash();
403  }
404 
415  const CatalogJob &job,
416  const uint64_t history_depth,
417  const time_t timestamp_threshold
418  ) {
419  assert(job.IsRootCatalog());
420  assert(job.catalog != NULL);
421 
422  const bool h = job.history_depth >= history_depth;
423  assert(timestamp_threshold >= 0);
424  const bool t =
426  unsigned(timestamp_threshold);
427 
428  return t || h;
429  }
430 
431  ObjectFetcherT *object_fetcher_;
434  const uint64_t default_history_depth_;
436  const bool no_close_;
438  const bool no_repeat_history_;
440 };
441 
494 template<class ObjectFetcherT>
496  : public CatalogTraversalBase<ObjectFetcherT>
497 {
498  public:
500  typedef ObjectFetcherT ObjectFetcherTN;
501  typedef typename ObjectFetcherT::CatalogTN CatalogTN;
502  typedef typename ObjectFetcherT::HistoryTN HistoryTN;
504  typedef typename Base::Parameters Parameters;
506  typedef typename Base::CatalogJob CatalogJob;
507 
508  protected:
509  typedef std::set<shash::Any> HashSet;
510  typedef std::stack<CatalogJob> CatalogJobStack;
511 
524  const time_t timestamp_threshold,
526  history_depth(history_depth),
527  timestamp_threshold(timestamp_threshold),
528  traversal_type(traversal_type) {}
529 
530  const uint64_t history_depth;
531  const time_t timestamp_threshold;
535  };
536 
537  public:
542  explicit CatalogTraversal(const Parameters &params)
543  : CatalogTraversalBase<ObjectFetcherT>(params)
544  { }
545 
547  const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash();
548  if (root_catalog_hash.IsNull()) {
549  return false;
550  }
551  return Traverse(root_catalog_hash, type);
552  }
553 
554  bool Traverse(const shash::Any &root_catalog_hash,
556  // add the root catalog of the repository as the first element on the job
557  // stack
560  type);
561  Push(root_catalog_hash, &ctx);
562  return DoTraverse(&ctx);
563  }
564 
565  bool TraverseList(const std::vector<shash::Any> &catalog_list,
567  std::vector<shash::Any>::const_iterator i = catalog_list.begin();
568  const std::vector<shash::Any>::const_iterator iend = catalog_list.end();
569  bool success = true;
570  for (; i != iend; ++i) {
571  success = success && TraverseRevision(*i, type);
572  }
573  return success;
574  }
575 
577  const shash::Any &root_catalog_hash,
579  {
580  // add the given root catalog as the first element on the job stack
583  type);
584  Push(root_catalog_hash, &ctx);
585  return DoTraverse(&ctx);
586  }
587 
588  protected:
626  assert(ctx->callback_stack.empty());
627 
628  while (!ctx->catalog_stack.empty()) {
629  // Get the top most catalog for the next processing step
630  CatalogJob job = Pop(ctx);
631 
632  if (ShouldBeSkipped(job)) {
633  job.ignore = true;
634  // download and open the catalog for processing
635  } else if (!this->PrepareCatalog(&job)) {
636  return false;
637  }
638 
639  // ignored catalogs don't need to be processed anymore but they might
640  // release postponed yields
641  if (job.ignore) {
642  if (!HandlePostponedYields(job, ctx)) {
643  return false;
644  }
645  continue;
646  }
647 
648  // push catalogs referenced by the current catalog (onto stack)
649  this->MarkAsVisited(job);
650  PushReferencedCatalogs(&job, ctx);
651 
652  // notify listeners
653  if (!YieldToListeners(&job, ctx)) {
654  return false;
655  }
656  }
657 
658  // invariant: after the traversal finished, there should be no more catalogs
659  // to traverse or to yield!
660  assert(ctx->catalog_stack.empty());
661  assert(ctx->callback_stack.empty());
662  return true;
663  }
664 
666  assert(!job->ignore);
667  assert(job->catalog != NULL);
670 
671  // this differs, depending on the traversal strategy.
672  //
674  // Catalogs are traversed from top (root catalog) to bottom (leaf
675  // catalogs) and from more recent (HEAD revision) to older (historic
676  // revisions)
677  //
678  // Depth First Traversal
679  // Catalogs are traversed from oldest revision (depends on the configured
680  // maximal history depth) to the HEAD revision and from bottom (leafs) to
681  // top (root catalogs)
682  job->referenced_catalogs =
684  ? PushPreviousRevision(*job, ctx) + PushNestedCatalogs(*job, ctx)
685  : PushNestedCatalogs(*job, ctx) + PushPreviousRevision(*job, ctx);
686  }
687 
692  unsigned int PushPreviousRevision(
693  const CatalogJob &job,
695  ) {
696  // only root catalogs are used for entering a previous revision (graph)
697  if (!job.catalog->IsRoot()) {
698  return 0;
699  }
700 
701  const shash::Any previous_revision = job.catalog->GetPreviousRevision();
702  if (previous_revision.IsNull()) {
703  return 0;
704  }
705 
706  // check if the next deeper history level is actually requested
707  // Note: if the current catalog is below the timestamp threshold it will be
708  // traversed and only its ancestor revision will not be pushed anymore
709  if (this->IsBelowPruningThresholds(job, ctx->history_depth,
710  ctx->timestamp_threshold)) {
711  return 0;
712  }
713 
714  Push(CatalogJob("", previous_revision, 0, job.history_depth + 1, NULL),
715  ctx);
716  return 1;
717  }
718 
723  unsigned int PushNestedCatalogs(
724  const CatalogJob &job,
726  ) {
727  typedef typename CatalogTN::NestedCatalogList NestedCatalogList;
728  const NestedCatalogList nested = job.catalog->ListOwnNestedCatalogs();
729  typename NestedCatalogList::const_iterator i = nested.begin();
730  typename NestedCatalogList::const_iterator iend = nested.end();
731  for (; i != iend; ++i) {
732  CatalogTN* parent = (this->no_close_) ? job.catalog : NULL;
733  const CatalogJob new_job(i->mountpoint.ToString(),
734  i->hash,
735  job.tree_level + 1,
736  job.history_depth,
737  parent);
738  Push(new_job, ctx);
739  }
740 
741  return nested.size();
742  }
743 
744  void Push(const shash::Any &root_catalog_hash, TraversalContext *ctx) {
745  Push(CatalogJob("", root_catalog_hash, 0, 0), ctx);
746  }
747 
749  assert(!job->ignore);
750  assert(job->catalog != NULL);
753 
754  // in breadth first search mode, every catalog is simply handed out once
755  // it is visited. No extra magic required...
756  if (ctx->traversal_type == Base::kBreadthFirst) {
757  return Yield(job);
758  }
759 
760  // in depth first search mode, catalogs might need to wait until all of
761  // their referenced catalogs are yielded (ctx.callback_stack)...
763  if (job->referenced_catalogs > 0) {
764  PostponeYield(job, ctx);
765  return true;
766  }
767 
768  // this catalog can be yielded
769  return Yield(job) && HandlePostponedYields(*job, ctx);
770  }
771 
779  bool ShouldBeSkipped(const CatalogJob &job) {
780  return this->no_repeat_history_ && (visited_catalogs_.count(job.hash) > 0);
781  }
782 
783  void MarkAsVisited(const CatalogJob &job) {
784  if (this->no_repeat_history_) {
785  visited_catalogs_.insert(job.hash);
786  }
787  }
788 
789  private:
795  bool Yield(CatalogJob *job) {
796  assert(!job->ignore);
797  assert(job->catalog != NULL || job->postponed);
798 
799  // catalog was pushed on ctx.callback_stack before, it might need to be re-
800  // opened. If CatalogTraversal<> is configured with no_close, it was not
801  // closed before, hence does not need a re-open.
802  if (job->postponed && !this->no_close_ && !this->ReopenCatalog(job)) {
803  return false;
804  }
805 
806  // hand the catalog out to the user code (see Observable<>)
807  assert(job->catalog != NULL);
808  this->NotifyListeners(job->GetCallbackData());
809 
810  // skip the catalog closing procedure if asked for
811  // Note: In this case it is the user's responsibility to both delete the
812  // yielded catalog object and the underlying database temp file
813  if (this->no_close_) {
814  return true;
815  }
816 
817  // we can close the catalog here and delete the temporary file
818  const bool unlink_db = true;
819  return this->CloseCatalog(unlink_db, job);
820  }
821 
822 
828  assert(job->referenced_catalogs > 0);
829 
830  job->postponed = true;
831  if (!this->no_close_) {
832  const bool unlink_db = false; // will reopened just before yielding
833  this->CloseCatalog(unlink_db, job);
834  }
835  ctx->callback_stack.push(*job);
836  }
837 
850  if (ctx->traversal_type == Base::kBreadthFirst) {
851  return true;
852  }
853 
855  assert(job.referenced_catalogs == 0);
856 
857  // walk through the callback_stack and yield all catalogs that have no un-
858  // yielded referenced_catalogs anymore. Every time a CatalogJob in the
859  // callback_stack gets yielded it decrements the referenced_catalogs of the
860  // next top of the stack (its parent CatalogJob waiting for yielding)
861  CatalogJobStack &clbs = ctx->callback_stack;
862  while (!clbs.empty()) {
863  CatalogJob &postponed_job = clbs.top();
864  if (--postponed_job.referenced_catalogs > 0) {
865  break;
866  }
867 
868  if (!Yield(&postponed_job)) {
869  return false;
870  }
871  clbs.pop();
872  }
873 
874  return true;
875  }
876 
// Schedules `job` for processing by pushing it onto ctx->catalog_stack.
// DoTraverse() takes jobs off that stack again through Pop().
877  void Push(const CatalogJob &job, TraversalContext *ctx) {
878  ctx->catalog_stack.push(job);
879  }
880 
882  CatalogJob job = ctx->catalog_stack.top();
883  ctx->catalog_stack.pop();
884  return job;
885  }
886 
887  protected:
889 };
890 
891 template <class ObjectFetcherT>
893  std::numeric_limits<uint64_t>::max();
894 
895 template <class ObjectFetcherT>
897 
898 template <class ObjectFetcherT>
899 const time_t
901 
902 } // namespace swissknife
903 
904 #endif // CVMFS_CATALOG_TRAVERSAL_H_
CatalogJob(const std::string &path, const shash::Any &hash, const unsigned tree_level, const uint64_t history_depth, CatalogTN *parent=NULL)
const char * Code2Ascii(const ObjectFetcherFailures::Failures error)
std::set< shash::Any > HashSet
bool IsNull() const
Definition: hash.h:383
bool Yield(CatalogJob *job)
const manifest::Manifest * manifest() const
Definition: repository.h:125
struct cvmcache_context * ctx
bool HandlePostponedYields(const CatalogJob &job, TraversalContext *ctx)
bool YieldToListeners(CatalogJob *job, TraversalContext *ctx)
virtual uint64_t GetLastModified(const CatalogT *catalog)
bool IsBelowPruningThresholds(const CatalogJob &job, const uint64_t history_depth, const time_t timestamp_threshold)
Failures
Definition: letter.h:18
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
const history::History * history() const
std::string ToStringWithSuffix() const
Definition: hash.h:304
void NotifyListeners(const CatalogTraversalData< ObjectFetcherT::CatalogTN > &parameter)
void PushReferencedCatalogs(CatalogJob *job, TraversalContext *ctx)
void PostponeYield(CatalogJob *job, TraversalContext *ctx)
assert((mem||(size==0))&&"Out Of Memory")
CatalogTraversalBase(const Parameters &params)
bool DoTraverse(TraversalContext *ctx)
bool PrepareCatalog(CatalogJob *job)
CatalogTraversalData< CatalogTN > CallbackDataTN
CatalogTraversalData(const CatalogT *catalog, const shash::Any &catalog_hash, const unsigned tree_level, const size_t file_size, const uint64_t history_depth)
CatalogJob Pop(TraversalContext *ctx)
CatalogTraversalInfoShim< CatalogTN > * catalog_info_shim_
bool TraverseList(const std::vector< shash::Any > &catalog_list, const TraversalType type=Base::kBreadthFirst)
CatalogTraversalInfoShim< CatalogTN > catalog_info_default_shim_
virtual bool TraverseNamedSnapshots(const TraversalType type=kBreadthFirst)
virtual bool TraverseRevision(const shash::Any &root_catalog_hash, const TraversalType type=kBreadthFirst)=0
bool ShouldBeSkipped(const CatalogJob &job)
ObjectFetcherT::HistoryTN HistoryTN
TraversalContext(const uint64_t history_depth, const time_t timestamp_threshold, const TraversalType traversal_type)
CatalogTraversalBase< ObjectFetcherT > Base
Base::TraversalType TraversalType
ObjectFetcherT::CatalogTN CatalogTN
ObjectFetcherT::CatalogTN CatalogTN
std::stack< CatalogJob > CatalogJobStack
bool IsValid() const
Definition: pointer.h:43
bool Traverse(const TraversalType type=Base::kBreadthFirst)
void MarkAsVisited(const CatalogJob &job)
CatalogTraversalData< CatalogTN > CallbackDataTN
virtual bool TraverseList(const std::vector< shash::Any > &catalog_list, const TraversalType type=kBreadthFirst)=0
bool TraverseRevision(const shash::Any &root_catalog_hash, const TraversalType type=Base::kBreadthFirst)
void Push(const shash::Any &root_catalog_hash, TraversalContext *ctx)
LogFacilities
void SetCatalogInfoShim(CatalogTraversalInfoShim< CatalogTN > *shim)
bool Traverse(const shash::Any &root_catalog_hash, const TraversalType type=Base::kBreadthFirst)
int64_t GetFileSize(const std::string &path)
Definition: posix.cc:812
unsigned int PushPreviousRevision(const CatalogJob &job, TraversalContext *ctx)
virtual bool Traverse(const TraversalType type=kBreadthFirst)=0
void Push(const CatalogJob &job, TraversalContext *ctx)
bool CloseCatalog(const bool unlink_db, CatalogJob *job)
CatalogTraversal(const Parameters &params)
ObjectFetcherT::HistoryTN HistoryTN
bool ReopenCatalog(CatalogJob *job)
unsigned int PushNestedCatalogs(const CatalogJob &job, TraversalContext *ctx)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528