CernVM-FS  2.13.0
catalog_traversal.h

/**
 * This file is part of the CernVM File System.
 */

#ifndef CVMFS_CATALOG_TRAVERSAL_H_
#define CVMFS_CATALOG_TRAVERSAL_H_

#include <cassert>
#include <limits>
#include <set>
#include <stack>
#include <string>
#include <vector>

#include "catalog.h"
#include "crypto/signature.h"
#include "history_sqlite.h"
#include "manifest.h"
#include "object_fetcher.h"
#include "util/concurrency.h"
#include "util/logging.h"

namespace catalog {
class Catalog;
class WritableCatalog;
}  // namespace catalog

namespace swissknife {

/**
 * Callback data handed out to traversal listeners for every visited catalog.
 */
template<class CatalogT>
struct CatalogTraversalData {
  CatalogTraversalData(const CatalogT *catalog,
                       const shash::Any &catalog_hash,
                       const unsigned tree_level,
                       const size_t file_size,
                       const uint64_t history_depth)
      : catalog(catalog)
      , catalog_hash(catalog_hash)
      , tree_level(tree_level)
      , file_size(file_size)
      , history_depth(history_depth) { }

  const CatalogT *catalog;
  const shash::Any catalog_hash;
  const unsigned int tree_level;
  const size_t file_size;
  const uint64_t history_depth;
};


/**
 * Shim that allows overriding how a catalog's last-modified timestamp is
 * obtained during traversal (used for the timestamp pruning threshold).
 */
template<class CatalogT>
class CatalogTraversalInfoShim {
 public:
  virtual ~CatalogTraversalInfoShim() { }

  // Default implementation: use the catalog's timestamp
  virtual uint64_t GetLastModified(const CatalogT *catalog) {
    return catalog->GetLastModified();
  }
};


/**
 * Common base class for the catalog traversal implementations. It holds the
 * traversal parameters, the listener (Observable<>) interface and the shared
 * helpers to fetch, open and close catalogs through an ObjectFetcherT.
 */
template<class ObjectFetcherT>
class CatalogTraversalBase
    : public Observable<
          CatalogTraversalData<typename ObjectFetcherT::CatalogTN> > {
 public:
  typedef ObjectFetcherT ObjectFetcherTN;
  typedef typename ObjectFetcherT::CatalogTN CatalogTN;
  typedef typename ObjectFetcherT::HistoryTN HistoryTN;
  typedef CatalogTraversalData<CatalogTN> CallbackDataTN;

  /**
   * Configuration for the traversal: the ObjectFetcherT to use, the history
   * depth and timestamp pruning thresholds as well as behavioural switches.
   */
  struct Parameters {
    Parameters()
        : object_fetcher(NULL)
        , history(kFullHistory)
        , timestamp(kNoTimestampThreshold)
        , no_repeat_history(false)
        , no_close(false)
        , ignore_load_failure(false)
        , quiet(false)
        , num_threads(8)
        , serialize_callbacks(true) { }

    static const uint64_t kFullHistory;
    static const uint64_t kNoHistory;
    static const time_t kNoTimestampThreshold;

    ObjectFetcherT *object_fetcher;

    uint64_t history;
    time_t timestamp;
    bool no_repeat_history;
    bool no_close;
    bool ignore_load_failure;
    bool quiet;
    // These parameters only apply for CatalogTraversalParallel
    unsigned int num_threads;
    bool serialize_callbacks;
  };

  enum TraversalType {
    kBreadthFirst,
    kDepthFirst
  };

  explicit CatalogTraversalBase(const Parameters &params)
      : object_fetcher_(params.object_fetcher)
      , catalog_info_shim_(&catalog_info_default_shim_)
      , default_history_depth_(params.history)
      , default_timestamp_threshold_(params.timestamp)
      , no_close_(params.no_close)
      , ignore_load_failure_(params.ignore_load_failure)
      , no_repeat_history_(params.no_repeat_history)
      , error_sink_((params.quiet) ? kLogDebug : kLogStderr) {
    assert(object_fetcher_ != NULL);
  }

  /**
   * Starts the traversal at the repository's HEAD root catalog.
   *
   * @param type  breadth first or depth first traversal
   * @return      true on success
   */
  virtual bool Traverse(const TraversalType type = kBreadthFirst) = 0;

  /**
   * Starts the traversal at the given root catalog hash.
   *
   * @param root_catalog_hash  the entry point for the traversal
   * @param type               breadth first or depth first traversal
   * @return                   true on success
   */
  virtual bool Traverse(const shash::Any &root_catalog_hash,
                        const TraversalType type = kBreadthFirst) = 0;

  /**
   * Traverses the catalog hierarchies of all root catalogs in the given list.
   *
   * @param catalog_list  the root catalog hashes to traverse
   * @param type          breadth first or depth first traversal
   * @return              true on success
   */
  virtual bool TraverseList(const std::vector<shash::Any> &catalog_list,
                            const TraversalType type = kBreadthFirst) = 0;

  /**
   * Traverses the catalog hierarchy of a single root catalog (revision)
   * without stepping into previous revisions.
   *
   * @param root_catalog_hash  the root catalog of the revision to traverse
   * @param type               breadth first or depth first traversal
   * @return                   true on success
   */
  virtual bool TraverseRevision(const shash::Any &root_catalog_hash,
                                const TraversalType type = kBreadthFirst) = 0;

  /**
   * Traverses the root catalogs of all named snapshots (tags) found in the
   * repository's history database. A repository without a history database
   * is counted as success.
   */
  virtual bool TraverseNamedSnapshots(
      const TraversalType type = kBreadthFirst) {
    typedef std::vector<shash::Any> HashList;

    UniquePtr<HistoryTN> tag_db;
    const typename ObjectFetcherT::Failures
        retval = object_fetcher_->FetchHistory(&tag_db);
    switch (retval) {
      case ObjectFetcherT::kFailOk:
        break;

      case ObjectFetcherT::kFailNotFound:
        LogCvmfs(kLogCatalogTraversal, kLogDebug,
                 "didn't find a history database to traverse");
        return true;

      default:
        LogCvmfs(kLogCatalogTraversal, error_sink_,
                 "failed to download history database (%d - %s)", retval,
                 Code2Ascii(retval));
        return false;
    }

    HashList root_hashes;
    bool success = tag_db->GetHashes(&root_hashes);
    assert(success);
    return TraverseList(root_hashes, type);
  }

  void SetCatalogInfoShim(CatalogTraversalInfoShim<CatalogTN> *shim) {
    catalog_info_shim_ = shim;
  }

 protected:
  typedef std::set<shash::Any> HashSet;

  /**
   * Bundles the state of a single catalog while it is being traversed: its
   * position in the catalog tree and history as well as the processing state
   * used internally by the traversal.
   */
  struct CatalogJob {
    CatalogJob(const std::string &path,
               const shash::Any &hash,
               const unsigned tree_level,
               const uint64_t history_depth,
               CatalogTN *parent = NULL)
        : path(path)
        , hash(hash)
        , tree_level(tree_level)
        , history_depth(history_depth)
        , parent(parent)
        , catalog_file_size(0)
        , ignore(false)
        , catalog(NULL)
        , referenced_catalogs(0)
        , postponed(false) { }

    bool IsRootCatalog() const { return tree_level == 0; }

    CallbackDataTN GetCallbackData() const {
      return CallbackDataTN(catalog, hash, tree_level, catalog_file_size,
                            history_depth);
    }

    // initial state description
    const std::string path;
    const shash::Any hash;
    const unsigned tree_level;
    const uint64_t history_depth;
    CatalogTN *parent;

    // dynamic processing state (used internally)
    std::string catalog_file_path;
    size_t catalog_file_size;
    bool ignore;
    CatalogTN *catalog;
    unsigned int referenced_catalogs;
    bool postponed;
  };

  bool PrepareCatalog(CatalogJob *job) {
    const typename ObjectFetcherT::Failures
        retval = object_fetcher_->FetchCatalog(job->hash,
                                               job->path,
                                               &job->catalog,
                                               !job->IsRootCatalog(),
                                               job->parent);
    switch (retval) {
      case ObjectFetcherT::kFailOk:
        break;

      case ObjectFetcherT::kFailNotFound:
        if (ignore_load_failure_) {
          LogCvmfs(kLogCatalogTraversal, kLogDebug,
                   "ignoring missing catalog "
                   "%s (swept before?)",
                   job->hash.ToString().c_str());
          job->ignore = true;
          return true;
        }

      default:
        LogCvmfs(kLogCatalogTraversal, error_sink_,
                 "failed to load catalog %s "
                 "(%d - %s)",
                 job->hash.ToStringWithSuffix().c_str(), retval,
                 Code2Ascii(retval));
        return false;
    }

    // catalogs returned by ObjectFetcher<> are managing their database files
    // by default... we need to manage this file manually here
    job->catalog->DropDatabaseFileOwnership();

    job->catalog_file_path = job->catalog->database_path();
    job->catalog_file_size = GetFileSize(job->catalog->database_path());

    return true;
  }


  bool ReopenCatalog(CatalogJob *job) {
    assert(!job->ignore);
    assert(job->catalog == NULL);

    job->catalog = CatalogTN::AttachFreely(job->path,
                                           job->catalog_file_path,
                                           job->hash,
                                           job->parent,
                                           !job->IsRootCatalog());

    if (job->catalog == NULL) {
      LogCvmfs(kLogCatalogTraversal, error_sink_,
               "failed to re-open catalog %s", job->hash.ToString().c_str());
      return false;
    }

    return true;
  }


  bool CloseCatalog(const bool unlink_db, CatalogJob *job) {
    delete job->catalog;
    job->catalog = NULL;
    if (!job->catalog_file_path.empty() && unlink_db) {
      const int retval = unlink(job->catalog_file_path.c_str());
      if (retval != 0) {
        LogCvmfs(kLogCatalogTraversal, error_sink_, "Failed to unlink %s - %d",
                 job->catalog_file_path.c_str(), errno);
        return false;
      }
    }

    return true;
  }

  shash::Any GetRepositoryRootCatalogHash() {
    // get the manifest of the repository to learn about the entry point or the
    // root catalog of the repository to be traversed
    UniquePtr<manifest::Manifest> manifest;
    const typename ObjectFetcherT::Failures
        retval = object_fetcher_->FetchManifest(&manifest);
    if (retval != ObjectFetcherT::kFailOk) {
      LogCvmfs(kLogCatalogTraversal, error_sink_,
               "failed to load manifest "
               "(%d - %s)",
               retval, Code2Ascii(retval));
      return shash::Any();
    }

    assert(manifest.IsValid());
    return manifest->catalog_hash();
  }

  /**
   * Checks if a root catalog is below one of the pruning thresholds.
   * A pruning threshold is either the configured history depth or a timestamp
   * threshold applied to the catalog's last modification time.
   *
   * @param job                  the job defining the root catalog to check
   * @param history_depth        the maximal history depth to travel down
   * @param timestamp_threshold  the oldest timestamp still to be traversed
   * @return                     true if either pruning threshold is reached
   */
  bool IsBelowPruningThresholds(const CatalogJob &job,
                                const uint64_t history_depth,
                                const time_t timestamp_threshold) {
    assert(job.IsRootCatalog());
    assert(job.catalog != NULL);

    const bool h = job.history_depth >= history_depth;
    assert(timestamp_threshold >= 0);
    const bool t = catalog_info_shim_->GetLastModified(job.catalog)
                   < unsigned(timestamp_threshold);

    return t || h;
  }

  ObjectFetcherT *object_fetcher_;
  CatalogTraversalInfoShim<CatalogTN> catalog_info_default_shim_;
  CatalogTraversalInfoShim<CatalogTN> *catalog_info_shim_;
  const uint64_t default_history_depth_;
  const time_t default_timestamp_threshold_;
  const bool no_close_;
  const bool ignore_load_failure_;
  const bool no_repeat_history_;
  const LogFacilities error_sink_;
};


/**
 * This class traverses the catalog hierarchy of a CVMFS repository
 * recursively, starting from a given root catalog. Optionally it also walks
 * previous revisions of the root catalog up to a configurable history depth
 * or timestamp threshold. Every visited catalog is handed to the registered
 * listeners through the Observable<> interface as a CatalogTraversalData
 * payload. Catalogs are downloaded through the configured ObjectFetcherT,
 * opened, yielded and closed one at a time on the calling thread; see
 * CatalogTraversalParallel for the multi-threaded variant.
 */
template<class ObjectFetcherT>
class CatalogTraversal : public CatalogTraversalBase<ObjectFetcherT> {
 public:
  typedef CatalogTraversalBase<ObjectFetcherT> Base;
  typedef ObjectFetcherT ObjectFetcherTN;
  typedef typename ObjectFetcherT::CatalogTN CatalogTN;
  typedef typename ObjectFetcherT::HistoryTN HistoryTN;
  typedef CatalogTraversalData<CatalogTN> CallbackDataTN;
  typedef typename Base::Parameters Parameters;
  typedef typename Base::TraversalType TraversalType;
  typedef typename Base::CatalogJob CatalogJob;

 protected:
  typedef std::set<shash::Any> HashSet;
  typedef std::stack<CatalogJob> CatalogJobStack;

  /**
   * Carries the state of a single traversal run: the pruning thresholds, the
   * traversal strategy and the job stacks used during processing.
   */
  struct TraversalContext {
    TraversalContext(const uint64_t history_depth,
                     const time_t timestamp_threshold,
                     const TraversalType traversal_type)
        : history_depth(history_depth)
        , timestamp_threshold(timestamp_threshold)
        , traversal_type(traversal_type) { }

    const uint64_t history_depth;
    const time_t timestamp_threshold;
    const TraversalType traversal_type;
    CatalogJobStack catalog_stack;
    CatalogJobStack callback_stack;
  };

 public:
  /**
   * Constructs a new catalog traversal engine based on the construction
   * parameters described in struct Parameters.
   *
   * @param params  the configuration for the traversal
   */
  explicit CatalogTraversal(const Parameters &params)
      : CatalogTraversalBase<ObjectFetcherT>(params) { }

  bool Traverse(const TraversalType type = Base::kBreadthFirst) {
    const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash();
    if (root_catalog_hash.IsNull()) {
      return false;
    }
    return Traverse(root_catalog_hash, type);
  }

  bool Traverse(const shash::Any &root_catalog_hash,
                const TraversalType type = Base::kBreadthFirst) {
    // add the root catalog of the repository as the first element on the job
    // stack
    TraversalContext ctx(this->default_history_depth_,
                         this->default_timestamp_threshold_, type);
    Push(root_catalog_hash, &ctx);
    return DoTraverse(&ctx);
  }

  bool TraverseList(const std::vector<shash::Any> &catalog_list,
                    const TraversalType type = Base::kBreadthFirst) {
    std::vector<shash::Any>::const_iterator i = catalog_list.begin();
    const std::vector<shash::Any>::const_iterator iend = catalog_list.end();
    bool success = true;
    for (; i != iend; ++i) {
      success = success && TraverseRevision(*i, type);
    }
    return success;
  }

  bool TraverseRevision(const shash::Any &root_catalog_hash,
                        const TraversalType type = Base::kBreadthFirst) {
    // add the given root catalog as the first element on the job stack
    TraversalContext ctx(Parameters::kNoHistory,
                         Parameters::kNoTimestampThreshold, type);
    Push(root_catalog_hash, &ctx);
    return DoTraverse(&ctx);
  }

 protected:
  /**
   * This controls the catalog traversal. Depending on the traversal type in
   * the given TraversalContext, catalogs are processed in breadth first or
   * depth first order: jobs are popped from the catalog stack, the associated
   * catalogs are fetched and opened, the catalogs they reference are pushed
   * and the registered listeners are notified for each visited catalog.
   *
   * @param ctx  the traversal context for this traversal run
   * @return     true on successful traversal
   */
  bool DoTraverse(TraversalContext *ctx) {
    assert(ctx->callback_stack.empty());

    while (!ctx->catalog_stack.empty()) {
      // Get the top most catalog for the next processing step
      CatalogJob job = Pop(ctx);

      if (ShouldBeSkipped(job)) {
        job.ignore = true;
        // download and open the catalog for processing
      } else if (!this->PrepareCatalog(&job)) {
        return false;
      }

      // ignored catalogs don't need to be processed anymore but they might
      // release postponed yields
      if (job.ignore) {
        if (!HandlePostponedYields(job, ctx)) {
          return false;
        }
        continue;
      }

      // push catalogs referenced by the current catalog (onto stack)
      this->MarkAsVisited(job);
      PushReferencedCatalogs(&job, ctx);

      // notify listeners
      if (!YieldToListeners(&job, ctx)) {
        return false;
      }
    }

    // invariant: after the traversal finished, there should be no more
    // catalogs to traverse or to yield!
    assert(ctx->catalog_stack.empty());
    assert(ctx->callback_stack.empty());
    return true;
  }

  void PushReferencedCatalogs(CatalogJob *job, TraversalContext *ctx) {
    assert(!job->ignore);
    assert(job->catalog != NULL);
    assert(ctx->traversal_type == Base::kBreadthFirst
           || ctx->traversal_type == Base::kDepthFirst);

    // this differs, depending on the traversal strategy.
    //
    // Breadth First Traversal
    //   Catalogs are traversed from top (root catalog) to bottom (leaf
    //   catalogs) and from more recent (HEAD revision) to older (historic
    //   revisions)
    //
    // Depth First Traversal
    //   Catalogs are traversed from oldest revision (depends on the configured
    //   maximal history depth) to the HEAD revision and from bottom (leafs) to
    //   top (root catalogs)
    job->referenced_catalogs = (ctx->traversal_type == Base::kBreadthFirst)
                                   ? PushPreviousRevision(*job, ctx)
                                         + PushNestedCatalogs(*job, ctx)
                                   : PushNestedCatalogs(*job, ctx)
                                         + PushPreviousRevision(*job, ctx);
  }

  /**
   * Pushes the previous revision of a (root) catalog.
   * @return  the number of catalogs pushed on the job stack (0 or 1)
   */
  unsigned int PushPreviousRevision(const CatalogJob &job,
                                    TraversalContext *ctx) {
    // only root catalogs are used for entering a previous revision (graph)
    if (!job.catalog->IsRoot()) {
      return 0;
    }

    const shash::Any previous_revision = job.catalog->GetPreviousRevision();
    if (previous_revision.IsNull()) {
      return 0;
    }

    // check if the next deeper history level is actually requested
    // Note: if the current catalog is below the timestamp threshold it will be
    // traversed and only its ancestor revision will not be pushed anymore
    if (this->IsBelowPruningThresholds(job, ctx->history_depth,
                                       ctx->timestamp_threshold)) {
      return 0;
    }

    Push(CatalogJob("", previous_revision, 0, job.history_depth + 1, NULL),
         ctx);
    return 1;
  }

  /**
   * Pushes all nested catalogs referenced by the given catalog.
   * @return  the number of catalogs pushed on the job stack
   */
  unsigned int PushNestedCatalogs(const CatalogJob &job,
                                  TraversalContext *ctx) {
    typedef typename CatalogTN::NestedCatalogList NestedCatalogList;
    const NestedCatalogList nested = job.catalog->ListOwnNestedCatalogs();
    typename NestedCatalogList::const_iterator i = nested.begin();
    typename NestedCatalogList::const_iterator iend = nested.end();
    for (; i != iend; ++i) {
      CatalogTN *parent = (this->no_close_) ? job.catalog : NULL;
      const CatalogJob new_job(i->mountpoint.ToString(),
                               i->hash,
                               job.tree_level + 1,
                               job.history_depth,
                               parent);
      Push(new_job, ctx);
    }

    return nested.size();
  }

  void Push(const shash::Any &root_catalog_hash, TraversalContext *ctx) {
    Push(CatalogJob("", root_catalog_hash, 0, 0), ctx);
  }

  bool YieldToListeners(CatalogJob *job, TraversalContext *ctx) {
    assert(!job->ignore);
    assert(job->catalog != NULL);
    assert(ctx->traversal_type == Base::kBreadthFirst
           || ctx->traversal_type == Base::kDepthFirst);

    // in breadth first search mode, every catalog is simply handed out once
    // it is visited. No extra magic required...
    if (ctx->traversal_type == Base::kBreadthFirst) {
      return Yield(job);
    }

    // in depth first search mode, catalogs might need to wait until all of
    // their referenced catalogs are yielded (ctx.callback_stack)...
    if (job->referenced_catalogs > 0) {
      PostponeYield(job, ctx);
      return true;
    }

    // this catalog can be yielded
    return Yield(job) && HandlePostponedYields(*job, ctx);
  }

  /**
   * Checks the traversal history whether the given catalog was already
   * visited and therefore should not be traversed again (only effective if
   * no_repeat_history is set).
   *
   * @param job  the job to be checked against the traversal history
   * @return     true if the catalog should not be traversed
   */
  bool ShouldBeSkipped(const CatalogJob &job) {
    return this->no_repeat_history_ && (visited_catalogs_.count(job.hash) > 0);
  }

  void MarkAsVisited(const CatalogJob &job) {
    if (this->no_repeat_history_) {
      visited_catalogs_.insert(job.hash);
    }
  }

 private:
  /**
   * Hands the given catalog out to the user code and takes care of closing
   * or re-opening the underlying catalog file as configured.
   */
  bool Yield(CatalogJob *job) {
    assert(!job->ignore);
    assert(job->catalog != NULL || job->postponed);

    // catalog was pushed on ctx.callback_stack before, it might need to be
    // re-opened. If CatalogTraversal<> is configured with no_close, it was
    // not closed before, hence does not need a re-open.
    if (job->postponed && !this->no_close_ && !this->ReopenCatalog(job)) {
      return false;
    }

    // hand the catalog out to the user code (see Observable<>)
    assert(job->catalog != NULL);
    this->NotifyListeners(job->GetCallbackData());

    // skip the catalog closing procedure if asked for
    // Note: In this case it is the user's responsibility to both delete the
    // yielded catalog object and the underlying database temp file
    if (this->no_close_) {
      return true;
    }

    // we can close the catalog here and delete the temporary file
    const bool unlink_db = true;
    return this->CloseCatalog(unlink_db, job);
  }


  /**
   * Postpones the yielding of a catalog until all catalogs it references
   * have been yielded themselves (depth first traversal only).
   */
  void PostponeYield(CatalogJob *job, TraversalContext *ctx) {
    assert(job->referenced_catalogs > 0);

    job->postponed = true;
    if (!this->no_close_) {
      const bool unlink_db = false;  // will be reopened just before yielding
      this->CloseCatalog(unlink_db, job);
    }
    ctx->callback_stack.push(*job);
  }

  /**
   * Determines if postponed yields can be released based on the catalog that
   * was just yielded (depth first traversal only).
   *
   * @param job  the catalog job that was just yielded
   * @param ctx  the traversal context
   * @return     true on success
   */
  bool HandlePostponedYields(const CatalogJob &job, TraversalContext *ctx) {
    if (ctx->traversal_type == Base::kBreadthFirst) {
      return true;
    }

    assert(ctx->traversal_type == Base::kDepthFirst);
    assert(job.referenced_catalogs == 0);

    // walk through the callback_stack and yield all catalogs that have no
    // un-yielded referenced_catalogs anymore. Every time a CatalogJob in the
    // callback_stack gets yielded it decrements the referenced_catalogs of
    // the next top of the stack (its parent CatalogJob waiting for yielding)
    CatalogJobStack &clbs = ctx->callback_stack;
    while (!clbs.empty()) {
      CatalogJob &postponed_job = clbs.top();
      if (--postponed_job.referenced_catalogs > 0) {
        break;
      }

      if (!Yield(&postponed_job)) {
        return false;
      }
      clbs.pop();
    }

    return true;
  }

  void Push(const CatalogJob &job, TraversalContext *ctx) {
    ctx->catalog_stack.push(job);
  }

  CatalogJob Pop(TraversalContext *ctx) {
    CatalogJob job = ctx->catalog_stack.top();
    ctx->catalog_stack.pop();
    return job;
  }

 protected:
  HashSet visited_catalogs_;
};

template<class ObjectFetcherT>
const uint64_t CatalogTraversalBase<ObjectFetcherT>::Parameters::
    kFullHistory = std::numeric_limits<uint64_t>::max();

template<class ObjectFetcherT>
const uint64_t
    CatalogTraversalBase<ObjectFetcherT>::Parameters::kNoHistory = 0;

template<class ObjectFetcherT>
const time_t
    CatalogTraversalBase<ObjectFetcherT>::Parameters::kNoTimestampThreshold
    = 0;

}  // namespace swissknife

#endif  // CVMFS_CATALOG_TRAVERSAL_H_
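
The listing above defines the public API: fill in a Parameters struct, construct a CatalogTraversal<> for a concrete object fetcher, register a listener and call one of the Traverse*() methods. The following sketch illustrates that flow. It is not part of catalog_traversal.h; the HttpObjectFetcher<> type (from object_fetcher.h) and the RegisterListener(member-pointer, delegate) overload of Observable<> are assumptions based on how the traversal is driven elsewhere in CVMFS, and the class name is made up.

// Hypothetical usage sketch, not part of the header above.
class CatalogPrinter {
 public:
  typedef CatalogTraversal<HttpObjectFetcher<> > Traversal;

  bool Run(HttpObjectFetcher<> *fetcher) {
    Traversal::Parameters params;
    params.object_fetcher = fetcher;                       // mandatory
    params.history = Traversal::Parameters::kFullHistory;  // walk all revisions
    params.no_repeat_history = true;                       // visit each catalog once

    Traversal traversal(params);
    // Assumed Observable<> overload: bind a member function as listener.
    traversal.RegisterListener(&CatalogPrinter::CatalogCallback, this);
    return traversal.Traverse(Traversal::kBreadthFirst);
  }

  // Called once per visited catalog with the data defined at the top of the
  // header (catalog pointer, hash, tree level, file size, history depth).
  void CatalogCallback(const Traversal::CallbackDataTN &data) {
    LogCvmfs(kLogCatalogTraversal, kLogStdout, "level %u  catalog %s",
             data.tree_level, data.catalog_hash.ToString().c_str());
  }
};

In breadth first mode the callback sees the root catalog first and then its nested catalogs level by level; in depth first mode (Traversal::kDepthFirst) leaf catalogs and older revisions are yielded before their parents, which is what the postponed-yield machinery above implements.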
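A second sketch shows the CatalogTraversalInfoShim hook: SetCatalogInfoShim() installs an alternative source for the "last modified" timestamp that IsBelowPruningThresholds() compares against the timestamp threshold. The shim below is purely illustrative (the class name and the fixed value are made up); it assumes the fetcher's catalog type is catalog::Catalog, the default of HttpObjectFetcher<>.

// Hypothetical shim, not part of the header above.
class FixedTimestampShim : public CatalogTraversalInfoShim<catalog::Catalog> {
 public:
  explicit FixedTimestampShim(const uint64_t timestamp)
      : timestamp_(timestamp) { }

  // Report the same timestamp for every catalog instead of asking the
  // catalog itself (the default shim returns catalog->GetLastModified()).
  virtual uint64_t GetLastModified(const catalog::Catalog * /* catalog */) {
    return timestamp_;
  }

 private:
  const uint64_t timestamp_;
};

// Usage, given a 'traversal' object as in the previous example:
//   FixedTimestampShim shim(1600000000);  // arbitrary cutoff
//   traversal.SetCatalogInfoShim(&shim);
//   traversal.Traverse();

Note that SetCatalogInfoShim() only stores the pointer, so the shim object must outlive the traversal.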