GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/catalog_traversal_parallel.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 187 199 94.0%
Branches: 159 244 65.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
6 #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
7
8 #include <stack>
9 #include <string>
10 #include <vector>
11
12 #include "catalog_traversal.h"
13 #include "util/atomic.h"
14 #include "util/exception.h"
15 #include "util/tube.h"
16
17 namespace swissknife {
18
19 /**
20 * This class implements the same functionality as CatalogTraversal, but in
21 * parallel. For common functionality, see the documentation of
22 * CatalogTraversal. Differences:
23 * - can choose number of threads
24 * - traversal types change meaning:
25 * - depth-first -> parallelized post-order traversal (parents are processed
26 * after all children are finished)
27 * - breadth-first -> same as original, but parallelized
28 */
29 template<class ObjectFetcherT>
30 class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> {
31 public:
32 typedef CatalogTraversalBase<ObjectFetcherT> Base;
33 typedef ObjectFetcherT ObjectFetcherTN;
34 typedef typename ObjectFetcherT::CatalogTN CatalogTN;
35 typedef typename ObjectFetcherT::HistoryTN HistoryTN;
36 typedef CatalogTraversalData<CatalogTN> CallbackDataTN;
37 typedef typename CatalogTN::NestedCatalogList NestedCatalogList;
38 typedef typename Base::Parameters Parameters;
39 typedef typename Base::TraversalType TraversalType;
40 typedef std::vector<shash::Any> HashList;
41
42 1759 explicit CatalogTraversalParallel(const Parameters &params)
43 : CatalogTraversalBase<ObjectFetcherT>(params)
44 1759 , num_threads_(params.num_threads)
45
4/8
✓ Branch 2 taken 1759 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1759 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1759 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1759 times.
✗ Branch 12 not taken.
1759 , serialize_callbacks_(params.serialize_callbacks) {
46 1759 atomic_init32(&num_errors_);
47
1/2
✓ Branch 1 taken 1759 times.
✗ Branch 2 not taken.
1759 shash::Any null_hash;
48 1759 null_hash.SetNull();
49
1/2
✓ Branch 1 taken 1759 times.
✗ Branch 2 not taken.
1759 catalogs_processing_.Init(1024, null_hash, hasher);
50
1/2
✓ Branch 1 taken 1759 times.
✗ Branch 2 not taken.
1759 catalogs_done_.Init(1024, null_hash, hasher);
51 1759 pthread_mutex_init(&catalog_callback_lock_, NULL);
52 1759 pthread_mutex_init(&catalogs_lock_, NULL);
53 1759 effective_history_depth_ = this->default_history_depth_;
54 1759 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
55 1759 }
56
57 protected:
58 struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob,
59 public Observable<int> {
60 18037849 explicit CatalogJob(const std::string &path,
61 const shash::Any &hash,
62 const unsigned tree_level,
63 const uint64_t history_depth,
64 CatalogTN *parent = NULL)
65 : CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level,
66 18037849 history_depth, parent) {
67 18037849 atomic_init32(&children_unprocessed);
68 18037849 }
69
70
1/2
✓ Branch 1 taken 8997565 times.
✗ Branch 2 not taken.
8998202 void WakeParents() { this->NotifyListeners(0); }
71
72 atomic_int32 children_unprocessed;
73 };
74
75 public:
76 /**
77 * Starts the traversal process.
78 * After calling this methods CatalogTraversal will go through all catalogs
79 * and call the registered callback methods for each found catalog.
80 * If something goes wrong in the process, the traversal will be cancelled.
81 *
82 * @return true, when all catalogs were successfully processed. On
83 * failure the traversal is cancelled and false is returned.
84 */
85 973 bool Traverse(const TraversalType type = Base::kBreadthFirst) {
86
1/2
✓ Branch 1 taken 973 times.
✗ Branch 2 not taken.
973 const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash();
87
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 973 times.
973 if (root_catalog_hash.IsNull()) {
88 return false;
89 }
90
1/2
✓ Branch 1 taken 973 times.
✗ Branch 2 not taken.
973 return Traverse(root_catalog_hash, type);
91 }
92
93 /**
94 * Starts the traversal process at the catalog pointed to by the given hash
95 *
96 * @param root_catalog_hash the entry point into the catalog traversal
97 * @return true when catalogs were successfully traversed
98 */
99 1708 bool Traverse(const shash::Any &root_catalog_hash,
100 const TraversalType type = Base::kBreadthFirst) {
101 // add the root catalog of the repository as the first element on the job
102 // stack
103 3416 if (this->no_repeat_history_
104
4/6
✓ Branch 0 taken 630 times.
✓ Branch 1 taken 1078 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 630 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1708 times.
1708 && catalogs_done_.Contains(root_catalog_hash)) {
105 return true;
106 }
107 1708 effective_traversal_type_ = type;
108
3/6
✓ Branch 2 taken 1708 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1708 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1708 times.
✗ Branch 9 not taken.
1708 CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0);
109 1708 PushJob(root_job);
110 1708 return DoTraverse();
111 }
112
113 /**
114 * Start the traversal process from a list of root catalogs. Same as
115 * TraverseRevision function, TraverseList does not traverse into predecessor
116 * catalog revisions and ignores TraversalParameter settings.
117 */
118 427 bool TraverseList(const HashList &root_catalog_list,
119 const TraversalType type = Base::kBreadthFirst) {
120 // Push in reverse order for CatalogTraversal-like behavior
121 427 HashList::const_reverse_iterator i = root_catalog_list.rbegin();
122 427 const HashList::const_reverse_iterator iend = root_catalog_list.rend();
123 427 bool has_pushed = false;
124 {
125 427 MutexLockGuard m(&catalogs_lock_);
126
3/4
✓ Branch 2 taken 1614 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1187 times.
✓ Branch 5 taken 427 times.
1614 for (; i != iend; ++i) {
127
7/8
✓ Branch 0 taken 746 times.
✓ Branch 1 taken 441 times.
✓ Branch 4 taken 746 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 68 times.
✓ Branch 7 taken 678 times.
✓ Branch 8 taken 68 times.
✓ Branch 9 taken 1119 times.
1187 if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) {
128 68 continue;
129 }
130
131
3/6
✓ Branch 2 taken 1119 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1119 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1119 times.
✗ Branch 10 not taken.
1119 CatalogJob *root_job = new CatalogJob("", *i, 0, 0);
132
1/2
✓ Branch 1 taken 1119 times.
✗ Branch 2 not taken.
1119 PushJobUnlocked(root_job);
133 1119 has_pushed = true;
134 }
135 427 }
136 // noop: no catalogs to traverse
137
2/2
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 401 times.
427 if (!has_pushed) {
138 26 return true;
139 }
140 401 effective_traversal_type_ = type;
141 401 effective_history_depth_ = Parameters::kNoHistory;
142 401 effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold;
143
1/2
✓ Branch 1 taken 401 times.
✗ Branch 2 not taken.
401 bool result = DoTraverse();
144 401 effective_history_depth_ = this->default_history_depth_;
145 401 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
146 401 return result;
147 }
148
149 /**
150 * Starts the traversal process at the catalog pointed to by the given hash
151 * but doesn't traverse into predecessor catalog revisions. This overrides the
152 * TraversalParameter settings provided at construction.
153 *
154 * @param root_catalog_hash the entry point into the catalog traversal
155 * @return true when catalogs were successfully traversed
156 */
157 98 bool TraverseRevision(const shash::Any &root_catalog_hash,
158 const TraversalType type = Base::kBreadthFirst) {
159 98 effective_history_depth_ = Parameters::kNoHistory;
160 98 effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold;
161 98 bool result = Traverse(root_catalog_hash, type);
162 98 effective_history_depth_ = this->default_history_depth_;
163 98 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
164 98 return result;
165 }
166
167 protected:
168 82451997 static uint32_t hasher(const shash::Any &key) {
169 // Don't start with the first bytes, because == is using them as well
170 82451997 return (uint32_t) * (reinterpret_cast<const uint32_t *>(key.digest) + 1);
171 }
172
173 2109 bool DoTraverse() {
174 // Optimal number of threads is yet to be determined. The main event loop
175 // contains a spin-lock, so it should not be more than number of cores.
176 2109 threads_process_ = reinterpret_cast<pthread_t *>(
177 2109 smalloc(sizeof(pthread_t) * num_threads_));
178
2/2
✓ Branch 0 taken 2795 times.
✓ Branch 1 taken 2109 times.
4904 for (unsigned int i = 0; i < num_threads_; ++i) {
179 2795 int retval = pthread_create(&threads_process_[i], NULL, MainProcessQueue,
180 this);
181
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2795 times.
2795 if (retval != 0)
182 PANIC(kLogStderr, "failed to create thread");
183 }
184
185
2/2
✓ Branch 0 taken 2795 times.
✓ Branch 1 taken 2109 times.
4904 for (unsigned int i = 0; i < num_threads_; ++i) {
186 2795 int retval = pthread_join(threads_process_[i], NULL);
187
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2795 times.
2795 assert(retval == 0);
188 }
189 2109 free(threads_process_);
190
191
2/2
✓ Branch 1 taken 49 times.
✓ Branch 2 taken 2060 times.
2109 if (atomic_read32(&num_errors_))
192 49 return false;
193
194
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2060 times.
2060 assert(catalogs_processing_.size() == 0);
195
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2060 times.
2060 assert(pre_job_queue_.IsEmpty());
196
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2060 times.
2060 assert(post_job_queue_.IsEmpty());
197 2060 return true;
198 }
199
200 2795 static void *MainProcessQueue(void *data) {
201 2795 CatalogTraversalParallel<ObjectFetcherT> *traversal = reinterpret_cast<
202 CatalogTraversalParallel<ObjectFetcherT> *>(data);
203 CatalogJob *current_job;
204 while (true) {
205 19694495 current_job = traversal->post_job_queue_.TryPopFront();
206
2/2
✓ Branch 0 taken 1663751 times.
✓ Branch 1 taken 18034272 times.
19698023 if (current_job != NULL) {
207 1663751 traversal->ProcessJobPost(current_job);
208 } else {
209 18034272 current_job = traversal->pre_job_queue_.PopFront();
210 // NULL means the master thread tells us to finish
211
2/2
✓ Branch 1 taken 2697 times.
✓ Branch 2 taken 18034123 times.
18037163 if (current_job->hash.IsNull()) {
212
1/2
✓ Branch 0 taken 2697 times.
✗ Branch 1 not taken.
2697 delete current_job;
213 2746 break;
214 }
215 18034123 traversal->ProcessJobPre(current_job);
216 }
217 }
218 2746 return NULL;
219 }
220
221 2109 void NotifyFinished() {
222
1/2
✓ Branch 1 taken 2109 times.
✗ Branch 2 not taken.
2109 shash::Any null_hash;
223 2109 null_hash.SetNull();
224
2/2
✓ Branch 0 taken 2795 times.
✓ Branch 1 taken 2109 times.
4904 for (unsigned i = 0; i < num_threads_; ++i) {
225
3/6
✓ Branch 2 taken 2795 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2795 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2795 times.
✗ Branch 9 not taken.
2795 CatalogJob *job = new CatalogJob("", null_hash, 0, 0);
226
1/2
✓ Branch 1 taken 2795 times.
✗ Branch 2 not taken.
2795 pre_job_queue_.EnqueueFront(job);
227 }
228 2109 }
229
230 1708 void PushJob(CatalogJob *job) {
231 1708 MutexLockGuard m(&catalogs_lock_);
232
1/2
✓ Branch 1 taken 1708 times.
✗ Branch 2 not taken.
1708 PushJobUnlocked(job);
233 1708 }
234
235 18035054 void PushJobUnlocked(CatalogJob *job) {
236 18035054 catalogs_processing_.Insert(job->hash, job);
237 18035054 pre_job_queue_.EnqueueFront(job);
238 18035054 }
239
240 18033584 void ProcessJobPre(CatalogJob *job) {
241
4/6
✓ Branch 0 taken 18033584 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 18033045 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 49 times.
✓ Branch 6 taken 18032996 times.
18033584 if (!this->PrepareCatalog(job)) {
242 49 atomic_inc32(&num_errors_);
243
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 NotifyFinished();
244 16364835 return;
245 }
246
2/2
✓ Branch 0 taken 153 times.
✓ Branch 1 taken 18032843 times.
18032996 if (job->ignore) {
247
1/2
✓ Branch 1 taken 153 times.
✗ Branch 2 not taken.
153 FinalizeJob(job);
248 153 return;
249 }
250
1/2
✓ Branch 1 taken 17990311 times.
✗ Branch 2 not taken.
18032843 NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs();
251 unsigned int num_children;
252 // Ensure that pushed children won't call ProcessJobPost on this job
253 // before this function finishes
254 {
255 17990311 MutexLockGuard m(&catalogs_lock_);
256
2/2
✓ Branch 0 taken 9036507 times.
✓ Branch 1 taken 8998296 times.
18034803 if (effective_traversal_type_ == Base::kBreadthFirst) {
257
1/2
✓ Branch 1 taken 9036507 times.
✗ Branch 2 not taken.
9036507 num_children = PushPreviousRevision(job)
258
1/2
✓ Branch 1 taken 9036507 times.
✗ Branch 2 not taken.
9036507 + PushNestedCatalogs(job, catalog_list);
259 } else {
260
1/2
✓ Branch 1 taken 8998296 times.
✗ Branch 2 not taken.
8998296 num_children = PushNestedCatalogs(job, catalog_list)
261
1/2
✓ Branch 1 taken 8998296 times.
✗ Branch 2 not taken.
8998296 + PushPreviousRevision(job);
262 8998296 atomic_write32(&job->children_unprocessed, num_children);
263 }
264
3/6
✓ Branch 0 taken 18034803 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 18034803 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 18034803 times.
18034803 if (!this->CloseCatalog(false, job)) {
265 atomic_inc32(&num_errors_);
266 NotifyFinished();
267 }
268 18034803 }
269
270 // breadth-first: can post-process immediately
271 // depth-first: no children -> can post-process immediately
272
4/4
✓ Branch 0 taken 8998149 times.
✓ Branch 1 taken 9036213 times.
✓ Branch 2 taken 7334300 times.
✓ Branch 3 taken 1663849 times.
18034362 if (effective_traversal_type_ == Base::kBreadthFirst || num_children == 0) {
273
1/2
✓ Branch 1 taken 16364731 times.
✗ Branch 2 not taken.
16370513 ProcessJobPost(job);
274 16364731 return;
275 }
276
2/2
✓ Branch 1 taken 1663849 times.
✓ Branch 2 taken 16364633 times.
18028580 }
277
278 18034803 unsigned int PushNestedCatalogs(CatalogJob *job,
279 const NestedCatalogList &catalog_list) {
280 18034803 typename NestedCatalogList::const_iterator i = catalog_list.begin();
281 18034803 typename NestedCatalogList::const_iterator iend = catalog_list.end();
282 18034803 unsigned int num_children = 0;
283
2/2
✓ Branch 2 taken 18032787 times.
✓ Branch 3 taken 18034803 times.
36067590 for (; i != iend; ++i) {
284
7/8
✓ Branch 0 taken 20779 times.
✓ Branch 1 taken 18012008 times.
✓ Branch 4 taken 20779 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2719 times.
✓ Branch 7 taken 18060 times.
✓ Branch 8 taken 2719 times.
✓ Branch 9 taken 18030068 times.
18032787 if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) {
285 2719 continue;
286 }
287
288 CatalogJob *child;
289 36060136 if (!this->no_repeat_history_
290
7/8
✓ Branch 0 taken 18060 times.
✓ Branch 1 taken 18012008 times.
✓ Branch 4 taken 18060 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 17668 times.
✓ Branch 7 taken 392 times.
✓ Branch 8 taken 18029676 times.
✓ Branch 9 taken 392 times.
18030068 || !catalogs_processing_.Lookup(i->hash, &child)) {
291
2/2
✓ Branch 0 taken 17965213 times.
✓ Branch 1 taken 64463 times.
18029676 CatalogTN *parent = (this->no_close_) ? job->catalog : NULL;
292
1/2
✓ Branch 2 taken 18029676 times.
✗ Branch 3 not taken.
36059352 child = new CatalogJob(i->mountpoint.ToString(),
293
1/2
✓ Branch 2 taken 18029676 times.
✗ Branch 3 not taken.
18029676 i->hash,
294 18029676 job->tree_level + 1,
295
1/2
✓ Branch 1 taken 18029676 times.
✗ Branch 2 not taken.
18029676 job->history_depth,
296 parent);
297
1/2
✓ Branch 1 taken 18029676 times.
✗ Branch 2 not taken.
18029676 PushJobUnlocked(child);
298 }
299
300
2/2
✓ Branch 0 taken 8997366 times.
✓ Branch 1 taken 9032702 times.
18030068 if (effective_traversal_type_ == Base::kDepthFirst) {
301
1/2
✓ Branch 1 taken 8997366 times.
✗ Branch 2 not taken.
8997366 child->RegisterListener(&CatalogTraversalParallel::OnChildFinished,
302 this, job);
303 }
304 18030068 ++num_children;
305 }
306 18034803 return num_children;
307 }
308
309 /**
310 * Pushes the previous revision of a root catalog.
311 * @return the number of catalogs pushed on the processing stack
312 */
313 18034803 unsigned int PushPreviousRevision(CatalogJob *job) {
314 // only root catalogs are used for entering a previous revision (graph)
315
2/2
✓ Branch 1 taken 18029576 times.
✓ Branch 2 taken 5227 times.
18034803 if (!job->catalog->IsRoot()) {
316 18029576 return 0;
317 }
318
319
1/2
✓ Branch 1 taken 5227 times.
✗ Branch 2 not taken.
5227 const shash::Any previous_revision = job->catalog->GetPreviousRevision();
320
2/2
✓ Branch 1 taken 367 times.
✓ Branch 2 taken 4860 times.
5227 if (previous_revision.IsNull()) {
321 367 return 0;
322 }
323
324 // check if the next deeper history level is actually requested
325
3/4
✓ Branch 1 taken 4860 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2309 times.
✓ Branch 4 taken 2551 times.
4860 if (this->IsBelowPruningThresholds(*job, effective_history_depth_,
326 effective_timestamp_threshold_)) {
327 2309 return 0;
328 }
329
330 5102 if (this->no_repeat_history_
331
5/8
✓ Branch 0 taken 1179 times.
✓ Branch 1 taken 1372 times.
✓ Branch 3 taken 1179 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1179 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 2551 times.
2551 && catalogs_done_.Contains(previous_revision)) {
332 return 0;
333 }
334
335 CatalogJob *prev_job;
336 5102 if (!this->no_repeat_history_
337
5/8
✓ Branch 0 taken 1179 times.
✓ Branch 1 taken 1372 times.
✓ Branch 3 taken 1179 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1179 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 2551 times.
✗ Branch 8 not taken.
2551 || !catalogs_processing_.Lookup(previous_revision, &prev_job)) {
338
2/4
✓ Branch 2 taken 2551 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2551 times.
✗ Branch 6 not taken.
5102 prev_job = new CatalogJob("", previous_revision, 0,
339
1/2
✓ Branch 1 taken 2551 times.
✗ Branch 2 not taken.
2551 job->history_depth + 1);
340
1/2
✓ Branch 1 taken 2551 times.
✗ Branch 2 not taken.
2551 PushJobUnlocked(prev_job);
341 }
342
343
2/2
✓ Branch 0 taken 735 times.
✓ Branch 1 taken 1816 times.
2551 if (effective_traversal_type_ == Base::kDepthFirst) {
344
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished,
345 this, job);
346 }
347 2551 return 1;
348 }
349
350 18032108 void ProcessJobPost(CatalogJob *job) {
351 // Save time by keeping catalog open when suitable
352
1/2
✓ Branch 0 taken 18032108 times.
✗ Branch 1 not taken.
18032108 if (job->catalog == NULL) {
353
3/4
✓ Branch 0 taken 18032108 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2254 times.
✓ Branch 4 taken 17975611 times.
18032108 if (!this->ReopenCatalog(job)) {
354 2254 atomic_inc32(&num_errors_);
355 NotifyFinished();
356 return;
357 }
358 }
359
1/2
✓ Branch 0 taken 17975611 times.
✗ Branch 1 not taken.
17975611 if (serialize_callbacks_) {
360 17975611 MutexLockGuard m(&catalog_callback_lock_);
361
2/4
✓ Branch 1 taken 18034803 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18034803 times.
✗ Branch 5 not taken.
18034803 this->NotifyListeners(job->GetCallbackData());
362 18034803 } else {
363 this->NotifyListeners(job->GetCallbackData());
364 }
365
2/2
✓ Branch 0 taken 69345 times.
✓ Branch 1 taken 17965409 times.
18034754 if (!this->no_close_) {
366
2/4
✓ Branch 0 taken 69345 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 69345 times.
69345 if (!this->CloseCatalog(true, job)) {
367 atomic_inc32(&num_errors_);
368 NotifyFinished();
369 return;
370 }
371 }
372 18034754 FinalizeJob(job);
373 }
374
375 18034711 void FinalizeJob(CatalogJob *job) {
376 {
377 18034711 MutexLockGuard m(&catalogs_lock_);
378
1/2
✓ Branch 1 taken 18034956 times.
✗ Branch 2 not taken.
18034956 catalogs_processing_.Erase(job->hash);
379
1/2
✓ Branch 1 taken 18034956 times.
✗ Branch 2 not taken.
18034956 catalogs_done_.Insert(job->hash, true);
380 // No more catalogs to process -> finish
381
1/2
✓ Branch 2 taken 2060 times.
✗ Branch 3 not taken.
18037016 if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty()
382
5/6
✓ Branch 0 taken 2060 times.
✓ Branch 1 taken 18032896 times.
✓ Branch 3 taken 2060 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2060 times.
✓ Branch 6 taken 18032896 times.
18037016 && post_job_queue_.IsEmpty()) {
383
1/2
✓ Branch 1 taken 2060 times.
✗ Branch 2 not taken.
2060 NotifyFinished();
384 }
385 18034956 }
386
2/2
✓ Branch 0 taken 8998202 times.
✓ Branch 1 taken 9036362 times.
18034564 if (effective_traversal_type_ == Base::kDepthFirst) {
387 8998202 job->WakeParents();
388 }
389
1/2
✓ Branch 0 taken 18033682 times.
✗ Branch 1 not taken.
18033682 delete job;
390 18031526 }
391
392 8996337 void OnChildFinished(const int &a, CatalogJob *job) {
393 // atomic_xadd32 returns value before subtraction -> needs to equal 1
394
2/2
✓ Branch 1 taken 1663849 times.
✓ Branch 2 taken 7333958 times.
8996337 if (atomic_xadd32(&job->children_unprocessed, -1) == 1) {
395 1663849 post_job_queue_.EnqueueFront(job);
396 }
397 8997758 }
398
399 unsigned int num_threads_;
400 bool serialize_callbacks_;
401
402 uint64_t effective_history_depth_;
403 time_t effective_timestamp_threshold_;
404 TraversalType effective_traversal_type_;
405
406 pthread_t *threads_process_;
407 atomic_int32 num_errors_;
408
409 Tube<CatalogJob> pre_job_queue_;
410 Tube<CatalogJob> post_job_queue_;
411 SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_;
412 SmallHashDynamic<shash::Any, bool> catalogs_done_;
413 pthread_mutex_t catalogs_lock_;
414
415 pthread_mutex_t catalog_callback_lock_;
416 };
417
418 } // namespace swissknife
419
420 #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
421