GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/catalog_traversal_parallel.h
Date: 2025-09-28 02:35:26
Exec Total Coverage
Lines: 186 199 93.5%
Branches: 160 244 65.6%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
6 #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
7
8 #include <stack>
9 #include <string>
10 #include <vector>
11
12 #include "catalog_traversal.h"
13 #include "util/atomic.h"
14 #include "util/exception.h"
15 #include "util/tube.h"
16
17 namespace swissknife {
18
19 /**
20 * This class implements the same functionality as CatalogTraversal, but in
21 * parallel. For common functionality, see the documentation of
22 * CatalogTraversal. Differences:
23 * - can choose number of threads
24 * - traversal types change meaning:
25 * - depth-first -> parallelized post-order traversal (parents are processed
26 * after all children are finished)
27 * - breadth-first -> same as original, but parallelized
28 */
29 template<class ObjectFetcherT>
30 class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> {
31 public:
32 typedef CatalogTraversalBase<ObjectFetcherT> Base;
33 typedef ObjectFetcherT ObjectFetcherTN;
34 typedef typename ObjectFetcherT::CatalogTN CatalogTN;
35 typedef typename ObjectFetcherT::HistoryTN HistoryTN;
36 typedef CatalogTraversalData<CatalogTN> CallbackDataTN;
37 typedef typename CatalogTN::NestedCatalogList NestedCatalogList;
38 typedef typename Base::Parameters Parameters;
39 typedef typename Base::TraversalType TraversalType;
40 typedef std::vector<shash::Any> HashList;
41
42 2573 explicit CatalogTraversalParallel(const Parameters &params)
43 : CatalogTraversalBase<ObjectFetcherT>(params)
44 2573 , num_threads_(params.num_threads)
45
4/8
✓ Branch 2 taken 2573 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2573 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2573 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 2573 times.
✗ Branch 12 not taken.
2573 , serialize_callbacks_(params.serialize_callbacks) {
46 2573 atomic_init32(&num_errors_);
47
1/2
✓ Branch 1 taken 2573 times.
✗ Branch 2 not taken.
2573 shash::Any null_hash;
48 2573 null_hash.SetNull();
49
1/2
✓ Branch 1 taken 2573 times.
✗ Branch 2 not taken.
2573 catalogs_processing_.Init(1024, null_hash, hasher);
50
1/2
✓ Branch 1 taken 2573 times.
✗ Branch 2 not taken.
2573 catalogs_done_.Init(1024, null_hash, hasher);
51 2573 pthread_mutex_init(&catalog_callback_lock_, NULL);
52 2573 pthread_mutex_init(&catalogs_lock_, NULL);
53 2573 effective_history_depth_ = this->default_history_depth_;
54 2573 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
55 2573 }
56
57 protected:
58 struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob,
59 public Observable<int> {
60 18050873 explicit CatalogJob(const std::string &path,
61 const shash::Any &hash,
62 const unsigned tree_level,
63 const uint64_t history_depth,
64 CatalogTN *parent = NULL)
65 : CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level,
66 18050873 history_depth, parent) {
67 18050873 atomic_init32(&children_unprocessed);
68 18050873 }
69
70
1/2
✓ Branch 1 taken 9000375 times.
✗ Branch 2 not taken.
9000816 void WakeParents() { this->NotifyListeners(0); }
71
72 atomic_int32 children_unprocessed;
73 };
74
75 public:
76 /**
77 * Starts the traversal process.
78 * After calling this methods CatalogTraversal will go through all catalogs
79 * and call the registered callback methods for each found catalog.
80 * If something goes wrong in the process, the traversal will be cancelled.
81 *
82 * @return true, when all catalogs were successfully processed. On
83 * failure the traversal is cancelled and false is returned.
84 */
85 1750 bool Traverse(const TraversalType type = Base::kBreadthFirst) {
86
1/2
✓ Branch 1 taken 1750 times.
✗ Branch 2 not taken.
1750 const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash();
87
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1750 times.
1750 if (root_catalog_hash.IsNull()) {
88 return false;
89 }
90
1/2
✓ Branch 1 taken 1750 times.
✗ Branch 2 not taken.
1750 return Traverse(root_catalog_hash, type);
91 }
92
93 /**
94 * Starts the traversal process at the catalog pointed to by the given hash
95 *
96 * @param root_catalog_hash the entry point into the catalog traversal
97 * @return true when catalogs were successfully traversed
98 */
99 2485 bool Traverse(const shash::Any &root_catalog_hash,
100 const TraversalType type = Base::kBreadthFirst) {
101 // add the root catalog of the repository as the first element on the job
102 // stack
103 4970 if (this->no_repeat_history_
104
4/6
✓ Branch 0 taken 1407 times.
✓ Branch 1 taken 1078 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1407 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2485 times.
2485 && catalogs_done_.Contains(root_catalog_hash)) {
105 return true;
106 }
107 2485 effective_traversal_type_ = type;
108
3/6
✓ Branch 2 taken 2485 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2485 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2485 times.
✗ Branch 9 not taken.
2485 CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0);
109 2485 PushJob(root_job);
110 2485 return DoTraverse();
111 }
112
113 /**
114 * Start the traversal process from a list of root catalogs. Same as
115 * TraverseRevision function, TraverseList does not traverse into predecessor
116 * catalog revisions and ignores TraversalParameter settings.
117 */
118 1981 bool TraverseList(const HashList &root_catalog_list,
119 const TraversalType type = Base::kBreadthFirst) {
120 // Push in reverse order for CatalogTraversal-like behavior
121 1981 HashList::const_reverse_iterator i = root_catalog_list.rbegin();
122 1981 const HashList::const_reverse_iterator iend = root_catalog_list.rend();
123 1981 bool has_pushed = false;
124 {
125 1981 MutexLockGuard m(&catalogs_lock_);
126
3/4
✓ Branch 2 taken 6091 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 4110 times.
✓ Branch 5 taken 1981 times.
6091 for (; i != iend; ++i) {
127
7/8
✓ Branch 0 taken 3669 times.
✓ Branch 1 taken 441 times.
✓ Branch 4 taken 3669 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1326 times.
✓ Branch 7 taken 2343 times.
✓ Branch 8 taken 1326 times.
✓ Branch 9 taken 2784 times.
4110 if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) {
128 1326 continue;
129 }
130
131
3/6
✓ Branch 2 taken 2784 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2784 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2784 times.
✗ Branch 10 not taken.
2784 CatalogJob *root_job = new CatalogJob("", *i, 0, 0);
132
1/2
✓ Branch 1 taken 2784 times.
✗ Branch 2 not taken.
2784 PushJobUnlocked(root_job);
133 2784 has_pushed = true;
134 }
135 1981 }
136 // noop: no catalogs to traverse
137
2/2
✓ Branch 0 taken 507 times.
✓ Branch 1 taken 1474 times.
1981 if (!has_pushed) {
138 507 return true;
139 }
140 1474 effective_traversal_type_ = type;
141 1474 effective_history_depth_ = Parameters::kNoHistory;
142 1474 effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold;
143
1/2
✓ Branch 1 taken 1474 times.
✗ Branch 2 not taken.
1474 bool result = DoTraverse();
144 1474 effective_history_depth_ = this->default_history_depth_;
145 1474 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
146 1474 return result;
147 }
148
149 /**
150 * Starts the traversal process at the catalog pointed to by the given hash
151 * but doesn't traverse into predecessor catalog revisions. This overrides the
152 * TraversalParameter settings provided at construction.
153 *
154 * @param root_catalog_hash the entry point into the catalog traversal
155 * @return true when catalogs were successfully traversed
156 */
157 98 bool TraverseRevision(const shash::Any &root_catalog_hash,
158 const TraversalType type = Base::kBreadthFirst) {
159 98 effective_history_depth_ = Parameters::kNoHistory;
160 98 effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold;
161 98 bool result = Traverse(root_catalog_hash, type);
162 98 effective_history_depth_ = this->default_history_depth_;
163 98 effective_timestamp_threshold_ = this->default_timestamp_threshold_;
164 98 return result;
165 }
166
167 protected:
168 82712241 static uint32_t hasher(const shash::Any &key) {
169 // Don't start with the first bytes, because == is using them as well
170 return static_cast<uint32_t>(
171 82712241 *(reinterpret_cast<const uint32_t *>(key.digest) + 1));
172 }
173
174 3959 bool DoTraverse() {
175 // Optimal number of threads is yet to be determined. The main event loop
176 // contains a spin-lock, so it should not be more than number of cores.
177 3959 threads_process_ = reinterpret_cast<pthread_t *>(
178 3959 smalloc(sizeof(pthread_t) * num_threads_));
179
2/2
✓ Branch 0 taken 4645 times.
✓ Branch 1 taken 3959 times.
8604 for (unsigned int i = 0; i < num_threads_; ++i) {
180 4645 int retval = pthread_create(&threads_process_[i], NULL, MainProcessQueue,
181 this);
182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4645 times.
4645 if (retval != 0)
183 PANIC(kLogStderr, "failed to create thread");
184 }
185
186
2/2
✓ Branch 0 taken 4645 times.
✓ Branch 1 taken 3959 times.
8604 for (unsigned int i = 0; i < num_threads_; ++i) {
187 4645 int retval = pthread_join(threads_process_[i], NULL);
188
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4645 times.
4645 assert(retval == 0);
189 }
190 3959 free(threads_process_);
191
192
2/2
✓ Branch 1 taken 49 times.
✓ Branch 2 taken 3910 times.
3959 if (atomic_read32(&num_errors_))
193 49 return false;
194
195
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3910 times.
3910 assert(catalogs_processing_.size() == 0);
196
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3910 times.
3910 assert(pre_job_queue_.IsEmpty());
197
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3910 times.
3910 assert(post_job_queue_.IsEmpty());
198 3910 return true;
199 }
200
201 4645 static void *MainProcessQueue(void *data) {
202 4645 CatalogTraversalParallel<ObjectFetcherT> *traversal = reinterpret_cast<
203 CatalogTraversalParallel<ObjectFetcherT> *>(data);
204 CatalogJob *current_job;
205 while (true) {
206 19711311 current_job = traversal->post_job_queue_.TryPopFront();
207
2/2
✓ Branch 0 taken 1664799 times.
✓ Branch 1 taken 18049697 times.
19714496 if (current_job != NULL) {
208 1664799 traversal->ProcessJobPost(current_job);
209 } else {
210 18049697 current_job = traversal->pre_job_queue_.PopFront();
211 // NULL means the master thread tells us to finish
212
2/2
✓ Branch 1 taken 4645 times.
✓ Branch 2 taken 18045836 times.
18050432 if (current_job->hash.IsNull()) {
213
1/2
✓ Branch 0 taken 4645 times.
✗ Branch 1 not taken.
4645 delete current_job;
214 4645 break;
215 }
216 18045836 traversal->ProcessJobPre(current_job);
217 }
218 }
219 4645 return NULL;
220 }
221
222 3959 void NotifyFinished() {
223
1/2
✓ Branch 1 taken 3959 times.
✗ Branch 2 not taken.
3959 shash::Any null_hash;
224 3959 null_hash.SetNull();
225
2/2
✓ Branch 0 taken 4645 times.
✓ Branch 1 taken 3959 times.
8604 for (unsigned i = 0; i < num_threads_; ++i) {
226
3/6
✓ Branch 2 taken 4645 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4645 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 4645 times.
✗ Branch 9 not taken.
4645 CatalogJob *job = new CatalogJob("", null_hash, 0, 0);
227
1/2
✓ Branch 1 taken 4645 times.
✗ Branch 2 not taken.
4645 pre_job_queue_.EnqueueFront(job);
228 }
229 3959 }
230
231 2485 void PushJob(CatalogJob *job) {
232 2485 MutexLockGuard m(&catalogs_lock_);
233
1/2
✓ Branch 1 taken 2485 times.
✗ Branch 2 not taken.
2485 PushJobUnlocked(job);
234 2485 }
235
236 18046228 void PushJobUnlocked(CatalogJob *job) {
237 18046228 catalogs_processing_.Insert(job->hash, job);
238 18046228 pre_job_queue_.EnqueueFront(job);
239 18046228 }
240
241 18045689 void ProcessJobPre(CatalogJob *job) {
242
4/6
✓ Branch 0 taken 18045689 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 18043876 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 49 times.
✓ Branch 6 taken 18043827 times.
18045689 if (!this->PrepareCatalog(job)) {
243 49 atomic_inc32(&num_errors_);
244
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 NotifyFinished();
245 16378391 return;
246 }
247
2/2
✓ Branch 0 taken 264 times.
✓ Branch 1 taken 18043563 times.
18043827 if (job->ignore) {
248
1/2
✓ Branch 1 taken 264 times.
✗ Branch 2 not taken.
264 FinalizeJob(job);
249 264 return;
250 }
251
1/2
✓ Branch 1 taken 18002403 times.
✗ Branch 2 not taken.
18043563 NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs();
252 unsigned int num_children;
253 // Ensure that pushed children won't call ProcessJobPost on this job
254 // before this function finishes
255 {
256 18002403 MutexLockGuard m(&catalogs_lock_);
257
2/2
✓ Branch 0 taken 9045128 times.
✓ Branch 1 taken 9000738 times.
18045866 if (effective_traversal_type_ == Base::kBreadthFirst) {
258
1/2
✓ Branch 1 taken 9045128 times.
✗ Branch 2 not taken.
9045128 num_children = PushPreviousRevision(job)
259
1/2
✓ Branch 1 taken 9045128 times.
✗ Branch 2 not taken.
9045128 + PushNestedCatalogs(job, catalog_list);
260 } else {
261
1/2
✓ Branch 1 taken 9000738 times.
✗ Branch 2 not taken.
9000738 num_children = PushNestedCatalogs(job, catalog_list)
262
1/2
✓ Branch 1 taken 9000738 times.
✗ Branch 2 not taken.
9000738 + PushPreviousRevision(job);
263 9000738 atomic_write32(&job->children_unprocessed, num_children);
264 }
265
3/6
✓ Branch 0 taken 18045866 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 18045866 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 18045866 times.
18045866 if (!this->CloseCatalog(false, job)) {
266 atomic_inc32(&num_errors_);
267 NotifyFinished();
268 }
269 18045866 }
270
271 // breadth-first: can post-process immediately
272 // depth-first: no children -> can post-process immediately
273
4/4
✓ Branch 0 taken 9000640 times.
✓ Branch 1 taken 9045177 times.
✓ Branch 2 taken 7335792 times.
✓ Branch 3 taken 1664848 times.
18045817 if (effective_traversal_type_ == Base::kBreadthFirst || num_children == 0) {
274
1/2
✓ Branch 1 taken 16377490 times.
✗ Branch 2 not taken.
16380969 ProcessJobPost(job);
275 16377490 return;
276 }
277
2/2
✓ Branch 1 taken 1664848 times.
✓ Branch 2 taken 16378078 times.
18042338 }
278
279 18045866 unsigned int PushNestedCatalogs(CatalogJob *job,
280 const NestedCatalogList &catalog_list) {
281 18045866 typename NestedCatalogList::const_iterator i = catalog_list.begin();
282 18045866 typename NestedCatalogList::const_iterator iend = catalog_list.end();
283 18045866 unsigned int num_children = 0;
284
2/2
✓ Branch 2 taken 18041001 times.
✓ Branch 3 taken 18045866 times.
36086867 for (; i != iend; ++i) {
285
7/8
✓ Branch 0 taken 28993 times.
✓ Branch 1 taken 18012008 times.
✓ Branch 4 taken 28993 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3163 times.
✓ Branch 7 taken 25830 times.
✓ Branch 8 taken 3163 times.
✓ Branch 9 taken 18037838 times.
18041001 if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) {
286 3163 continue;
287 }
288
289 CatalogJob *child;
290 36075676 if (!this->no_repeat_history_
291
7/8
✓ Branch 0 taken 25830 times.
✓ Branch 1 taken 18012008 times.
✓ Branch 4 taken 25830 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 25438 times.
✓ Branch 7 taken 392 times.
✓ Branch 8 taken 18037446 times.
✓ Branch 9 taken 392 times.
18037838 || !catalogs_processing_.Lookup(i->hash, &child)) {
292
2/2
✓ Branch 0 taken 17965213 times.
✓ Branch 1 taken 72233 times.
18037446 CatalogTN *parent = (this->no_close_) ? job->catalog : NULL;
293
1/2
✓ Branch 2 taken 18037446 times.
✗ Branch 3 not taken.
36074892 child = new CatalogJob(i->mountpoint.ToString(),
294
1/2
✓ Branch 2 taken 18037446 times.
✗ Branch 3 not taken.
18037446 i->hash,
295 18037446 job->tree_level + 1,
296
1/2
✓ Branch 1 taken 18037446 times.
✗ Branch 2 not taken.
18037446 job->history_depth,
297 parent);
298
1/2
✓ Branch 1 taken 18037446 times.
✗ Branch 2 not taken.
18037446 PushJobUnlocked(child);
299 }
300
301
2/2
✓ Branch 0 taken 8998920 times.
✓ Branch 1 taken 9038918 times.
18037838 if (effective_traversal_type_ == Base::kDepthFirst) {
302
1/2
✓ Branch 1 taken 8998920 times.
✗ Branch 2 not taken.
8998920 child->RegisterListener(&CatalogTraversalParallel::OnChildFinished,
303 this, job);
304 }
305 18037838 ++num_children;
306 }
307 18045866 return num_children;
308 }
309
310 /**
311 * Pushes the previous revision of a root catalog.
312 * @return the number of catalogs pushed on the processing stack
313 */
314 18045866 unsigned int PushPreviousRevision(CatalogJob *job) {
315 // only root catalogs are used for entering a previous revision (graph)
316
2/2
✓ Branch 1 taken 18037309 times.
✓ Branch 2 taken 8557 times.
18045866 if (!job->catalog->IsRoot()) {
317 18037309 return 0;
318 }
319
320
1/2
✓ Branch 1 taken 8557 times.
✗ Branch 2 not taken.
8557 const shash::Any previous_revision = job->catalog->GetPreviousRevision();
321
2/2
✓ Branch 1 taken 811 times.
✓ Branch 2 taken 7746 times.
8557 if (previous_revision.IsNull()) {
322 811 return 0;
323 }
324
325 // check if the next deeper history level is actually requested
326
3/4
✓ Branch 1 taken 7746 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4233 times.
✓ Branch 4 taken 3513 times.
7746 if (this->IsBelowPruningThresholds(*job, effective_history_depth_,
327 effective_timestamp_threshold_)) {
328 4233 return 0;
329 }
330
331 7026 if (this->no_repeat_history_
332
5/8
✓ Branch 0 taken 2141 times.
✓ Branch 1 taken 1372 times.
✓ Branch 3 taken 2141 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 2141 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 3513 times.
3513 && catalogs_done_.Contains(previous_revision)) {
333 return 0;
334 }
335
336 CatalogJob *prev_job;
337 7026 if (!this->no_repeat_history_
338
5/8
✓ Branch 0 taken 2141 times.
✓ Branch 1 taken 1372 times.
✓ Branch 3 taken 2141 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2141 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 3513 times.
✗ Branch 8 not taken.
3513 || !catalogs_processing_.Lookup(previous_revision, &prev_job)) {
339
2/4
✓ Branch 2 taken 3513 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3513 times.
✗ Branch 6 not taken.
7026 prev_job = new CatalogJob("", previous_revision, 0,
340
1/2
✓ Branch 1 taken 3513 times.
✗ Branch 2 not taken.
3513 job->history_depth + 1);
341
1/2
✓ Branch 1 taken 3513 times.
✗ Branch 2 not taken.
3513 PushJobUnlocked(prev_job);
342 }
343
344
2/2
✓ Branch 0 taken 735 times.
✓ Branch 1 taken 2778 times.
3513 if (effective_traversal_type_ == Base::kDepthFirst) {
345
1/2
✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
735 prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished,
346 this, job);
347 }
348 3513 return 1;
349 }
350
351 18044347 void ProcessJobPost(CatalogJob *job) {
352 // Save time by keeping catalog open when suitable
353
2/2
✓ Branch 0 taken 18044298 times.
✓ Branch 1 taken 49 times.
18044347 if (job->catalog == NULL) {
354
2/4
✓ Branch 0 taken 18044298 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 17998385 times.
18044298 if (!this->ReopenCatalog(job)) {
355 atomic_inc32(&num_errors_);
356 NotifyFinished();
357 return;
358 }
359 }
360
1/2
✓ Branch 0 taken 17998434 times.
✗ Branch 1 not taken.
17998434 if (serialize_callbacks_) {
361 17998434 MutexLockGuard m(&catalog_callback_lock_);
362
2/4
✓ Branch 1 taken 18045866 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18045866 times.
✗ Branch 5 not taken.
18045866 this->NotifyListeners(job->GetCallbackData());
363 18045866 } else {
364 this->NotifyListeners(job->GetCallbackData());
365 }
366
2/2
✓ Branch 0 taken 80408 times.
✓ Branch 1 taken 17965409 times.
18045817 if (!this->no_close_) {
367
2/4
✓ Branch 0 taken 80408 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 80408 times.
80408 if (!this->CloseCatalog(true, job)) {
368 atomic_inc32(&num_errors_);
369 NotifyFinished();
370 return;
371 }
372 }
373 18045817 FinalizeJob(job);
374 }
375
376 18046032 void FinalizeJob(CatalogJob *job) {
377 {
378 18046032 MutexLockGuard m(&catalogs_lock_);
379
1/2
✓ Branch 1 taken 18046130 times.
✗ Branch 2 not taken.
18046130 catalogs_processing_.Erase(job->hash);
380
1/2
✓ Branch 1 taken 18046130 times.
✗ Branch 2 not taken.
18046130 catalogs_done_.Insert(job->hash, true);
381 // No more catalogs to process -> finish
382
1/2
✓ Branch 2 taken 3910 times.
✗ Branch 3 not taken.
18050040 if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty()
383
5/6
✓ Branch 0 taken 3910 times.
✓ Branch 1 taken 18042220 times.
✓ Branch 3 taken 3910 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 3910 times.
✓ Branch 6 taken 18042220 times.
18050040 && post_job_queue_.IsEmpty()) {
384
1/2
✓ Branch 1 taken 3910 times.
✗ Branch 2 not taken.
3910 NotifyFinished();
385 }
386 18046130 }
387
2/2
✓ Branch 0 taken 9000816 times.
✓ Branch 1 taken 9045216 times.
18046032 if (effective_traversal_type_ == Base::kDepthFirst) {
388 9000816 job->WakeParents();
389 }
390
2/2
✓ Branch 0 taken 18045199 times.
✓ Branch 1 taken 294 times.
18045493 delete job;
391 18042504 }
392
393 8999018 void OnChildFinished(const int &a, CatalogJob *job) {
394 // atomic_xadd32 returns value before subtraction -> needs to equal 1
395
2/2
✓ Branch 1 taken 1664848 times.
✓ Branch 2 taken 7334709 times.
8999018 if (atomic_xadd32(&job->children_unprocessed, -1) == 1) {
396 1664848 post_job_queue_.EnqueueFront(job);
397 }
398 8999557 }
399
400 unsigned int num_threads_;
401 bool serialize_callbacks_;
402
403 uint64_t effective_history_depth_;
404 time_t effective_timestamp_threshold_;
405 TraversalType effective_traversal_type_;
406
407 pthread_t *threads_process_;
408 atomic_int32 num_errors_;
409
410 Tube<CatalogJob> pre_job_queue_;
411 Tube<CatalogJob> post_job_queue_;
412 SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_;
413 SmallHashDynamic<shash::Any, bool> catalogs_done_;
414 pthread_mutex_t catalogs_lock_;
415
416 pthread_mutex_t catalog_callback_lock_;
417 };
418
419 } // namespace swissknife
420
421 #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_
422