GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/catalog_traversal_parallel.h
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 188 200 94.0%
Branches: 159 248 64.1%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
6 #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
7
8 #include <stack>
9 #include <string>
10 #include <vector>
11
12 #include "catalog_traversal.h"
13 #include "util/atomic.h"
14 #include "util/exception.h"
15 #include "util/tube.h"
16
17 namespace swissknife {
18
19 /**
20 * This class implements the same functionality as CatalogTraversal, but in
21 * parallel. For common functionality, see the documentation of
22 * CatalogTraversal. Differences:
23 * - can choose number of threads
24 * - traversal types change meaning:
25 * - depth-first -> parallelized post-order traversal (parents are processed
26 * after all children are finished)
27 * - breadth-first -> same as original, but parallelized
28 */
29 template<class ObjectFetcherT>
30 class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> {
31 public:
32 typedef CatalogTraversalBase<ObjectFetcherT> Base;
33 typedef ObjectFetcherT ObjectFetcherTN;
34 typedef typename ObjectFetcherT::CatalogTN CatalogTN;
35 typedef typename ObjectFetcherT::HistoryTN HistoryTN;
36 typedef CatalogTraversalData<CatalogTN> CallbackDataTN;
37 typedef typename CatalogTN::NestedCatalogList NestedCatalogList;
38 typedef typename Base::Parameters Parameters;
39 typedef typename Base::TraversalType TraversalType;
40 typedef std::vector<shash::Any> HashList;
41
42 57 explicit CatalogTraversalParallel(const Parameters &params)
43 : CatalogTraversalBase<ObjectFetcherT>(params)
44 57 , num_threads_(params.num_threads)
45
4/8
✓ Branch 2 taken 57 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 57 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 57 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 57 times.
✗ Branch 12 not taken.
57 , serialize_callbacks_(params.serialize_callbacks) {
46 57 atomic_init32(&num_errors_);
47
1/2
✓ Branch 1 taken 57 times.
✗ Branch 2 not taken.
57 shash::Any null_hash;
48 57 null_hash.SetNull();
49
1/2
✓ Branch 1 taken 57 times.
✗ Branch 2 not taken.
57 catalogs_processing_.Init(1024, null_hash, hasher);
50
1/2
✓ Branch 1 taken 57 times.
✗ Branch 2 not taken.
57 catalogs_done_.Init(1024, null_hash, hasher);
51 57 pthread_mutex_init(&catalog_callback_lock_, NULL);
52 57 pthread_mutex_init(&catalogs_lock_, NULL);
53 57 effective_history_depth_ = this->default_history_depth_;
54 57 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
55 57 }
56
57 protected:
58 struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob,
59 public Observable<int> {
60 368457 explicit CatalogJob(const std::string &path,
61 const shash::Any &hash,
62 const unsigned tree_level,
63 const uint64_t history_depth,
64 CatalogTN *parent = NULL) :
65 CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level,
66 368457 history_depth, parent) {
67 368457 atomic_init32(&children_unprocessed);
68 368457 }
69
70 183701 void WakeParents() {
71
1/2
✓ Branch 1 taken 183685 times.
✗ Branch 2 not taken.
183701 this->NotifyListeners(0);
72 183685 }
73
74 atomic_int32 children_unprocessed;
75 };
76
77 public:
78 /**
79 * Starts the traversal process.
80 * After calling this methods CatalogTraversal will go through all catalogs
81 * and call the registered callback methods for each found catalog.
82 * If something goes wrong in the process, the traversal will be cancelled.
83 *
84 * @return true, when all catalogs were successfully processed. On
85 * failure the traversal is cancelled and false is returned.
86 */
87 40 bool Traverse(const TraversalType type = Base::kBreadthFirst) {
88
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash();
89
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 40 times.
40 if (root_catalog_hash.IsNull()) {
90 return false;
91 }
92
1/2
✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
40 return Traverse(root_catalog_hash, type);
93 }
94
95 /**
96 * Starts the traversal process at the catalog pointed to by the given hash
97 *
98 * @param root_catalog_hash the entry point into the catalog traversal
99 * @return true when catalogs were successfully traversed
100 */
101 55 bool Traverse(const shash::Any &root_catalog_hash,
102 const TraversalType type = Base::kBreadthFirst) {
103 // add the root catalog of the repository as the first element on the job
104 // stack
105
3/6
✓ Branch 0 taken 33 times.
✓ Branch 1 taken 22 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 55 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
88 if (this->no_repeat_history_ &&
106
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 33 times.
33 catalogs_done_.Contains(root_catalog_hash)) {
107 return true;
108 }
109 55 effective_traversal_type_ = type;
110
3/6
✓ Branch 2 taken 55 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 55 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 55 times.
✗ Branch 9 not taken.
55 CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0);
111 55 PushJob(root_job);
112 55 return DoTraverse();
113 }
114
115 /**
116 * Start the traversal process from a list of root catalogs. Same as
117 * TraverseRevision function, TraverseList does not traverse into predecessor
118 * catalog revisions and ignores TraversalParameter settings.
119 */
120 49 bool TraverseList(const HashList &root_catalog_list,
121 const TraversalType type = Base::kBreadthFirst) {
122 // Push in reverse order for CatalogTraversal-like behavior
123 49 HashList::const_reverse_iterator i = root_catalog_list.rbegin();
124 49 const HashList::const_reverse_iterator iend = root_catalog_list.rend();
125 49 bool has_pushed = false;
126 {
127 49 MutexLockGuard m(&catalogs_lock_);
128
3/4
✓ Branch 2 taken 149 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 100 times.
✓ Branch 5 taken 49 times.
149 for (; i != iend; ++i) {
129
7/8
✓ Branch 0 taken 91 times.
✓ Branch 1 taken 9 times.
✓ Branch 4 taken 91 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 34 times.
✓ Branch 7 taken 57 times.
✓ Branch 8 taken 34 times.
✓ Branch 9 taken 66 times.
100 if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) {
130 34 continue;
131 }
132
133
3/6
✓ Branch 2 taken 66 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 66 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 66 times.
✗ Branch 10 not taken.
66 CatalogJob *root_job = new CatalogJob("", *i, 0, 0);
134
1/2
✓ Branch 1 taken 66 times.
✗ Branch 2 not taken.
66 PushJobUnlocked(root_job);
135 66 has_pushed = true;
136 }
137 49 }
138 // noop: no catalogs to traverse
139
2/2
✓ Branch 0 taken 13 times.
✓ Branch 1 taken 36 times.
49 if (!has_pushed) {
140 13 return true;
141 }
142 36 effective_traversal_type_ = type;
143 36 effective_history_depth_ = Parameters::kNoHistory;
144 36 effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold;
145
1/2
✓ Branch 1 taken 36 times.
✗ Branch 2 not taken.
36 bool result = DoTraverse();
146 36 effective_history_depth_ = this->default_history_depth_;
147 36 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
148 36 return result;
149 }
150
151 /**
152 * Starts the traversal process at the catalog pointed to by the given hash
153 * but doesn't traverse into predecessor catalog revisions. This overrides the
154 * TraversalParameter settings provided at construction.
155 *
156 * @param root_catalog_hash the entry point into the catalog traversal
157 * @return true when catalogs were successfully traversed
158 */
159 2 bool TraverseRevision(
160 const shash::Any &root_catalog_hash,
161 const TraversalType type = Base::kBreadthFirst)
162 {
163 2 effective_history_depth_ = Parameters::kNoHistory;
164 2 effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold;
165 2 bool result = Traverse(root_catalog_hash, type);
166 2 effective_history_depth_ = this->default_history_depth_;
167 2 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
168 2 return result;
169 }
170
171 protected:
172 1688330 static uint32_t hasher(const shash::Any &key) {
173 // Don't start with the first bytes, because == is using them as well
174 1688330 return (uint32_t) *(reinterpret_cast<const uint32_t *>(key.digest) + 1);
175 }
176
177 91 bool DoTraverse() {
178 // Optimal number of threads is yet to be determined. The main event loop
179 // contains a spin-lock, so it should not be more than number of cores.
180 91 threads_process_ = reinterpret_cast<pthread_t *>
181 91 (smalloc(sizeof(pthread_t)*num_threads_));
182
2/2
✓ Branch 0 taken 105 times.
✓ Branch 1 taken 91 times.
196 for (unsigned int i = 0; i < num_threads_; ++i) {
183 105 int retval = pthread_create(&threads_process_[i], NULL,
184 MainProcessQueue, this);
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 105 times.
105 if (retval != 0) PANIC(kLogStderr, "failed to create thread");
186 }
187
188
2/2
✓ Branch 0 taken 105 times.
✓ Branch 1 taken 91 times.
196 for (unsigned int i = 0; i < num_threads_; ++i) {
189 105 int retval = pthread_join(threads_process_[i], NULL);
190
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 105 times.
105 assert(retval == 0);
191 }
192 91 free(threads_process_);
193
194
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 90 times.
91 if (atomic_read32(&num_errors_))
195 1 return false;
196
197
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 90 times.
90 assert(catalogs_processing_.size() == 0);
198
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 90 times.
90 assert(pre_job_queue_.IsEmpty());
199
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 90 times.
90 assert(post_job_queue_.IsEmpty());
200 90 return true;
201 }
202
203 105 static void *MainProcessQueue(void *data) {
204 105 CatalogTraversalParallel<ObjectFetcherT> *traversal =
205 reinterpret_cast<CatalogTraversalParallel<ObjectFetcherT> *>(data);
206 CatalogJob *current_job;
207 while (true) {
208 402274 current_job = traversal->post_job_queue_.TryPopFront();
209
2/2
✓ Branch 0 taken 33982 times.
✓ Branch 1 taken 368377 times.
402359 if (current_job != NULL) {
210 33982 traversal->ProcessJobPost(current_job);
211 } else {
212 368377 current_job = traversal->pre_job_queue_.PopFront();
213 // NULL means the master thread tells us to finish
214
2/2
✓ Branch 1 taken 105 times.
✓ Branch 2 taken 368342 times.
368449 if (current_job->hash.IsNull()) {
215
1/2
✓ Branch 0 taken 105 times.
✗ Branch 1 not taken.
105 delete current_job;
216 105 break;
217 }
218 368342 traversal->ProcessJobPre(current_job);
219 }
220 }
221 105 return NULL;
222 }
223
224 91 void NotifyFinished() {
225
1/2
✓ Branch 1 taken 91 times.
✗ Branch 2 not taken.
91 shash::Any null_hash;
226 91 null_hash.SetNull();
227
2/2
✓ Branch 0 taken 105 times.
✓ Branch 1 taken 91 times.
196 for (unsigned i = 0; i < num_threads_; ++i) {
228
3/6
✓ Branch 2 taken 105 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 105 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 105 times.
✗ Branch 9 not taken.
105 CatalogJob *job = new CatalogJob("", null_hash, 0, 0);
229
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 pre_job_queue_.EnqueueFront(job);
230 }
231 91 }
232
233 55 void PushJob(CatalogJob *job) {
234 55 MutexLockGuard m(&catalogs_lock_);
235
1/2
✓ Branch 1 taken 55 times.
✗ Branch 2 not taken.
55 PushJobUnlocked(job);
236 55 }
237
238 368352 void PushJobUnlocked(CatalogJob *job) {
239 368352 catalogs_processing_.Insert(job->hash, job);
240 368352 pre_job_queue_.EnqueueFront(job);
241 368352 }
242
243 368336 void ProcessJobPre(CatalogJob *job) {
244
4/6
✓ Branch 0 taken 368336 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 368179 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 368178 times.
368336 if (!this->PrepareCatalog(job)) {
245 1 atomic_inc32(&num_errors_);
246
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 NotifyFinished();
247 334185 return;
248 }
249
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 368172 times.
368178 if (job->ignore) {
250
1/2
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
6 FinalizeJob(job);
251 6 return;
252 }
253
1/2
✓ Branch 1 taken 367246 times.
✗ Branch 2 not taken.
368172 NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs();
254 unsigned int num_children;
255 // Ensure that pushed children won't call ProcessJobPost on this job
256 // before this function finishes
257 {
258 367246 MutexLockGuard m(&catalogs_lock_);
259
2/2
✓ Branch 0 taken 184642 times.
✓ Branch 1 taken 183702 times.
368344 if (effective_traversal_type_ == Base::kBreadthFirst) {
260
1/2
✓ Branch 1 taken 184642 times.
✗ Branch 2 not taken.
184642 num_children = PushPreviousRevision(job) +
261
1/2
✓ Branch 1 taken 184642 times.
✗ Branch 2 not taken.
184642 PushNestedCatalogs(job, catalog_list);
262 } else {
263
1/2
✓ Branch 1 taken 183702 times.
✗ Branch 2 not taken.
183702 num_children = PushNestedCatalogs(job, catalog_list) +
264
1/2
✓ Branch 1 taken 183702 times.
✗ Branch 2 not taken.
183702 PushPreviousRevision(job);
265 183702 atomic_write32(&job->children_unprocessed, num_children);
266 }
267
3/6
✓ Branch 0 taken 368344 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 368344 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 368344 times.
368344 if (!this->CloseCatalog(false, job)) {
268 atomic_inc32(&num_errors_);
269 NotifyFinished();
270 }
271 368344 }
272
273 // breadth-first: can post-process immediately
274 // depth-first: no children -> can post-process immediately
275
4/4
✓ Branch 0 taken 183690 times.
✓ Branch 1 taken 184635 times.
✓ Branch 2 taken 149708 times.
✓ Branch 3 taken 33982 times.
368325 if (effective_traversal_type_ == Base::kBreadthFirst ||
276 num_children == 0) {
277
1/2
✓ Branch 1 taken 334116 times.
✗ Branch 2 not taken.
334343 ProcessJobPost(job);
278 334116 return;
279 }
280
2/2
✓ Branch 1 taken 33982 times.
✓ Branch 2 taken 334178 times.
368098 }
281
282 368344 unsigned int PushNestedCatalogs(CatalogJob *job,
283 const NestedCatalogList &catalog_list) {
284 368344 typename NestedCatalogList::const_iterator i = catalog_list.begin();
285 368344 typename NestedCatalogList::const_iterator iend = catalog_list.end();
286 368344 unsigned int num_children = 0;
287
2/2
✓ Branch 2 taken 368229 times.
✓ Branch 3 taken 368344 times.
736573 for (; i != iend; ++i) {
288
7/8
✓ Branch 0 taken 637 times.
✓ Branch 1 taken 367592 times.
✓ Branch 4 taken 637 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 67 times.
✓ Branch 7 taken 570 times.
✓ Branch 8 taken 67 times.
✓ Branch 9 taken 368162 times.
368229 if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) {
289 67 continue;
290 }
291
292 CatalogJob *child;
293
4/4
✓ Branch 0 taken 570 times.
✓ Branch 1 taken 367592 times.
✓ Branch 2 taken 368154 times.
✓ Branch 3 taken 8 times.
368732 if (!this->no_repeat_history_ ||
294
3/4
✓ Branch 2 taken 570 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 562 times.
✓ Branch 5 taken 8 times.
570 !catalogs_processing_.Lookup(i->hash, &child)) {
295
2/2
✓ Branch 0 taken 366637 times.
✓ Branch 1 taken 1517 times.
368154 CatalogTN *parent = (this->no_close_) ? job->catalog : NULL;
296
1/2
✓ Branch 2 taken 368154 times.
✗ Branch 3 not taken.
736308 child = new CatalogJob(i->mountpoint.ToString(),
297
1/2
✓ Branch 2 taken 368154 times.
✗ Branch 3 not taken.
368154 i->hash,
298 368154 job->tree_level + 1,
299
1/2
✓ Branch 1 taken 368154 times.
✗ Branch 2 not taken.
368154 job->history_depth,
300 parent);
301
1/2
✓ Branch 1 taken 368154 times.
✗ Branch 2 not taken.
368154 PushJobUnlocked(child);
302 }
303
304
2/2
✓ Branch 0 taken 183660 times.
✓ Branch 1 taken 184502 times.
368162 if (effective_traversal_type_ == Base::kDepthFirst) {
305
1/2
✓ Branch 1 taken 183660 times.
✗ Branch 2 not taken.
183660 child->RegisterListener(&CatalogTraversalParallel::OnChildFinished,
306 this, job);
307 }
308 368162 ++num_children;
309 }
310 368344 return num_children;
311 }
312
313 /**
314 * Pushes the previous revision of a root catalog.
315 * @return the number of catalogs pushed on the processing stack
316 */
317 368344 unsigned int PushPreviousRevision(CatalogJob *job) {
318 // only root catalogs are used for entering a previous revision (graph)
319
2/2
✓ Branch 1 taken 368151 times.
✓ Branch 2 taken 193 times.
368344 if (!job->catalog->IsRoot()) {
320 368151 return 0;
321 }
322
323
1/2
✓ Branch 1 taken 193 times.
✗ Branch 2 not taken.
193 const shash::Any previous_revision = job->catalog->GetPreviousRevision();
324
2/2
✓ Branch 1 taken 19 times.
✓ Branch 2 taken 174 times.
193 if (previous_revision.IsNull()) {
325 19 return 0;
326 }
327
328 // check if the next deeper history level is actually requested
329
3/4
✓ Branch 1 taken 174 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 97 times.
✓ Branch 4 taken 77 times.
174 if (this->IsBelowPruningThresholds(*job, effective_history_depth_,
330 effective_timestamp_threshold_)) {
331 97 return 0;
332 }
333
334
3/6
✓ Branch 0 taken 49 times.
✓ Branch 1 taken 28 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 77 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
126 if (this->no_repeat_history_ &&
335
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 49 times.
49 catalogs_done_.Contains(previous_revision)) {
336 return 0;
337 }
338
339 CatalogJob *prev_job;
340
3/4
✓ Branch 0 taken 49 times.
✓ Branch 1 taken 28 times.
✓ Branch 2 taken 77 times.
✗ Branch 3 not taken.
126 if (!this->no_repeat_history_ ||
341
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
49 !catalogs_processing_.Lookup(previous_revision, &prev_job)) {
342
1/2
✓ Branch 2 taken 77 times.
✗ Branch 3 not taken.
154 prev_job =
343
2/4
✓ Branch 1 taken 77 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 77 times.
✗ Branch 5 not taken.
77 new CatalogJob("", previous_revision, 0, job->history_depth + 1);
344
1/2
✓ Branch 1 taken 77 times.
✗ Branch 2 not taken.
77 PushJobUnlocked(prev_job);
345 }
346
347
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 62 times.
77 if (effective_traversal_type_ == Base::kDepthFirst) {
348
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished,
349 this, job);
350 }
351 77 return 1;
352 }
353
354 368284 void ProcessJobPost(CatalogJob *job) {
355 // Save time by keeping catalog open when suitable
356
1/2
✓ Branch 0 taken 368285 times.
✗ Branch 1 not taken.
368284 if (job->catalog == NULL) {
357
2/4
✓ Branch 0 taken 368285 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 367213 times.
368285 if (!this->ReopenCatalog(job)) {
358 atomic_inc32(&num_errors_);
359 NotifyFinished();
360 return;
361 }
362 }
363
1/2
✓ Branch 0 taken 367212 times.
✗ Branch 1 not taken.
367212 if (serialize_callbacks_) {
364 367212 MutexLockGuard m(&catalog_callback_lock_);
365
2/4
✓ Branch 1 taken 368344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 368344 times.
✗ Branch 5 not taken.
368344 this->NotifyListeners(job->GetCallbackData());
366 368344 } else {
367 this->NotifyListeners(job->GetCallbackData());
368 }
369
2/2
✓ Branch 0 taken 1702 times.
✓ Branch 1 taken 366642 times.
368344 if (!this->no_close_) {
370
2/4
✓ Branch 0 taken 1702 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1702 times.
1702 if (!this->CloseCatalog(true, job)) {
371 atomic_inc32(&num_errors_);
372 NotifyFinished();
373 return;
374 }
375 }
376 368344 FinalizeJob(job);
377 }
378
379 368349 void FinalizeJob(CatalogJob *job) {
380 {
381 368349 MutexLockGuard m(&catalogs_lock_);
382
1/2
✓ Branch 1 taken 368350 times.
✗ Branch 2 not taken.
368350 catalogs_processing_.Erase(job->hash);
383
1/2
✓ Branch 1 taken 368350 times.
✗ Branch 2 not taken.
368350 catalogs_done_.Insert(job->hash, true);
384 // No more catalogs to process -> finish
385
6/8
✓ Branch 1 taken 90 times.
✓ Branch 2 taken 368260 times.
✓ Branch 4 taken 90 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 90 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 90 times.
✓ Branch 9 taken 368260 times.
368440 if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty() &&
386 90 post_job_queue_.IsEmpty()) {
387
1/2
✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
90 NotifyFinished();
388 }
389 368350 }
390
2/2
✓ Branch 0 taken 183700 times.
✓ Branch 1 taken 184636 times.
368336 if (effective_traversal_type_ == Base::kDepthFirst) {
391 183700 job->WakeParents();
392 }
393
2/2
✓ Branch 0 taken 368308 times.
✓ Branch 1 taken 7 times.
368315 delete job;
394 368250 }
395
396 183636 void OnChildFinished(const int &a, CatalogJob *job) {
397 // atomic_xadd32 returns value before subtraction -> needs to equal 1
398
2/2
✓ Branch 1 taken 33982 times.
✓ Branch 2 taken 149692 times.
183636 if (atomic_xadd32(&job->children_unprocessed, -1) == 1) {
399 33982 post_job_queue_.EnqueueFront(job);
400 }
401 183674 }
402
403 unsigned int num_threads_;
404 bool serialize_callbacks_;
405
406 uint64_t effective_history_depth_;
407 time_t effective_timestamp_threshold_;
408 TraversalType effective_traversal_type_;
409
410 pthread_t *threads_process_;
411 atomic_int32 num_errors_;
412
413 Tube<CatalogJob> pre_job_queue_;
414 Tube<CatalogJob> post_job_queue_;
415 SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_;
416 SmallHashDynamic<shash::Any, bool> catalogs_done_;
417 pthread_mutex_t catalogs_lock_;
418
419 pthread_mutex_t catalog_callback_lock_;
420 };
421
422 } // namespace swissknife
423
424 #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
425