Directory: | cvmfs/ |
---|---|
File: | cvmfs/catalog_traversal_parallel.h |
Date: | 2025-10-19 02:35:28 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 186 | 199 | 93.5% |
Branches: | 159 | 244 | 65.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
6 | #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
7 | |||
8 | #include <stack> | ||
9 | #include <string> | ||
10 | #include <vector> | ||
11 | |||
12 | #include "catalog_traversal.h" | ||
13 | #include "util/atomic.h" | ||
14 | #include "util/exception.h" | ||
15 | #include "util/tube.h" | ||
16 | |||
17 | namespace swissknife { | ||
18 | |||
19 | /** | ||
20 | * This class implements the same functionality as CatalogTraversal, but in | ||
21 | * parallel. For common functionality, see the documentation of | ||
22 | * CatalogTraversal. Differences: | ||
23 | * - can choose number of threads | ||
24 | * - traversal types change meaning: | ||
25 | * - depth-first -> parallelized post-order traversal (parents are processed | ||
26 | * after all children are finished) | ||
27 | * - breadth-first -> same as original, but parallelized | ||
28 | */ | ||
29 | template<class ObjectFetcherT> | ||
30 | class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> { | ||
31 | public: | ||
32 | typedef CatalogTraversalBase<ObjectFetcherT> Base; | ||
33 | typedef ObjectFetcherT ObjectFetcherTN; | ||
34 | typedef typename ObjectFetcherT::CatalogTN CatalogTN; | ||
35 | typedef typename ObjectFetcherT::HistoryTN HistoryTN; | ||
36 | typedef CatalogTraversalData<CatalogTN> CallbackDataTN; | ||
37 | typedef typename CatalogTN::NestedCatalogList NestedCatalogList; | ||
38 | typedef typename Base::Parameters Parameters; | ||
39 | typedef typename Base::TraversalType TraversalType; | ||
40 | typedef std::vector<shash::Any> HashList; | ||
41 | |||
42 | 1488 | explicit CatalogTraversalParallel(const Parameters &params) | |
43 | : CatalogTraversalBase<ObjectFetcherT>(params) | ||
44 | 1488 | , num_threads_(params.num_threads) | |
45 |
4/8✓ Branch 2 taken 1488 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1488 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1488 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1488 times.
✗ Branch 12 not taken.
|
1488 | , serialize_callbacks_(params.serialize_callbacks) { |
46 | 1488 | atomic_init32(&num_errors_); | |
47 |
1/2✓ Branch 1 taken 1488 times.
✗ Branch 2 not taken.
|
1488 | shash::Any null_hash; |
48 | 1488 | null_hash.SetNull(); | |
49 |
1/2✓ Branch 1 taken 1488 times.
✗ Branch 2 not taken.
|
1488 | catalogs_processing_.Init(1024, null_hash, hasher); |
50 |
1/2✓ Branch 1 taken 1488 times.
✗ Branch 2 not taken.
|
1488 | catalogs_done_.Init(1024, null_hash, hasher); |
51 | 1488 | pthread_mutex_init(&catalog_callback_lock_, NULL); | |
52 | 1488 | pthread_mutex_init(&catalogs_lock_, NULL); | |
53 | 1488 | effective_history_depth_ = this->default_history_depth_; | |
54 | 1488 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
55 | 1488 | } | |
56 | |||
57 | protected: | ||
58 | struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob, | ||
59 | public Observable<int> { | ||
60 | 14725608 | explicit CatalogJob(const std::string &path, | |
61 | const shash::Any &hash, | ||
62 | const unsigned tree_level, | ||
63 | const uint64_t history_depth, | ||
64 | CatalogTN *parent = NULL) | ||
65 | : CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level, | ||
66 | 14725608 | history_depth, parent) { | |
67 | 14725608 | atomic_init32(&children_unprocessed); | |
68 | 14725608 | } | |
69 | |||
70 |
1/2✓ Branch 1 taken 7344592 times.
✗ Branch 2 not taken.
|
7345632 | void WakeParents() { this->NotifyListeners(0); } |
71 | |||
72 | atomic_int32 children_unprocessed; | ||
73 | }; | ||
74 | |||
75 | public: | ||
76 | /** | ||
77 | * Starts the traversal process. | ||
78 | * After calling this method CatalogTraversal will go through all catalogs | ||
79 | * and call the registered callback methods for each found catalog. | ||
80 | * If something goes wrong in the process, the traversal will be cancelled. | ||
81 | * | ||
82 | * @return true, when all catalogs were successfully processed. On | ||
83 | * failure the traversal is cancelled and false is returned. | ||
84 | */ | ||
85 | 844 | bool Traverse(const TraversalType type = Base::kBreadthFirst) { | |
86 |
1/2✓ Branch 1 taken 844 times.
✗ Branch 2 not taken.
|
844 | const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash(); |
87 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 844 times.
|
844 | if (root_catalog_hash.IsNull()) { |
88 | ✗ | return false; | |
89 | } | ||
90 |
1/2✓ Branch 1 taken 844 times.
✗ Branch 2 not taken.
|
844 | return Traverse(root_catalog_hash, type); |
91 | } | ||
92 | |||
93 | /** | ||
94 | * Starts the traversal process at the catalog pointed to by the given hash | ||
95 | * | ||
96 | * @param root_catalog_hash the entry point into the catalog traversal | ||
97 | * @return true when catalogs were successfully traversed | ||
98 | */ | ||
99 | 1444 | bool Traverse(const shash::Any &root_catalog_hash, | |
100 | const TraversalType type = Base::kBreadthFirst) { | ||
101 | // add the root catalog of the repository as the first element on the job | ||
102 | // stack | ||
103 | 2888 | if (this->no_repeat_history_ | |
104 |
4/6✓ Branch 0 taken 564 times.
✓ Branch 1 taken 880 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 564 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1444 times.
|
1444 | && catalogs_done_.Contains(root_catalog_hash)) { |
105 | ✗ | return true; | |
106 | } | ||
107 | 1444 | effective_traversal_type_ = type; | |
108 |
3/6✓ Branch 2 taken 1444 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1444 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1444 times.
✗ Branch 9 not taken.
|
1444 | CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0); |
109 | 1444 | PushJob(root_job); | |
110 | 1444 | return DoTraverse(); | |
111 | } | ||
112 | |||
113 | /** | ||
114 | * Start the traversal process from a list of root catalogs. Same as | ||
115 | * TraverseRevision function, TraverseList does not traverse into predecessor | ||
116 | * catalog revisions and ignores TraversalParameter settings. | ||
117 | */ | ||
118 | 448 | bool TraverseList(const HashList &root_catalog_list, | |
119 | const TraversalType type = Base::kBreadthFirst) { | ||
120 | // Push in reverse order for CatalogTraversal-like behavior | ||
121 | 448 | HashList::const_reverse_iterator i = root_catalog_list.rbegin(); | |
122 | 448 | const HashList::const_reverse_iterator iend = root_catalog_list.rend(); | |
123 | 448 | bool has_pushed = false; | |
124 | { | ||
125 | 448 | MutexLockGuard m(&catalogs_lock_); | |
126 |
3/4✓ Branch 2 taken 1604 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1156 times.
✓ Branch 5 taken 448 times.
|
1604 | for (; i != iend; ++i) { |
127 |
7/8✓ Branch 0 taken 796 times.
✓ Branch 1 taken 360 times.
✓ Branch 4 taken 796 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 136 times.
✓ Branch 7 taken 660 times.
✓ Branch 8 taken 136 times.
✓ Branch 9 taken 1020 times.
|
1156 | if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) { |
128 | 136 | continue; | |
129 | } | ||
130 | |||
131 |
3/6✓ Branch 2 taken 1020 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1020 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1020 times.
✗ Branch 10 not taken.
|
1020 | CatalogJob *root_job = new CatalogJob("", *i, 0, 0); |
132 |
1/2✓ Branch 1 taken 1020 times.
✗ Branch 2 not taken.
|
1020 | PushJobUnlocked(root_job); |
133 | 1020 | has_pushed = true; | |
134 | } | ||
135 | 448 | } | |
136 | // noop: no catalogs to traverse | ||
137 |
2/2✓ Branch 0 taken 52 times.
✓ Branch 1 taken 396 times.
|
448 | if (!has_pushed) { |
138 | 52 | return true; | |
139 | } | ||
140 | 396 | effective_traversal_type_ = type; | |
141 | 396 | effective_history_depth_ = Parameters::kNoHistory; | |
142 | 396 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
143 |
1/2✓ Branch 1 taken 396 times.
✗ Branch 2 not taken.
|
396 | bool result = DoTraverse(); |
144 | 396 | effective_history_depth_ = this->default_history_depth_; | |
145 | 396 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
146 | 396 | return result; | |
147 | } | ||
148 | |||
149 | /** | ||
150 | * Starts the traversal process at the catalog pointed to by the given hash | ||
151 | * but doesn't traverse into predecessor catalog revisions. This overrides the | ||
152 | * TraversalParameter settings provided at construction. | ||
153 | * | ||
154 | * @param root_catalog_hash the entry point into the catalog traversal | ||
155 | * @return true when catalogs were successfully traversed | ||
156 | */ | ||
157 | 80 | bool TraverseRevision(const shash::Any &root_catalog_hash, | |
158 | const TraversalType type = Base::kBreadthFirst) { | ||
159 | 80 | effective_history_depth_ = Parameters::kNoHistory; | |
160 | 80 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
161 | 80 | bool result = Traverse(root_catalog_hash, type); | |
162 | 80 | effective_history_depth_ = this->default_history_depth_; | |
163 | 80 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
164 | 80 | return result; | |
165 | } | ||
166 | |||
167 | protected: | ||
168 | 67460404 | static uint32_t hasher(const shash::Any &key) { | |
169 | // Don't start with the first bytes, because == is using them as well | ||
170 | return static_cast<uint32_t>( | ||
171 | 67460404 | *(reinterpret_cast<const uint32_t *>(key.digest) + 1)); | |
172 | } | ||
173 | |||
174 | 1840 | bool DoTraverse() { | |
175 | // Optimal number of threads is yet to be determined. The main event loop | ||
176 | // contains a spin-lock, so it should not be more than number of cores. | ||
177 | 1840 | threads_process_ = reinterpret_cast<pthread_t *>( | |
178 | 1840 | smalloc(sizeof(pthread_t) * num_threads_)); | |
179 |
2/2✓ Branch 0 taken 2400 times.
✓ Branch 1 taken 1840 times.
|
4240 | for (unsigned int i = 0; i < num_threads_; ++i) { |
180 | 2400 | int retval = pthread_create(&threads_process_[i], NULL, MainProcessQueue, | |
181 | this); | ||
182 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2400 times.
|
2400 | if (retval != 0) |
183 | ✗ | PANIC(kLogStderr, "failed to create thread"); | |
184 | } | ||
185 | |||
186 |
2/2✓ Branch 0 taken 2400 times.
✓ Branch 1 taken 1840 times.
|
4240 | for (unsigned int i = 0; i < num_threads_; ++i) { |
187 | 2400 | int retval = pthread_join(threads_process_[i], NULL); | |
188 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2400 times.
|
2400 | assert(retval == 0); |
189 | } | ||
190 | 1840 | free(threads_process_); | |
191 | |||
192 |
2/2✓ Branch 1 taken 40 times.
✓ Branch 2 taken 1800 times.
|
1840 | if (atomic_read32(&num_errors_)) |
193 | 40 | return false; | |
194 | |||
195 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1800 times.
|
1800 | assert(catalogs_processing_.size() == 0); |
196 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1800 times.
|
1800 | assert(pre_job_queue_.IsEmpty()); |
197 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1800 times.
|
1800 | assert(post_job_queue_.IsEmpty()); |
198 | 1800 | return true; | |
199 | } | ||
200 | |||
201 | 2400 | static void *MainProcessQueue(void *data) { | |
202 | 2400 | CatalogTraversalParallel<ObjectFetcherT> *traversal = reinterpret_cast< | |
203 | CatalogTraversalParallel<ObjectFetcherT> *>(data); | ||
204 | CatalogJob *current_job; | ||
205 | while (true) { | ||
206 | 16078996 | current_job = traversal->post_job_queue_.TryPopFront(); | |
207 |
2/2✓ Branch 0 taken 1358268 times.
✓ Branch 1 taken 14724048 times.
|
16082316 | if (current_job != NULL) { |
208 | 1358268 | traversal->ProcessJobPost(current_job); | |
209 | } else { | ||
210 | 14724048 | current_job = traversal->pre_job_queue_.PopFront(); | |
211 | // NULL means the master thread tells us to finish | ||
212 |
2/2✓ Branch 1 taken 2400 times.
✓ Branch 2 taken 14722728 times.
|
14725368 | if (current_job->hash.IsNull()) { |
213 |
1/2✓ Branch 0 taken 2400 times.
✗ Branch 1 not taken.
|
2400 | delete current_job; |
214 | 2400 | break; | |
215 | } | ||
216 | 14722728 | traversal->ProcessJobPre(current_job); | |
217 | } | ||
218 | } | ||
219 | 2400 | return NULL; | |
220 | } | ||
221 | |||
222 | 1840 | void NotifyFinished() { | |
223 |
1/2✓ Branch 1 taken 1840 times.
✗ Branch 2 not taken.
|
1840 | shash::Any null_hash; |
224 | 1840 | null_hash.SetNull(); | |
225 |
2/2✓ Branch 0 taken 2400 times.
✓ Branch 1 taken 1840 times.
|
4240 | for (unsigned i = 0; i < num_threads_; ++i) { |
226 |
3/6✓ Branch 2 taken 2400 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2400 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2400 times.
✗ Branch 9 not taken.
|
2400 | CatalogJob *job = new CatalogJob("", null_hash, 0, 0); |
227 |
1/2✓ Branch 1 taken 2400 times.
✗ Branch 2 not taken.
|
2400 | pre_job_queue_.EnqueueFront(job); |
228 | } | ||
229 | 1840 | } | |
230 | |||
231 | 1444 | void PushJob(CatalogJob *job) { | |
232 | 1444 | MutexLockGuard m(&catalogs_lock_); | |
233 |
1/2✓ Branch 1 taken 1444 times.
✗ Branch 2 not taken.
|
1444 | PushJobUnlocked(job); |
234 | 1444 | } | |
235 | |||
236 | 14723208 | void PushJobUnlocked(CatalogJob *job) { | |
237 | 14723208 | catalogs_processing_.Insert(job->hash, job); | |
238 | 14723208 | pre_job_queue_.EnqueueFront(job); | |
239 | 14723208 | } | |
240 | |||
241 | 14722288 | void ProcessJobPre(CatalogJob *job) { | |
242 |
4/6✓ Branch 0 taken 14722288 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 14717968 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 40 times.
✓ Branch 6 taken 14717928 times.
|
14722288 | if (!this->PrepareCatalog(job)) { |
243 | 40 | atomic_inc32(&num_errors_); | |
244 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | NotifyFinished(); |
245 | 13361460 | return; | |
246 | } | ||
247 |
2/2✓ Branch 0 taken 132 times.
✓ Branch 1 taken 14717796 times.
|
14717928 | if (job->ignore) { |
248 |
1/2✓ Branch 1 taken 132 times.
✗ Branch 2 not taken.
|
132 | FinalizeJob(job); |
249 | 132 | return; | |
250 | } | ||
251 |
1/2✓ Branch 1 taken 14688196 times.
✗ Branch 2 not taken.
|
14717796 | NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs(); |
252 | unsigned int num_children; | ||
253 | // Ensure that pushed children won't call ProcessJobPost on this job | ||
254 | // before this function finishes | ||
255 | { | ||
256 | 14688196 | MutexLockGuard m(&catalogs_lock_); | |
257 |
2/2✓ Branch 0 taken 7377292 times.
✓ Branch 1 taken 7345704 times.
|
14722996 | if (effective_traversal_type_ == Base::kBreadthFirst) { |
258 |
1/2✓ Branch 1 taken 7377292 times.
✗ Branch 2 not taken.
|
7377292 | num_children = PushPreviousRevision(job) |
259 |
1/2✓ Branch 1 taken 7377292 times.
✗ Branch 2 not taken.
|
7377292 | + PushNestedCatalogs(job, catalog_list); |
260 | } else { | ||
261 |
1/2✓ Branch 1 taken 7345704 times.
✗ Branch 2 not taken.
|
7345704 | num_children = PushNestedCatalogs(job, catalog_list) |
262 |
1/2✓ Branch 1 taken 7345704 times.
✗ Branch 2 not taken.
|
7345704 | + PushPreviousRevision(job); |
263 | 7345704 | atomic_write32(&job->children_unprocessed, num_children); | |
264 | } | ||
265 |
3/6✓ Branch 0 taken 14722996 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 14722996 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 14722996 times.
|
14722996 | if (!this->CloseCatalog(false, job)) { |
266 | ✗ | atomic_inc32(&num_errors_); | |
267 | ✗ | NotifyFinished(); | |
268 | } | ||
269 | 14722996 | } | |
270 | |||
271 | // breadth-first: can post-process immediately | ||
272 | // depth-first: no children -> can post-process immediately | ||
273 |
4/4✓ Branch 0 taken 7345504 times.
✓ Branch 1 taken 7377252 times.
✓ Branch 2 taken 5987196 times.
✓ Branch 3 taken 1358308 times.
|
14722756 | if (effective_traversal_type_ == Base::kBreadthFirst || num_children == 0) { |
274 |
1/2✓ Branch 1 taken 13360368 times.
✗ Branch 2 not taken.
|
13364448 | ProcessJobPost(job); |
275 | 13360368 | return; | |
276 | } | ||
277 |
2/2✓ Branch 1 taken 1358308 times.
✓ Branch 2 taken 13361288 times.
|
14718676 | } |
278 | |||
279 | 14722996 | unsigned int PushNestedCatalogs(CatalogJob *job, | |
280 | const NestedCatalogList &catalog_list) { | ||
281 | 14722996 | typename NestedCatalogList::const_iterator i = catalog_list.begin(); | |
282 | 14722996 | typename NestedCatalogList::const_iterator iend = catalog_list.end(); | |
283 | 14722996 | unsigned int num_children = 0; | |
284 |
2/2✓ Branch 2 taken 14721168 times.
✓ Branch 3 taken 14722996 times.
|
29444164 | for (; i != iend; ++i) { |
285 |
7/8✓ Branch 0 taken 17488 times.
✓ Branch 1 taken 14703680 times.
✓ Branch 4 taken 17488 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2248 times.
✓ Branch 7 taken 15240 times.
✓ Branch 8 taken 2248 times.
✓ Branch 9 taken 14718920 times.
|
14721168 | if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) { |
286 | 2248 | continue; | |
287 | } | ||
288 | |||
289 | CatalogJob *child; | ||
290 | 29437840 | if (!this->no_repeat_history_ | |
291 |
7/8✓ Branch 0 taken 15240 times.
✓ Branch 1 taken 14703680 times.
✓ Branch 4 taken 15240 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 14920 times.
✓ Branch 7 taken 320 times.
✓ Branch 8 taken 14718600 times.
✓ Branch 9 taken 320 times.
|
14718920 | || !catalogs_processing_.Lookup(i->hash, &child)) { |
292 |
2/2✓ Branch 0 taken 14665480 times.
✓ Branch 1 taken 53120 times.
|
14718600 | CatalogTN *parent = (this->no_close_) ? job->catalog : NULL; |
293 |
1/2✓ Branch 2 taken 14718600 times.
✗ Branch 3 not taken.
|
29437200 | child = new CatalogJob(i->mountpoint.ToString(), |
294 |
1/2✓ Branch 2 taken 14718600 times.
✗ Branch 3 not taken.
|
14718600 | i->hash, |
295 | 14718600 | job->tree_level + 1, | |
296 |
1/2✓ Branch 1 taken 14718600 times.
✗ Branch 2 not taken.
|
14718600 | job->history_depth, |
297 | parent); | ||
298 |
1/2✓ Branch 1 taken 14718600 times.
✗ Branch 2 not taken.
|
14718600 | PushJobUnlocked(child); |
299 | } | ||
300 | |||
301 |
2/2✓ Branch 0 taken 7344888 times.
✓ Branch 1 taken 7374032 times.
|
14718920 | if (effective_traversal_type_ == Base::kDepthFirst) { |
302 |
1/2✓ Branch 1 taken 7344888 times.
✗ Branch 2 not taken.
|
7344888 | child->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
303 | this, job); | ||
304 | } | ||
305 | 14718920 | ++num_children; | |
306 | } | ||
307 | 14722996 | return num_children; | |
308 | } | ||
309 | |||
310 | /** | ||
311 | * Pushes the previous revision of a root catalog. | ||
312 | * @return the number of catalogs pushed on the processing stack | ||
313 | */ | ||
314 | 14722996 | unsigned int PushPreviousRevision(CatalogJob *job) { | |
315 | // only root catalogs are used for entering a previous revision (graph) | ||
316 |
2/2✓ Branch 1 taken 14718516 times.
✓ Branch 2 taken 4480 times.
|
14722996 | if (!job->catalog->IsRoot()) { |
317 | 14718516 | return 0; | |
318 | } | ||
319 | |||
320 |
1/2✓ Branch 1 taken 4480 times.
✗ Branch 2 not taken.
|
4480 | const shash::Any previous_revision = job->catalog->GetPreviousRevision(); |
321 |
2/2✓ Branch 1 taken 328 times.
✓ Branch 2 taken 4152 times.
|
4480 | if (previous_revision.IsNull()) { |
322 | 328 | return 0; | |
323 | } | ||
324 | |||
325 | // check if the next deeper history level is actually requested | ||
326 |
3/4✓ Branch 1 taken 4152 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2008 times.
✓ Branch 4 taken 2144 times.
|
4152 | if (this->IsBelowPruningThresholds(*job, effective_history_depth_, |
327 | effective_timestamp_threshold_)) { | ||
328 | 2008 | return 0; | |
329 | } | ||
330 | |||
331 | 4288 | if (this->no_repeat_history_ | |
332 |
5/8✓ Branch 0 taken 1024 times.
✓ Branch 1 taken 1120 times.
✓ Branch 3 taken 1024 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1024 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 2144 times.
|
2144 | && catalogs_done_.Contains(previous_revision)) { |
333 | ✗ | return 0; | |
334 | } | ||
335 | |||
336 | CatalogJob *prev_job; | ||
337 | 4288 | if (!this->no_repeat_history_ | |
338 |
5/8✓ Branch 0 taken 1024 times.
✓ Branch 1 taken 1120 times.
✓ Branch 3 taken 1024 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1024 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 2144 times.
✗ Branch 8 not taken.
|
2144 | || !catalogs_processing_.Lookup(previous_revision, &prev_job)) { |
339 |
2/4✓ Branch 2 taken 2144 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2144 times.
✗ Branch 6 not taken.
|
4288 | prev_job = new CatalogJob("", previous_revision, 0, |
340 |
1/2✓ Branch 1 taken 2144 times.
✗ Branch 2 not taken.
|
2144 | job->history_depth + 1); |
341 |
1/2✓ Branch 1 taken 2144 times.
✗ Branch 2 not taken.
|
2144 | PushJobUnlocked(prev_job); |
342 | } | ||
343 | |||
344 |
2/2✓ Branch 0 taken 600 times.
✓ Branch 1 taken 1544 times.
|
2144 | if (effective_traversal_type_ == Base::kDepthFirst) { |
345 |
1/2✓ Branch 1 taken 600 times.
✗ Branch 2 not taken.
|
600 | prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
346 | this, job); | ||
347 | } | ||
348 | 2144 | return 1; | |
349 | } | ||
350 | |||
351 | 14720956 | void ProcessJobPost(CatalogJob *job) { | |
352 | // Save time by keeping catalog open when suitable | ||
353 |
1/2✓ Branch 0 taken 14720956 times.
✗ Branch 1 not taken.
|
14720956 | if (job->catalog == NULL) { |
354 |
2/4✓ Branch 0 taken 14720956 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 14681716 times.
|
14720956 | if (!this->ReopenCatalog(job)) { |
355 | ✗ | atomic_inc32(&num_errors_); | |
356 | ✗ | NotifyFinished(); | |
357 | ✗ | return; | |
358 | } | ||
359 | } | ||
360 |
1/2✓ Branch 0 taken 14681716 times.
✗ Branch 1 not taken.
|
14681716 | if (serialize_callbacks_) { |
361 | 14681716 | MutexLockGuard m(&catalog_callback_lock_); | |
362 |
2/4✓ Branch 1 taken 14722996 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 14722996 times.
✗ Branch 5 not taken.
|
14722996 | this->NotifyListeners(job->GetCallbackData()); |
363 | 14722996 | } else { | |
364 | ✗ | this->NotifyListeners(job->GetCallbackData()); | |
365 | } | ||
366 |
2/2✓ Branch 0 taken 57316 times.
✓ Branch 1 taken 14665680 times.
|
14722996 | if (!this->no_close_) { |
367 |
2/4✓ Branch 0 taken 57316 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 57316 times.
|
57316 | if (!this->CloseCatalog(true, job)) { |
368 | ✗ | atomic_inc32(&num_errors_); | |
369 | ✗ | NotifyFinished(); | |
370 | ✗ | return; | |
371 | } | ||
372 | } | ||
373 | 14722996 | FinalizeJob(job); | |
374 | } | ||
375 | |||
376 | 14722968 | void FinalizeJob(CatalogJob *job) { | |
377 | { | ||
378 | 14722968 | MutexLockGuard m(&catalogs_lock_); | |
379 |
1/2✓ Branch 1 taken 14723128 times.
✗ Branch 2 not taken.
|
14723128 | catalogs_processing_.Erase(job->hash); |
380 |
1/2✓ Branch 1 taken 14723128 times.
✗ Branch 2 not taken.
|
14723128 | catalogs_done_.Insert(job->hash, true); |
381 | // No more catalogs to process -> finish | ||
382 |
1/2✓ Branch 2 taken 1800 times.
✗ Branch 3 not taken.
|
14724928 | if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty() |
383 |
5/6✓ Branch 0 taken 1800 times.
✓ Branch 1 taken 14721328 times.
✓ Branch 3 taken 1800 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1800 times.
✓ Branch 6 taken 14721328 times.
|
14724928 | && post_job_queue_.IsEmpty()) { |
384 |
1/2✓ Branch 1 taken 1800 times.
✗ Branch 2 not taken.
|
1800 | NotifyFinished(); |
385 | } | ||
386 | 14723128 | } | |
387 |
2/2✓ Branch 0 taken 7345632 times.
✓ Branch 1 taken 7377336 times.
|
14722968 | if (effective_traversal_type_ == Base::kDepthFirst) { |
388 | 7345632 | job->WakeParents(); | |
389 | } | ||
390 |
2/2✓ Branch 0 taken 14721448 times.
✓ Branch 1 taken 280 times.
|
14721728 | delete job; |
391 | 14719568 | } | |
392 | |||
393 | 7344608 | void OnChildFinished(const int &a, CatalogJob *job) { | |
394 | // atomic_xadd32 returns value before subtraction -> needs to equal 1 | ||
395 |
2/2✓ Branch 1 taken 1358308 times.
✓ Branch 2 taken 5987020 times.
|
7344608 | if (atomic_xadd32(&job->children_unprocessed, -1) == 1) { |
396 | 1358308 | post_job_queue_.EnqueueFront(job); | |
397 | } | ||
398 | 7345328 | } | |
399 | |||
400 | unsigned int num_threads_; | ||
401 | bool serialize_callbacks_; | ||
402 | |||
403 | uint64_t effective_history_depth_; | ||
404 | time_t effective_timestamp_threshold_; | ||
405 | TraversalType effective_traversal_type_; | ||
406 | |||
407 | pthread_t *threads_process_; | ||
408 | atomic_int32 num_errors_; | ||
409 | |||
410 | Tube<CatalogJob> pre_job_queue_; | ||
411 | Tube<CatalogJob> post_job_queue_; | ||
412 | SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_; | ||
413 | SmallHashDynamic<shash::Any, bool> catalogs_done_; | ||
414 | pthread_mutex_t catalogs_lock_; | ||
415 | |||
416 | pthread_mutex_t catalog_callback_lock_; | ||
417 | }; | ||
418 | |||
419 | } // namespace swissknife | ||
420 | |||
421 | #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
422 |