catalog_traversal_parallel.h
/**
 * This file is part of the CernVM File System.
 */

#ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
#define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_

#include <stack>
#include <string>
#include <vector>

#include "catalog_traversal.h"
#include "util/atomic.h"
#include "util/exception.h"
#include "util/tube.h"

namespace swissknife {

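/**
 * A multi-threaded counterpart to CatalogTraversal: catalogs are fetched and
 * processed by num_threads worker threads that share a pre-processing and a
 * post-processing job queue.  Callbacks are invoked either concurrently or,
 * if Parameters::serialize_callbacks is set, under a single lock.
 */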
template<class ObjectFetcherT>
class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> {
 public:
  typedef CatalogTraversalBase<ObjectFetcherT> Base;
  typedef ObjectFetcherT ObjectFetcherTN;
  typedef typename ObjectFetcherT::CatalogTN CatalogTN;
  typedef typename ObjectFetcherT::HistoryTN HistoryTN;
  typedef CatalogTraversalData<CatalogTN> CallbackDataTN;
  typedef typename CatalogTN::NestedCatalogList NestedCatalogList;
  typedef typename Base::Parameters Parameters;
  typedef typename Base::TraversalType TraversalType;
  typedef std::vector<shash::Any> HashList;

  explicit CatalogTraversalParallel(const Parameters &params)
    : CatalogTraversalBase<ObjectFetcherT>(params)
    , num_threads_(params.num_threads)
    , serialize_callbacks_(params.serialize_callbacks) {
    atomic_init32(&num_errors_);
    shash::Any null_hash;
    null_hash.SetNull();
    catalogs_processing_.Init(1024, null_hash, hasher);
    catalogs_done_.Init(1024, null_hash, hasher);
    pthread_mutex_init(&catalog_callback_lock_, NULL);
    pthread_mutex_init(&catalogs_lock_, NULL);
    effective_history_depth_ = this->default_history_depth_;
    effective_timestamp_threshold_ = this->default_timestamp_threshold_;
  }

 protected:
  struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob,
                      public Observable<int> {
    explicit CatalogJob(const std::string &path,
                        const shash::Any &hash,
                        const unsigned tree_level,
                        const uint64_t history_depth,
                        CatalogTN *parent = NULL) :
      CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level,
                                                   history_depth, parent) {
      atomic_init32(&children_unprocessed);
    }

    void WakeParents() {
      this->NotifyListeners(0);
    }

    atomic_int32 children_unprocessed;
  };

 public:
  bool Traverse(const TraversalType type = Base::kBreadthFirst) {
    const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash();
    if (root_catalog_hash.IsNull()) {
      return false;
    }
    return Traverse(root_catalog_hash, type);
  }

  bool Traverse(const shash::Any &root_catalog_hash,
                const TraversalType type = Base::kBreadthFirst) {
    // add the root catalog of the repository as the first element on the job
    // stack
    if (this->no_repeat_history_ &&
        catalogs_done_.Contains(root_catalog_hash)) {
      return true;
    }
    effective_traversal_type_ = type;
    CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0);
    PushJob(root_job);
    return DoTraverse();
  }

  bool TraverseList(const HashList &root_catalog_list,
                    const TraversalType type = Base::kBreadthFirst) {
    // Push in reverse order for CatalogTraversal-like behavior
    HashList::const_reverse_iterator i = root_catalog_list.rbegin();
    const HashList::const_reverse_iterator iend = root_catalog_list.rend();
    bool has_pushed = false;
    {
      MutexLockGuard m(&catalogs_lock_);
      for (; i != iend; ++i) {
        if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) {
          continue;
        }

        CatalogJob *root_job = new CatalogJob("", *i, 0, 0);
        PushJobUnlocked(root_job);
        has_pushed = true;
      }
    }
    // noop: no catalogs to traverse
    if (!has_pushed) {
      return true;
    }

    effective_traversal_type_ = type;
    effective_history_depth_ = Parameters::kNoHistory;
    effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold;
    bool result = DoTraverse();
    effective_history_depth_ = this->default_history_depth_;
    effective_timestamp_threshold_ = this->default_timestamp_threshold_;
    return result;
  }

  bool TraverseRevision(const shash::Any &root_catalog_hash,
                        const TraversalType type = Base::kBreadthFirst)
  {
    effective_history_depth_ = Parameters::kNoHistory;
    effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold;
    bool result = Traverse(root_catalog_hash, type);
    effective_history_depth_ = this->default_history_depth_;
    effective_timestamp_threshold_ = this->default_timestamp_threshold_;
    return result;
  }

 protected:
  static uint32_t hasher(const shash::Any &key) {
    // Don't start with the first bytes, because == is using them as well
    return (uint32_t) *(reinterpret_cast<const uint32_t *>(key.digest) + 1);
  }

  bool DoTraverse() {
    // Optimal number of threads is yet to be determined. The main event loop
    // contains a spin-lock, so it should not be more than the number of cores.
    threads_process_ = reinterpret_cast<pthread_t *>
                       (smalloc(sizeof(pthread_t) * num_threads_));
    for (unsigned int i = 0; i < num_threads_; ++i) {
      int retval = pthread_create(&threads_process_[i], NULL,
                                  MainProcessQueue, this);
      if (retval != 0) PANIC(kLogStderr, "failed to create thread");
    }

    for (unsigned int i = 0; i < num_threads_; ++i) {
      int retval = pthread_join(threads_process_[i], NULL);
      assert(retval == 0);
    }
    free(threads_process_);

    if (atomic_read32(&num_errors_))
      return false;

    assert(catalogs_processing_.size() == 0);
    assert(pre_job_queue_.IsEmpty());
    assert(post_job_queue_.IsEmpty());
    return true;
  }

  static void *MainProcessQueue(void *data) {
    CatalogTraversalParallel<ObjectFetcherT> *traversal =
      reinterpret_cast<CatalogTraversalParallel<ObjectFetcherT> *>(data);
    CatalogJob *current_job;
    while (true) {
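      // Catalogs waiting for post-processing take priority over fetching
      // new catalogs from the pre-processing queue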
      current_job = traversal->post_job_queue_.TryPopFront();
      if (current_job != NULL) {
        traversal->ProcessJobPost(current_job);
      } else {
        current_job = traversal->pre_job_queue_.PopFront();
        // a job with a null hash is the termination signal (see NotifyFinished)
        if (current_job->hash.IsNull()) {
          delete current_job;
          break;
        }
        traversal->ProcessJobPre(current_job);
      }
    }
    return NULL;
  }

  void NotifyFinished() {
    shash::Any null_hash;
    null_hash.SetNull();
    for (unsigned i = 0; i < num_threads_; ++i) {
      CatalogJob *job = new CatalogJob("", null_hash, 0, 0);
      pre_job_queue_.EnqueueFront(job);
    }
  }

  void PushJob(CatalogJob *job) {
    MutexLockGuard m(&catalogs_lock_);
    PushJobUnlocked(job);
  }

  void PushJobUnlocked(CatalogJob *job) {
    catalogs_processing_.Insert(job->hash, job);
    pre_job_queue_.EnqueueFront(job);
  }

  void ProcessJobPre(CatalogJob *job) {
    if (!this->PrepareCatalog(job)) {
      atomic_inc32(&num_errors_);
      NotifyFinished();
      return;
    }
    if (job->ignore) {
      FinalizeJob(job);
      return;
    }
    NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs();
    unsigned int num_children;
    // Ensure that pushed children won't call ProcessJobPost on this job
    // before this function finishes
    {
      MutexLockGuard m(&catalogs_lock_);
      if (effective_traversal_type_ == Base::kBreadthFirst) {
        num_children = PushPreviousRevision(job) +
                       PushNestedCatalogs(job, catalog_list);
      } else {
        num_children = PushNestedCatalogs(job, catalog_list) +
                       PushPreviousRevision(job);
        atomic_write32(&job->children_unprocessed, num_children);
      }
      if (!this->CloseCatalog(false, job)) {
        atomic_inc32(&num_errors_);
        NotifyFinished();
      }
    }

    // breadth-first: can post-process immediately
    // depth-first: no children -> can post-process immediately
    if (effective_traversal_type_ == Base::kBreadthFirst ||
        num_children == 0) {
      ProcessJobPost(job);
      return;
    }
  }

  unsigned int PushNestedCatalogs(CatalogJob *job,
                                  const NestedCatalogList &catalog_list) {
    typename NestedCatalogList::const_iterator i = catalog_list.begin();
    typename NestedCatalogList::const_iterator iend = catalog_list.end();
    unsigned int num_children = 0;
    for (; i != iend; ++i) {
      if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) {
        continue;
      }

      CatalogJob *child;
      if (!this->no_repeat_history_ ||
          !catalogs_processing_.Lookup(i->hash, &child)) {
        CatalogTN *parent = (this->no_close_) ? job->catalog : NULL;
        child = new CatalogJob(i->mountpoint.ToString(),
                               i->hash,
                               job->tree_level + 1,
                               job->history_depth,
                               parent);
        PushJobUnlocked(child);
      }

      if (effective_traversal_type_ == Base::kDepthFirst) {
        child->RegisterListener(&CatalogTraversalParallel::OnChildFinished,
                                this, job);
      }
      ++num_children;
    }
    return num_children;
  }

  unsigned int PushPreviousRevision(CatalogJob *job) {
    // only root catalogs are used for entering a previous revision (graph)
    if (!job->catalog->IsRoot()) {
      return 0;
    }

    const shash::Any previous_revision = job->catalog->GetPreviousRevision();
    if (previous_revision.IsNull()) {
      return 0;
    }

    // check if the next deeper history level is actually requested
    if (this->IsBelowPruningThresholds(*job, effective_history_depth_,
                                       effective_timestamp_threshold_)) {
      return 0;
    }

    if (this->no_repeat_history_ &&
        catalogs_done_.Contains(previous_revision)) {
      return 0;
    }

    CatalogJob *prev_job;
    if (!this->no_repeat_history_ ||
        !catalogs_processing_.Lookup(previous_revision, &prev_job)) {
      prev_job =
        new CatalogJob("", previous_revision, 0, job->history_depth + 1);
      PushJobUnlocked(prev_job);
    }

    if (effective_traversal_type_ == Base::kDepthFirst) {
      prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished,
                                 this, job);
    }
    return 1;
  }

  void ProcessJobPost(CatalogJob *job) {
    // Save time by keeping the catalog open when suitable
    if (job->catalog == NULL) {
      if (!this->ReopenCatalog(job)) {
        atomic_inc32(&num_errors_);
        NotifyFinished();
        return;
      }
    }
    if (serialize_callbacks_) {
      MutexLockGuard m(&catalog_callback_lock_);
      this->NotifyListeners(job->GetCallbackData());
    } else {
      this->NotifyListeners(job->GetCallbackData());
    }
    if (!this->no_close_) {
      if (!this->CloseCatalog(true, job)) {
        atomic_inc32(&num_errors_);
        NotifyFinished();
        return;
      }
    }
    FinalizeJob(job);
  }

  void FinalizeJob(CatalogJob *job) {
    {
      MutexLockGuard m(&catalogs_lock_);
      catalogs_processing_.Erase(job->hash);
      catalogs_done_.Insert(job->hash, true);
      // No more catalogs to process -> finish
      if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty() &&
          post_job_queue_.IsEmpty()) {
        NotifyFinished();
      }
    }
    if (effective_traversal_type_ == Base::kDepthFirst) {
      job->WakeParents();
    }
    delete job;
  }

  void OnChildFinished(const int &a, CatalogJob *job) {
    // atomic_xadd32 returns the value before subtraction -> needs to equal 1
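    // e.g. a parent with 3 children counts down 3 -> 2 -> 1; only the call
    // that observes 1 (the last child) re-enqueues the parent job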
    if (atomic_xadd32(&job->children_unprocessed, -1) == 1) {
      post_job_queue_.EnqueueFront(job);
    }
  }

  unsigned int num_threads_;
  bool serialize_callbacks_;

  uint64_t effective_history_depth_;
  time_t effective_timestamp_threshold_;
  TraversalType effective_traversal_type_;

  pthread_t *threads_process_;
  atomic_int32 num_errors_;

  Tube<CatalogJob> pre_job_queue_;
  Tube<CatalogJob> post_job_queue_;
  SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_;
  SmallHashDynamic<shash::Any, bool> catalogs_done_;
  pthread_mutex_t catalogs_lock_;

  pthread_mutex_t catalog_callback_lock_;
};
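
/*
 * Illustrative usage sketch, not part of the original header: it shows how a
 * client might drive the parallel traversal.  The fetcher type
 * (MyObjectFetcher), the delegate type (MyDelegate) with its CatalogCallback
 * method, and the exact Parameters fields beyond num_threads and
 * serialize_callbacks are assumptions following the CatalogTraversal and
 * Observable conventions.
 *
 *   typedef CatalogTraversalParallel<MyObjectFetcher> Traversal;
 *   Traversal::Parameters params;
 *   params.object_fetcher = &fetcher;    // assumed fetcher setup
 *   params.num_threads = 8;              // worker threads for the job queues
 *   params.serialize_callbacks = true;   // callbacks guarded by a single lock
 *   Traversal traversal(params);
 *   traversal.RegisterListener(&MyDelegate::CatalogCallback, &delegate);
 *   const bool ok = traversal.Traverse();  // breadth-first by default
 */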

}  // namespace swissknife

#endif  // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_