5 #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
6 #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
17 namespace swissknife {
29 template<
class ObjectFetcherT>
34 typedef typename ObjectFetcherT::CatalogTN
CatalogTN;
35 typedef typename ObjectFetcherT::HistoryTN
HistoryTN;
89 if (root_catalog_hash.
IsNull()) {
123 HashList::const_reverse_iterator i = root_catalog_list.rbegin();
124 const HashList::const_reverse_iterator iend = root_catalog_list.rend();
125 bool has_pushed =
false;
128 for (; i != iend; ++i) {
174 return (uint32_t) *(
reinterpret_cast<const uint32_t *
>(key.
digest) + 1);
209 if (current_job != NULL) {
214 if (current_job->hash.IsNull()) {
254 unsigned int num_children;
284 typename NestedCatalogList::const_iterator i = catalog_list.begin();
285 typename NestedCatalogList::const_iterator iend = catalog_list.end();
286 unsigned int num_children = 0;
287 for (; i != iend; ++i) {
296 child =
new CatalogJob(i->mountpoint.ToString(),
324 if (previous_revision.
IsNull()) {
424 #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
CatalogTraversalBase< ObjectFetcherT > Base
Tube< CatalogJob > post_job_queue_
Tube< CatalogJob > pre_job_queue_
SmallHashDynamic< shash::Any, CatalogJob * > catalogs_processing_
bool IsBelowPruningThresholds(const CatalogJob &job, const uint64_t history_depth, const time_t timestamp_threshold)
unsigned int PushPreviousRevision(CatalogJob *job)
void FinalizeJob(CatalogJob *job)
SmallHashDynamic< shash::Any, bool > catalogs_done_
CatalogTN::NestedCatalogList NestedCatalogList
ObjectFetcherT ObjectFetcherTN
void PushJobUnlocked(CatalogJob *job)
CallbackDataTN GetCallbackData() const
static void * MainProcessQueue(void *data)
void NotifyListeners(const int &parameter)
CatalogTraversalParallel(const Parameters &params)
const bool no_repeat_history_
assert((mem||(size==0))&&"Out Of Memory")
static const uint64_t kNoHistory
unsigned char digest[digest_size_]
bool PrepareCatalog(CatalogJob *job)
uint64_t effective_history_depth_
void ProcessJobPre(CatalogJob *job)
void PushJob(CatalogJob *job)
Base::Parameters Parameters
bool TraverseRevision(const shash::Any &root_catalog_hash, const TraversalType type=Base::kBreadthFirst)
Base::TraversalType TraversalType
static uint32_t hasher(const shash::Any &key)
CatalogTraversalData< CatalogTN > CallbackDataTN
bool Traverse(const TraversalType type=Base::kBreadthFirst)
bool Traverse(const shash::Any &root_catalog_hash, const TraversalType type=Base::kBreadthFirst)
void Insert(const Key &key, const Value &value)
ObjectFetcherT::CatalogTN CatalogTN
void ProcessJobPost(CatalogJob *job)
pthread_t * threads_process_
ObjectFetcherT::CatalogTN CatalogTN
time_t effective_timestamp_threshold_
ObjectFetcherT::HistoryTN HistoryTN
void OnChildFinished(const int &a, CatalogJob *job)
unsigned int num_threads_
CatalogJob(const std::string &path, const shash::Any &hash, const unsigned tree_level, const uint64_t history_depth, CatalogTN *parent=NULL)
const uint64_t default_history_depth_
bool Contains(const Key &key) const
const uint64_t history_depth
bool TraverseList(const HashList &root_catalog_list, const TraversalType type=Base::kBreadthFirst)
static const time_t kNoTimestampThreshold
bool serialize_callbacks_
atomic_int32 children_unprocessed
TraversalType effective_traversal_type_
unsigned int PushNestedCatalogs(CatalogJob *job, const NestedCatalogList &catalog_list)
shash::Any GetRepositoryRootCatalogHash()
pthread_mutex_t catalog_callback_lock_
pthread_mutex_t catalogs_lock_
void Init(uint32_t expected_size, Key empty, uint32_t(*hasher)(const Key &key))
const unsigned tree_level
bool CloseCatalog(const bool unlink_db, CatalogJob *job)
bool ReopenCatalog(CatalogJob *job)
std::vector< shash::Any > HashList
const time_t default_timestamp_threshold_