CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
catalog_traversal_parallel.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
6 #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
7 
8 #include <stack>
9 #include <string>
10 #include <vector>
11 
12 #include "catalog_traversal.h"
13 #include "util/atomic.h"
14 #include "util/exception.h"
15 #include "util/tube.h"
16 
17 namespace swissknife {
18 
29 template<class ObjectFetcherT>
30 class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> {
31  public:
33  typedef ObjectFetcherT ObjectFetcherTN;
34  typedef typename ObjectFetcherT::CatalogTN CatalogTN;
35  typedef typename ObjectFetcherT::HistoryTN HistoryTN;
37  typedef typename CatalogTN::NestedCatalogList NestedCatalogList;
38  typedef typename Base::Parameters Parameters;
40  typedef std::vector<shash::Any> HashList;
41 
// Constructs the parallel traversal: configures the worker-thread count and
// callback serialization from 'params' and prepares the shared state used by
// the workers.
42  explicit CatalogTraversalParallel(const Parameters &params)
43  : CatalogTraversalBase<ObjectFetcherT>(params)
44  , num_threads_(params.num_threads)
45  , serialize_callbacks_(params.serialize_callbacks) {
// Shared error counter; workers increment it when a catalog fails to open,
// reopen or close (see ProcessJobPre/ProcessJobPost).
46  atomic_init32(&num_errors_);
// The null hash is the "empty slot" marker for both hash tables below.
47  shash::Any null_hash;
48  null_hash.SetNull();
// 1024 is the initial table capacity; hasher() skips the leading digest
// bytes (see its definition below).
49  catalogs_processing_.Init(1024, null_hash, hasher);
50  catalogs_done_.Init(1024, null_hash, hasher);
51  pthread_mutex_init(&catalog_callback_lock_, NULL);
52  pthread_mutex_init(&catalogs_lock_, NULL);
// NOTE(review): rendered lines 53-54 were dropped by the doc generator
// (numbers jump 52 -> 55); verify against the repository source.
55  }
56 
57  protected:
// Job descriptor for one catalog.  Extends the serial traversal's
// CatalogJob with Observable<int> so parent jobs can register a listener
// and be woken once this child has been fully processed (see WakeParents()
// and OnChildFinished()).
58  struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob,
59  public Observable<int> {
60  explicit CatalogJob(const std::string &path,
61  const shash::Any &hash,
62  const unsigned tree_level,
63  const uint64_t history_depth,
64  CatalogTN *parent = NULL)
65  : CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level,
66  history_depth, parent) {
// Counts children that still have to finish before this job may be
// post-processed in depth-first mode.
67  atomic_init32(&children_unprocessed);
68  }
69 
// Notifies all subscribed parents; the int payload (0) is a dummy value.
70  void WakeParents() { this->NotifyListeners(0); }
71 
// NOTE(review): rendered line 72 was dropped by the doc generator --
// presumably the declaration of the children_unprocessed counter
// (atomic_int32 per the member index); verify upstream.
73  };
74 
75  public:
// Traverses the repository starting from its current root catalog.
// NOTE(review): the signature and preceding doc lines (rendered 76-85)
// were dropped by the doc generator; the member index at the bottom of
// this page lists it as
//   bool Traverse(const TraversalType type = Base::kBreadthFirst)
86  const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash();
// Fail early if the root catalog hash cannot be determined.
87  if (root_catalog_hash.IsNull()) {
88  return false;
89  }
// Delegate to the hash-based overload.
90  return Traverse(root_catalog_hash, type);
91  }
92 
// Traverses from a specific root catalog hash.  Per the member index:
//   bool Traverse(const shash::Any &root_catalog_hash,
//                 const TraversalType type = Base::kBreadthFirst)
99  bool Traverse(const shash::Any &root_catalog_hash,
// NOTE(review): rendered line 100 (rest of the parameter list and the
// opening brace) was dropped by the doc generator.
101  // add the root catalog of the repository as the first element on the job
102  // stack
// Nothing to do if this catalog was already fully processed earlier.
103  if (this->no_repeat_history_
104  && catalogs_done_.Contains(root_catalog_hash)) {
105  return true;
106  }
// NOTE(review): rendered line 107 is missing here.
108  CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0);
109  PushJob(root_job);
110  return DoTraverse();
111  }
112 
// Traverses a list of root catalogs.  Per the member index:
//   bool TraverseList(const HashList &root_catalog_list,
//                     const TraversalType type = Base::kBreadthFirst)
118  bool TraverseList(const HashList &root_catalog_list,
// NOTE(review): rendered line 119 (rest of the parameter list and the
// opening brace) was dropped by the doc generator.
120  // Push in reverse order for CatalogTraversal-like behavior
121  HashList::const_reverse_iterator i = root_catalog_list.rbegin();
122  const HashList::const_reverse_iterator iend = root_catalog_list.rend();
123  bool has_pushed = false;
124  {
// NOTE(review): rendered line 125 is missing -- presumably a lock guard
// on catalogs_lock_, matching the *Unlocked* push used below; verify
// against the repository source.
126  for (; i != iend; ++i) {
// Skip catalogs that were already fully processed.
127  if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) {
128  continue;
129  }
130 
131  CatalogJob *root_job = new CatalogJob("", *i, 0, 0);
132  PushJobUnlocked(root_job);
133  has_pushed = true;
134  }
135  }
136  // noop: no catalogs to traverse
137  if (!has_pushed) {
138  return true;
139  }
// NOTE(review): rendered lines 140-142 and 144-145 are missing around
// the call below.
143  bool result = DoTraverse();
146  return result;
147  }
148 
// Traverses a single revision identified by its root catalog hash.
// Per the member index:
//   bool TraverseRevision(const shash::Any &root_catalog_hash,
//                         const TraversalType type = Base::kBreadthFirst)
157  bool TraverseRevision(const shash::Any &root_catalog_hash,
// NOTE(review): rendered lines 158-160 and 162-163 were dropped by the
// doc generator -- presumably they adjust traversal settings around the
// delegated call; verify against the repository source.
161  bool result = Traverse(root_catalog_hash, type);
164  return result;
165  }
166 
167  protected:
168  static uint32_t hasher(const shash::Any &key) {
169  // Don't start with the first bytes, because == is using them as well
170  return (uint32_t) * (reinterpret_cast<const uint32_t *>(key.digest) + 1);
171  }
172 
173  bool DoTraverse() {
174  // Optimal number of threads is yet to be determined. The main event loop
175  // contains a spin-lock, so it should not be more than number of cores.
176  threads_process_ = reinterpret_cast<pthread_t *>(
177  smalloc(sizeof(pthread_t) * num_threads_));
178  for (unsigned int i = 0; i < num_threads_; ++i) {
179  int retval = pthread_create(&threads_process_[i], NULL, MainProcessQueue,
180  this);
181  if (retval != 0)
182  PANIC(kLogStderr, "failed to create thread");
183  }
184 
185  for (unsigned int i = 0; i < num_threads_; ++i) {
186  int retval = pthread_join(threads_process_[i], NULL);
187  assert(retval == 0);
188  }
189  free(threads_process_);
190 
191  if (atomic_read32(&num_errors_))
192  return false;
193 
194  assert(catalogs_processing_.size() == 0);
195  assert(pre_job_queue_.IsEmpty());
196  assert(post_job_queue_.IsEmpty());
197  return true;
198  }
199 
200  static void *MainProcessQueue(void *data) {
201  CatalogTraversalParallel<ObjectFetcherT> *traversal = reinterpret_cast<
203  CatalogJob *current_job;
204  while (true) {
205  current_job = traversal->post_job_queue_.TryPopFront();
206  if (current_job != NULL) {
207  traversal->ProcessJobPost(current_job);
208  } else {
209  current_job = traversal->pre_job_queue_.PopFront();
210  // NULL means the master thread tells us to finish
211  if (current_job->hash.IsNull()) {
212  delete current_job;
213  break;
214  }
215  traversal->ProcessJobPre(current_job);
216  }
217  }
218  return NULL;
219  }
220 
221  void NotifyFinished() {
222  shash::Any null_hash;
223  null_hash.SetNull();
224  for (unsigned i = 0; i < num_threads_; ++i) {
225  CatalogJob *job = new CatalogJob("", null_hash, 0, 0);
226  pre_job_queue_.EnqueueFront(job);
227  }
228  }
229 
230  void PushJob(CatalogJob *job) {
232  PushJobUnlocked(job);
233  }
234 
236  catalogs_processing_.Insert(job->hash, job);
237  pre_job_queue_.EnqueueFront(job);
238  }
239 
// Pre-processing stage of a job: fetch/open the catalog, push its children
// (nested catalogs and, for root catalogs, the previous revision), then
// close the catalog.
// NOTE(review): rendered line 240 -- presumably the signature
// "void ProcessJobPre(CatalogJob *job) {" -- was dropped by the doc
// generator.
241  if (!this->PrepareCatalog(job)) {
// Opening failed: record the error and shut the worker pool down.
242  atomic_inc32(&num_errors_);
243  NotifyFinished();
244  return;
245  }
// Pruned/ignored catalogs are finalized without visiting any children.
246  if (job->ignore) {
247  FinalizeJob(job);
248  return;
249  }
250  NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs();
251  unsigned int num_children;
252  // Ensure that pushed children won't call ProcessJobPost on this job
253  // before this function finishes
254  {
// NOTE(review): rendered lines 255-256 are missing -- presumably a lock
// guard on catalogs_lock_ plus the depth-first branch condition (the push
// order below differs per traversal direction); verify upstream.
257  num_children = PushPreviousRevision(job)
258  + PushNestedCatalogs(job, catalog_list);
259  } else {
260  num_children = PushNestedCatalogs(job, catalog_list)
261  + PushPreviousRevision(job);
// Record how many children must report back before post-processing.
262  atomic_write32(&job->children_unprocessed, num_children);
263  }
264  if (!this->CloseCatalog(false, job)) {
265  atomic_inc32(&num_errors_);
266  NotifyFinished();
267  }
268  }
269 
270  // breadth-first: can post-process immediately
271  // depth-first: no children -> can post-process immediately
272  if (effective_traversal_type_ == Base::kBreadthFirst || num_children == 0) {
273  ProcessJobPost(job);
274  return;
275  }
276  }
277 
// Queues every nested catalog of 'job' that still needs processing and
// counts the children this job has to wait for.
278  unsigned int PushNestedCatalogs(CatalogJob *job,
279  const NestedCatalogList &catalog_list) {
280  typename NestedCatalogList::const_iterator i = catalog_list.begin();
281  typename NestedCatalogList::const_iterator iend = catalog_list.end();
282  unsigned int num_children = 0;
283  for (; i != iend; ++i) {
// Already fully processed in an earlier traversal -> skip entirely.
284  if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) {
285  continue;
286  }
287 
288  CatalogJob *child;
// Push a fresh job unless the same catalog is already in flight.
289  if (!this->no_repeat_history_
290  || !catalogs_processing_.Lookup(i->hash, &child)) {
// Keep a pointer to the open parent only when catalogs stay open.
291  CatalogTN *parent = (this->no_close_) ? job->catalog : NULL;
292  child = new CatalogJob(i->mountpoint.ToString(),
293  i->hash,
294  job->tree_level + 1,
295  job->history_depth,
296  parent);
297  PushJobUnlocked(child);
298  }
299 
// NOTE(review): rendered lines 300-301 are missing -- presumably the
// depth-first branch registering OnChildFinished on the child, e.g.
// "child->RegisterListener(&CatalogTraversalParallel::OnChildFinished,";
// verify against the repository source.
302  this, job);
303  }
304  ++num_children;
305  }
306  return num_children;
307  }
308 
// Pushes the previous revision of a root catalog (history traversal).
// Returns 1 if a previous revision was pushed or subscribed to, 0 otherwise.
313  unsigned int PushPreviousRevision(CatalogJob *job) {
314  // only root catalogs are used for entering a previous revision (graph)
315  if (!job->catalog->IsRoot()) {
316  return 0;
317  }
318 
319  const shash::Any previous_revision = job->catalog->GetPreviousRevision();
// First revision of the repository: nothing further to recurse into.
320  if (previous_revision.IsNull()) {
321  return 0;
322  }
323 
324  // check if the next deeper history level is actually requested
// NOTE(review): rendered lines 325-326 are missing -- presumably the
// IsBelowPruningThresholds() check listed in the member index; verify
// against the repository source.
327  return 0;
328  }
329 
// Already fully processed in an earlier traversal -> nothing to push.
330  if (this->no_repeat_history_
331  && catalogs_done_.Contains(previous_revision)) {
332  return 0;
333  }
334 
335  CatalogJob *prev_job;
// Push a new job for the previous revision unless it is already in flight.
336  if (!this->no_repeat_history_
337  || !catalogs_processing_.Lookup(previous_revision, &prev_job)) {
338  prev_job = new CatalogJob("", previous_revision, 0,
339  job->history_depth + 1);
340  PushJobUnlocked(prev_job);
341  }
342 
// NOTE(review): rendered lines 343-344 are missing -- presumably the
// depth-first registration of OnChildFinished on prev_job, mirroring
// PushNestedCatalogs(); verify upstream.
345  this, job);
346  }
347  return 1;
348  }
349 
// Post-processing stage: invoke the registered traversal listeners on the
// catalog, then close and finalize the job.
// NOTE(review): rendered line 350 -- presumably the signature
// "void ProcessJobPost(CatalogJob *job) {" -- was dropped by the doc
// generator.
351  // Save time by keeping catalog open when suitable
// Reopen catalogs that were closed after pre-processing.
352  if (job->catalog == NULL) {
353  if (!this->ReopenCatalog(job)) {
354  atomic_inc32(&num_errors_);
355  NotifyFinished();
356  return;
357  }
358  }
359  if (serialize_callbacks_) {
// NOTE(review): rendered line 360 is missing -- presumably a lock guard
// on catalog_callback_lock_, which would explain why both branches look
// identical in this rendering; verify against the repository source.
361  this->NotifyListeners(job->GetCallbackData());
362  } else {
363  this->NotifyListeners(job->GetCallbackData());
364  }
365  if (!this->no_close_) {
366  if (!this->CloseCatalog(true, job)) {
367  atomic_inc32(&num_errors_);
368  NotifyFinished();
369  return;
370  }
371  }
372  FinalizeJob(job);
373  }
374 
// Marks a job as done, triggers pool shutdown once all work is finished,
// wakes subscribed parents, and frees the job object.
375  void FinalizeJob(CatalogJob *job) {
376  {
// NOTE(review): rendered line 377 is missing -- presumably a lock guard
// on catalogs_lock_ protecting the two tables below; verify upstream.
378  catalogs_processing_.Erase(job->hash);
379  catalogs_done_.Insert(job->hash, true);
380  // No more catalogs to process -> finish
381  if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty()
382  && post_job_queue_.IsEmpty()) {
383  NotifyFinished();
384  }
385  }
// NOTE(review): rendered line 386 is missing -- presumably a condition
// (depth-first mode) guarding the wake-up below, which would account for
// the otherwise-stray closing brace after WakeParents(); verify upstream.
387  job->WakeParents();
388  }
389  delete job;
390  }
391 
392  void OnChildFinished(const int &a, CatalogJob *job) {
393  // atomic_xadd32 returns value before subtraction -> needs to equal 1
394  if (atomic_xadd32(&job->children_unprocessed, -1) == 1) {
395  post_job_queue_.EnqueueFront(job);
396  }
397  }
398 
// Number of worker threads spawned by DoTraverse().
399  unsigned int num_threads_;
401 
// NOTE(review): several member declarations were dropped by the doc
// generator here (rendered lines 400, 402-404, 407, 409-412); the member
// index at the bottom of this page lists, among others,
// serialize_callbacks_, num_errors_, the pre/post job queues (Tube) and
// the SmallHashDynamic tables catalogs_processing_ / catalogs_done_.
405 
// Scratch storage for the worker pthread handles used by DoTraverse().
406  pthread_t *threads_process_;
408 
// Presumably protects catalogs_processing_ / catalogs_done_ and job
// pushing (the guarded sections were dropped in this rendering) -- TODO
// confirm against the repository source.
413  pthread_mutex_t catalogs_lock_;
414 
// Presumably serializes listener callbacks when serialize_callbacks_ is
// set -- TODO confirm against the repository source.
415  pthread_mutex_t catalog_callback_lock_;
416 };
416 };
417 
418 } // namespace swissknife
419 
420 #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
CallbackPtr RegisterListener(typename BoundClosure< ParamT, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
CatalogTraversalBase< ObjectFetcherT > Base
bool IsNull() const
Definition: hash.h:371
SmallHashDynamic< shash::Any, CatalogJob * > catalogs_processing_
bool IsBelowPruningThresholds(const CatalogJob &job, const uint64_t history_depth, const time_t timestamp_threshold)
unsigned int PushPreviousRevision(CatalogJob *job)
SmallHashDynamic< shash::Any, bool > catalogs_done_
CatalogTN::NestedCatalogList NestedCatalogList
#define PANIC(...)
Definition: exception.h:29
void NotifyListeners(const int &parameter)
CatalogTraversalParallel(const Parameters &params)
assert((mem||(size==0))&&"Out Of Memory")
unsigned char digest[digest_size_]
Definition: hash.h:121
bool PrepareCatalog(CatalogJob *job)
int32_t atomic_int32
Definition: atomic.h:17
bool TraverseRevision(const shash::Any &root_catalog_hash, const TraversalType type=Base::kBreadthFirst)
static uint32_t hasher(const shash::Any &key)
CatalogTraversalData< CatalogTN > CallbackDataTN
bool Traverse(const TraversalType type=Base::kBreadthFirst)
bool Traverse(const shash::Any &root_catalog_hash, const TraversalType type=Base::kBreadthFirst)
void Insert(const Key &key, const Value &value)
Definition: smallhash.h:106
ObjectFetcherT::CatalogTN CatalogTN
void OnChildFinished(const int &a, CatalogJob *job)
CatalogJob(const std::string &path, const shash::Any &hash, const unsigned tree_level, const uint64_t history_depth, CatalogTN *parent=NULL)
bool Contains(const Key &key) const
Definition: smallhash.h:99
bool TraverseList(const HashList &root_catalog_list, const TraversalType type=Base::kBreadthFirst)
void SetNull()
Definition: hash.h:388
Definition: mutex.h:42
Definition: tube.h:39
unsigned int PushNestedCatalogs(CatalogJob *job, const NestedCatalogList &catalog_list)
void Init(uint32_t expected_size, Key empty, uint32_t(*hasher)(const Key &key))
Definition: smallhash.h:58
bool CloseCatalog(const bool unlink_db, CatalogJob *job)
bool ReopenCatalog(CatalogJob *job)