Directory: | cvmfs/ |
---|---|
File: | cvmfs/catalog_traversal_parallel.h |
Date: | 2025-06-29 02:35:41 |
| | Exec | Total | Coverage |
---|---|---|---|
Lines: | 187 | 199 | 94.0% |
Branches: | 159 | 244 | 65.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
6 | #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
7 | |||
8 | #include <stack> | ||
9 | #include <string> | ||
10 | #include <vector> | ||
11 | |||
12 | #include "catalog_traversal.h" | ||
13 | #include "util/atomic.h" | ||
14 | #include "util/exception.h" | ||
15 | #include "util/tube.h" | ||
16 | |||
17 | namespace swissknife { | ||
18 | |||
19 | /** | ||
20 | * This class implements the same functionality as CatalogTraversal, but in | ||
21 | * parallel. For common functionality, see the documentation of | ||
22 | * CatalogTraversal. Differences: | ||
23 | * - can choose number of threads | ||
24 | * - traversal types change meaning: | ||
25 | * - depth-first -> parallelized post-order traversal (parents are processed | ||
26 | * after all children are finished) | ||
27 | * - breadth-first -> same as original, but parallelized | ||
28 | */ | ||
29 | template<class ObjectFetcherT> | ||
30 | class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> { | ||
31 | public: | ||
32 | typedef CatalogTraversalBase<ObjectFetcherT> Base; | ||
33 | typedef ObjectFetcherT ObjectFetcherTN; | ||
34 | typedef typename ObjectFetcherT::CatalogTN CatalogTN; | ||
35 | typedef typename ObjectFetcherT::HistoryTN HistoryTN; | ||
36 | typedef CatalogTraversalData<CatalogTN> CallbackDataTN; | ||
37 | typedef typename CatalogTN::NestedCatalogList NestedCatalogList; | ||
38 | typedef typename Base::Parameters Parameters; | ||
39 | typedef typename Base::TraversalType TraversalType; | ||
40 | typedef std::vector<shash::Any> HashList; | ||
41 | |||
42 | 1467 | explicit CatalogTraversalParallel(const Parameters &params) | |
43 | : CatalogTraversalBase<ObjectFetcherT>(params) | ||
44 | 1467 | , num_threads_(params.num_threads) | |
45 |
4/8✓ Branch 2 taken 1467 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1467 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1467 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1467 times.
✗ Branch 12 not taken.
|
1467 | , serialize_callbacks_(params.serialize_callbacks) { |
46 | 1467 | atomic_init32(&num_errors_); | |
47 |
1/2✓ Branch 1 taken 1467 times.
✗ Branch 2 not taken.
|
1467 | shash::Any null_hash; |
48 | 1467 | null_hash.SetNull(); | |
49 |
1/2✓ Branch 1 taken 1467 times.
✗ Branch 2 not taken.
|
1467 | catalogs_processing_.Init(1024, null_hash, hasher); |
50 |
1/2✓ Branch 1 taken 1467 times.
✗ Branch 2 not taken.
|
1467 | catalogs_done_.Init(1024, null_hash, hasher); |
51 | 1467 | pthread_mutex_init(&catalog_callback_lock_, NULL); | |
52 | 1467 | pthread_mutex_init(&catalogs_lock_, NULL); | |
53 | 1467 | effective_history_depth_ = this->default_history_depth_; | |
54 | 1467 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
55 | 1467 | } | |
56 | |||
57 | protected: | ||
58 | struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob, | ||
59 | public Observable<int> { | ||
60 | 4801557 | explicit CatalogJob(const std::string &path, | |
61 | const shash::Any &hash, | ||
62 | const unsigned tree_level, | ||
63 | const uint64_t history_depth, | ||
64 | CatalogTN *parent = NULL) | ||
65 | : CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level, | ||
66 | 4801557 | history_depth, parent) { | |
67 | 4801557 | atomic_init32(&children_unprocessed); | |
68 | 4801557 | } | |
69 | |||
70 |
1/2✓ Branch 1 taken 2390149 times.
✗ Branch 2 not taken.
|
2390344 | void WakeParents() { this->NotifyListeners(0); } |
71 | |||
72 | atomic_int32 children_unprocessed; | ||
73 | }; | ||
74 | |||
75 | public: | ||
76 | /** | ||
77 | * Starts the traversal process. | ||
78 | * After calling this method, CatalogTraversal will go through all catalogs | ||
79 | * and call the registered callback methods for each found catalog. | ||
80 | * If something goes wrong in the process, the traversal will be cancelled. | ||
81 | * | ||
82 | * @return true, when all catalogs were successfully processed. On | ||
83 | * failure the traversal is cancelled and false is returned. | ||
84 | */ | ||
85 | 1213 | bool Traverse(const TraversalType type = Base::kBreadthFirst) { | |
86 |
1/2✓ Branch 1 taken 1213 times.
✗ Branch 2 not taken.
|
1213 | const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash(); |
87 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1213 times.
|
1213 | if (root_catalog_hash.IsNull()) { |
88 | ✗ | return false; | |
89 | } | ||
90 |
1/2✓ Branch 1 taken 1213 times.
✗ Branch 2 not taken.
|
1213 | return Traverse(root_catalog_hash, type); |
91 | } | ||
92 | |||
93 | /** | ||
94 | * Starts the traversal process at the catalog pointed to by the given hash | ||
95 | * | ||
96 | * @param root_catalog_hash the entry point into the catalog traversal | ||
97 | * @return true when catalogs were successfully traversed | ||
98 | */ | ||
99 | 1408 | bool Traverse(const shash::Any &root_catalog_hash, | |
100 | const TraversalType type = Base::kBreadthFirst) { | ||
101 | // add the root catalog of the repository as the first element on the job | ||
102 | // stack | ||
103 | 2816 | if (this->no_repeat_history_ | |
104 |
4/6✓ Branch 0 taken 1122 times.
✓ Branch 1 taken 286 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1122 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1408 times.
|
1408 | && catalogs_done_.Contains(root_catalog_hash)) { |
105 | ✗ | return true; | |
106 | } | ||
107 | 1408 | effective_traversal_type_ = type; | |
108 |
3/6✓ Branch 2 taken 1408 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1408 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1408 times.
✗ Branch 9 not taken.
|
1408 | CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0); |
109 | 1408 | PushJob(root_job); | |
110 | 1408 | return DoTraverse(); | |
111 | } | ||
112 | |||
113 | /** | ||
114 | * Start the traversal process from a list of root catalogs. Same as | ||
115 | * TraverseRevision function, TraverseList does not traverse into predecessor | ||
116 | * catalog revisions and ignores TraversalParameter settings. | ||
117 | */ | ||
118 | 2023 | bool TraverseList(const HashList &root_catalog_list, | |
119 | const TraversalType type = Base::kBreadthFirst) { | ||
120 | // Push in reverse order for CatalogTraversal-like behavior | ||
121 | 2023 | HashList::const_reverse_iterator i = root_catalog_list.rbegin(); | |
122 | 2023 | const HashList::const_reverse_iterator iend = root_catalog_list.rend(); | |
123 | 2023 | bool has_pushed = false; | |
124 | { | ||
125 | 2023 | MutexLockGuard m(&catalogs_lock_); | |
126 |
3/4✓ Branch 2 taken 5930 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 3907 times.
✓ Branch 5 taken 2023 times.
|
5930 | for (; i != iend; ++i) { |
127 |
7/8✓ Branch 0 taken 3790 times.
✓ Branch 1 taken 117 times.
✓ Branch 4 taken 3790 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1564 times.
✓ Branch 7 taken 2226 times.
✓ Branch 8 taken 1564 times.
✓ Branch 9 taken 2343 times.
|
3907 | if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) { |
128 | 1564 | continue; | |
129 | } | ||
130 | |||
131 |
3/6✓ Branch 2 taken 2343 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2343 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2343 times.
✗ Branch 10 not taken.
|
2343 | CatalogJob *root_job = new CatalogJob("", *i, 0, 0); |
132 |
1/2✓ Branch 1 taken 2343 times.
✗ Branch 2 not taken.
|
2343 | PushJobUnlocked(root_job); |
133 | 2343 | has_pushed = true; | |
134 | } | ||
135 | 2023 | } | |
136 | // noop: no catalogs to traverse | ||
137 |
2/2✓ Branch 0 taken 598 times.
✓ Branch 1 taken 1425 times.
|
2023 | if (!has_pushed) { |
138 | 598 | return true; | |
139 | } | ||
140 | 1425 | effective_traversal_type_ = type; | |
141 | 1425 | effective_history_depth_ = Parameters::kNoHistory; | |
142 | 1425 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
143 |
1/2✓ Branch 1 taken 1425 times.
✗ Branch 2 not taken.
|
1425 | bool result = DoTraverse(); |
144 | 1425 | effective_history_depth_ = this->default_history_depth_; | |
145 | 1425 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
146 | 1425 | return result; | |
147 | } | ||
148 | |||
149 | /** | ||
150 | * Starts the traversal process at the catalog pointed to by the given hash | ||
151 | * but doesn't traverse into predecessor catalog revisions. This overrides the | ||
152 | * TraversalParameter settings provided at construction. | ||
153 | * | ||
154 | * @param root_catalog_hash the entry point into the catalog traversal | ||
155 | * @return true when catalogs were successfully traversed | ||
156 | */ | ||
157 | 26 | bool TraverseRevision(const shash::Any &root_catalog_hash, | |
158 | const TraversalType type = Base::kBreadthFirst) { | ||
159 | 26 | effective_history_depth_ = Parameters::kNoHistory; | |
160 | 26 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
161 | 26 | bool result = Traverse(root_catalog_hash, type); | |
162 | 26 | effective_history_depth_ = this->default_history_depth_; | |
163 | 26 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
164 | 26 | return result; | |
165 | } | ||
166 | |||
167 | protected: | ||
168 | 21887441 | static uint32_t hasher(const shash::Any &key) { | |
169 | // Don't start with the first bytes, because == is using them as well | ||
170 | return static_cast<uint32_t>( | ||
171 | 21887441 | *(reinterpret_cast<const uint32_t *>(key.digest) + 1)); | |
172 | } | ||
173 | |||
174 | 2833 | bool DoTraverse() { | |
175 | // Optimal number of threads is yet to be determined. The main event loop | ||
176 | // contains a spin-lock, so it should not be more than number of cores. | ||
177 | 2833 | threads_process_ = reinterpret_cast<pthread_t *>( | |
178 | 2833 | smalloc(sizeof(pthread_t) * num_threads_)); | |
179 |
2/2✓ Branch 0 taken 3015 times.
✓ Branch 1 taken 2833 times.
|
5848 | for (unsigned int i = 0; i < num_threads_; ++i) { |
180 | 3015 | int retval = pthread_create(&threads_process_[i], NULL, MainProcessQueue, | |
181 | this); | ||
182 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3015 times.
|
3015 | if (retval != 0) |
183 | ✗ | PANIC(kLogStderr, "failed to create thread"); | |
184 | } | ||
185 | |||
186 |
2/2✓ Branch 0 taken 3015 times.
✓ Branch 1 taken 2833 times.
|
5848 | for (unsigned int i = 0; i < num_threads_; ++i) { |
187 | 3015 | int retval = pthread_join(threads_process_[i], NULL); | |
188 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3015 times.
|
3015 | assert(retval == 0); |
189 | } | ||
190 | 2833 | free(threads_process_); | |
191 | |||
192 |
2/2✓ Branch 1 taken 13 times.
✓ Branch 2 taken 2820 times.
|
2833 | if (atomic_read32(&num_errors_)) |
193 | 13 | return false; | |
194 | |||
195 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2820 times.
|
2820 | assert(catalogs_processing_.size() == 0); |
196 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2820 times.
|
2820 | assert(pre_job_queue_.IsEmpty()); |
197 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2820 times.
|
2820 | assert(post_job_queue_.IsEmpty()); |
198 | 2820 | return true; | |
199 | } | ||
200 | |||
201 | 3015 | static void *MainProcessQueue(void *data) { | |
202 | 3015 | CatalogTraversalParallel<ObjectFetcherT> *traversal = reinterpret_cast< | |
203 | CatalogTraversalParallel<ObjectFetcherT> *>(data); | ||
204 | CatalogJob *current_job; | ||
205 | while (true) { | ||
206 | 5242797 | current_job = traversal->post_job_queue_.TryPopFront(); | |
207 |
2/2✓ Branch 0 taken 442657 times.
✓ Branch 1 taken 4801323 times.
|
5243980 | if (current_job != NULL) { |
208 | 442657 | traversal->ProcessJobPost(current_job); | |
209 | } else { | ||
210 | 4801323 | current_job = traversal->pre_job_queue_.PopFront(); | |
211 | // NULL means the master thread tells us to finish | ||
212 |
2/2✓ Branch 1 taken 3015 times.
✓ Branch 2 taken 4798438 times.
|
4801505 | if (current_job->hash.IsNull()) { |
213 |
1/2✓ Branch 0 taken 3015 times.
✗ Branch 1 not taken.
|
3015 | delete current_job; |
214 | 3015 | break; | |
215 | } | ||
216 | 4798438 | traversal->ProcessJobPre(current_job); | |
217 | } | ||
218 | } | ||
219 | 3015 | return NULL; | |
220 | } | ||
221 | |||
222 | 2833 | void NotifyFinished() { | |
223 |
1/2✓ Branch 1 taken 2833 times.
✗ Branch 2 not taken.
|
2833 | shash::Any null_hash; |
224 | 2833 | null_hash.SetNull(); | |
225 |
2/2✓ Branch 0 taken 3015 times.
✓ Branch 1 taken 2833 times.
|
5848 | for (unsigned i = 0; i < num_threads_; ++i) { |
226 |
3/6✓ Branch 2 taken 3015 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3015 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 3015 times.
✗ Branch 9 not taken.
|
3015 | CatalogJob *job = new CatalogJob("", null_hash, 0, 0); |
227 |
1/2✓ Branch 1 taken 3015 times.
✗ Branch 2 not taken.
|
3015 | pre_job_queue_.EnqueueFront(job); |
228 | } | ||
229 | 2833 | } | |
230 | |||
231 | 1408 | void PushJob(CatalogJob *job) { | |
232 | 1408 | MutexLockGuard m(&catalogs_lock_); | |
233 |
1/2✓ Branch 1 taken 1408 times.
✗ Branch 2 not taken.
|
1408 | PushJobUnlocked(job); |
234 | 1408 | } | |
235 | |||
236 | 4798542 | void PushJobUnlocked(CatalogJob *job) { | |
237 | 4798542 | catalogs_processing_.Insert(job->hash, job); | |
238 | 4798542 | pre_job_queue_.EnqueueFront(job); | |
239 | 4798542 | } | |
240 | |||
241 | 4798373 | void ProcessJobPre(CatalogJob *job) { | |
242 |
4/6✓ Branch 0 taken 4798373 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 4798061 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 13 times.
✓ Branch 6 taken 4798048 times.
|
4798373 | if (!this->PrepareCatalog(job)) { |
243 | 13 | atomic_inc32(&num_errors_); | |
244 |
1/2✓ Branch 1 taken 13 times.
✗ Branch 2 not taken.
|
13 | NotifyFinished(); |
245 | 4354754 | return; | |
246 | } | ||
247 |
2/2✓ Branch 0 taken 177 times.
✓ Branch 1 taken 4797871 times.
|
4798048 | if (job->ignore) { |
248 |
1/2✓ Branch 1 taken 177 times.
✗ Branch 2 not taken.
|
177 | FinalizeJob(job); |
249 | 177 | return; | |
250 | } | ||
251 |
1/2✓ Branch 1 taken 4785833 times.
✗ Branch 2 not taken.
|
4797871 | NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs(); |
252 | unsigned int num_children; | ||
253 | // Ensure that pushed children won't call ProcessJobPost on this job | ||
254 | // before this function finishes | ||
255 | { | ||
256 | 4785833 | MutexLockGuard m(&catalogs_lock_); | |
257 |
2/2✓ Branch 0 taken 2408035 times.
✓ Branch 1 taken 2390304 times.
|
4798339 | if (effective_traversal_type_ == Base::kBreadthFirst) { |
258 |
1/2✓ Branch 1 taken 2408035 times.
✗ Branch 2 not taken.
|
2408035 | num_children = PushPreviousRevision(job) |
259 |
1/2✓ Branch 1 taken 2408035 times.
✗ Branch 2 not taken.
|
2408035 | + PushNestedCatalogs(job, catalog_list); |
260 | } else { | ||
261 |
1/2✓ Branch 1 taken 2390304 times.
✗ Branch 2 not taken.
|
2390304 | num_children = PushNestedCatalogs(job, catalog_list) |
262 |
1/2✓ Branch 1 taken 2390304 times.
✗ Branch 2 not taken.
|
2390304 | + PushPreviousRevision(job); |
263 | 2390304 | atomic_write32(&job->children_unprocessed, num_children); | |
264 | } | ||
265 |
3/6✓ Branch 0 taken 4798339 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 4798339 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 4798339 times.
|
4798339 | if (!this->CloseCatalog(false, job)) { |
266 | ✗ | atomic_inc32(&num_errors_); | |
267 | ✗ | NotifyFinished(); | |
268 | } | ||
269 | 4798339 | } | |
270 | |||
271 | // breadth-first: can post-process immediately | ||
272 | // depth-first: no children -> can post-process immediately | ||
273 |
4/4✓ Branch 0 taken 2390239 times.
✓ Branch 1 taken 2408009 times.
✓ Branch 2 taken 1947582 times.
✓ Branch 3 taken 442657 times.
|
4798248 | if (effective_traversal_type_ == Base::kBreadthFirst || num_children == 0) { |
274 |
1/2✓ Branch 1 taken 4354772 times.
✗ Branch 2 not taken.
|
4355591 | ProcessJobPost(job); |
275 | 4354772 | return; | |
276 | } | ||
277 |
2/2✓ Branch 1 taken 442657 times.
✓ Branch 2 taken 4354564 times.
|
4797429 | } |
278 | |||
279 | 4798339 | unsigned int PushNestedCatalogs(CatalogJob *job, | |
280 | const NestedCatalogList &catalog_list) { | ||
281 | 4798339 | typename NestedCatalogList::const_iterator i = catalog_list.begin(); | |
282 | 4798339 | typename NestedCatalogList::const_iterator iend = catalog_list.end(); | |
283 | 4798339 | unsigned int num_children = 0; | |
284 |
2/2✓ Branch 2 taken 4794303 times.
✓ Branch 3 taken 4798339 times.
|
9592642 | for (; i != iend; ++i) { |
285 |
7/8✓ Branch 0 taken 15607 times.
✓ Branch 1 taken 4778696 times.
✓ Branch 4 taken 15607 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1267 times.
✓ Branch 7 taken 14340 times.
✓ Branch 8 taken 1267 times.
✓ Branch 9 taken 4793036 times.
|
4794303 | if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) { |
286 | 1267 | continue; | |
287 | } | ||
288 | |||
289 | CatalogJob *child; | ||
290 | 9586072 | if (!this->no_repeat_history_ | |
291 |
7/8✓ Branch 0 taken 14340 times.
✓ Branch 1 taken 4778696 times.
✓ Branch 4 taken 14340 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 14236 times.
✓ Branch 7 taken 104 times.
✓ Branch 8 taken 4792932 times.
✓ Branch 9 taken 104 times.
|
4793036 | || !catalogs_processing_.Lookup(i->hash, &child)) { |
292 |
2/2✓ Branch 0 taken 4766281 times.
✓ Branch 1 taken 26651 times.
|
4792932 | CatalogTN *parent = (this->no_close_) ? job->catalog : NULL; |
293 |
1/2✓ Branch 2 taken 4792932 times.
✗ Branch 3 not taken.
|
9585864 | child = new CatalogJob(i->mountpoint.ToString(), |
294 |
1/2✓ Branch 2 taken 4792932 times.
✗ Branch 3 not taken.
|
4792932 | i->hash, |
295 | 4792932 | job->tree_level + 1, | |
296 |
1/2✓ Branch 1 taken 4792932 times.
✗ Branch 2 not taken.
|
4792932 | job->history_depth, |
297 | parent); | ||
298 |
1/2✓ Branch 1 taken 4792932 times.
✗ Branch 2 not taken.
|
4792932 | PushJobUnlocked(child); |
299 | } | ||
300 | |||
301 |
2/2✓ Branch 0 taken 2388966 times.
✓ Branch 1 taken 2404070 times.
|
4793036 | if (effective_traversal_type_ == Base::kDepthFirst) { |
302 |
1/2✓ Branch 1 taken 2388966 times.
✗ Branch 2 not taken.
|
2388966 | child->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
303 | this, job); | ||
304 | } | ||
305 | 4793036 | ++num_children; | |
306 | } | ||
307 | 4798339 | return num_children; | |
308 | } | ||
309 | |||
310 | /** | ||
311 | * Pushes the previous revision of a root catalog. | ||
312 | * @return the number of catalogs pushed on the processing stack | ||
313 | */ | ||
314 | 4798339 | unsigned int PushPreviousRevision(CatalogJob *job) { | |
315 | // only root catalogs are used for entering a previous revision (graph) | ||
316 |
2/2✓ Branch 1 taken 4792860 times.
✓ Branch 2 taken 5479 times.
|
4798339 | if (!job->catalog->IsRoot()) { |
317 | 4792860 | return 0; | |
318 | } | ||
319 | |||
320 |
1/2✓ Branch 1 taken 5479 times.
✗ Branch 2 not taken.
|
5479 | const shash::Any previous_revision = job->catalog->GetPreviousRevision(); |
321 |
2/2✓ Branch 1 taken 643 times.
✓ Branch 2 taken 4836 times.
|
5479 | if (previous_revision.IsNull()) { |
322 | 643 | return 0; | |
323 | } | ||
324 | |||
325 | // check if the next deeper history level is actually requested | ||
326 |
3/4✓ Branch 1 taken 4836 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2977 times.
✓ Branch 4 taken 1859 times.
|
4836 | if (this->IsBelowPruningThresholds(*job, effective_history_depth_, |
327 | effective_timestamp_threshold_)) { | ||
328 | 2977 | return 0; | |
329 | } | ||
330 | |||
331 | 3718 | if (this->no_repeat_history_ | |
332 |
5/8✓ Branch 0 taken 1495 times.
✓ Branch 1 taken 364 times.
✓ Branch 3 taken 1495 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1495 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 1859 times.
|
1859 | && catalogs_done_.Contains(previous_revision)) { |
333 | ✗ | return 0; | |
334 | } | ||
335 | |||
336 | CatalogJob *prev_job; | ||
337 | 3718 | if (!this->no_repeat_history_ | |
338 |
5/8✓ Branch 0 taken 1495 times.
✓ Branch 1 taken 364 times.
✓ Branch 3 taken 1495 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1495 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 1859 times.
✗ Branch 8 not taken.
|
1859 | || !catalogs_processing_.Lookup(previous_revision, &prev_job)) { |
339 |
2/4✓ Branch 2 taken 1859 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1859 times.
✗ Branch 6 not taken.
|
3718 | prev_job = new CatalogJob("", previous_revision, 0, |
340 |
1/2✓ Branch 1 taken 1859 times.
✗ Branch 2 not taken.
|
1859 | job->history_depth + 1); |
341 |
1/2✓ Branch 1 taken 1859 times.
✗ Branch 2 not taken.
|
1859 | PushJobUnlocked(prev_job); |
342 | } | ||
343 | |||
344 |
2/2✓ Branch 0 taken 195 times.
✓ Branch 1 taken 1664 times.
|
1859 | if (effective_traversal_type_ == Base::kDepthFirst) { |
345 |
1/2✓ Branch 1 taken 195 times.
✗ Branch 2 not taken.
|
195 | prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
346 | this, job); | ||
347 | } | ||
348 | 1859 | return 1; | |
349 | } | ||
350 | |||
351 | 4797598 | void ProcessJobPost(CatalogJob *job) { | |
352 | // Save time by keeping catalog open when suitable | ||
353 |
1/2✓ Branch 0 taken 4797637 times.
✗ Branch 1 not taken.
|
4797598 | if (job->catalog == NULL) { |
354 |
3/4✓ Branch 0 taken 4797637 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 546 times.
✓ Branch 4 taken 4783090 times.
|
4797637 | if (!this->ReopenCatalog(job)) { |
355 | 546 | atomic_inc32(&num_errors_); | |
356 | ✗ | NotifyFinished(); | |
357 | ✗ | return; | |
358 | } | ||
359 | } | ||
360 |
1/2✓ Branch 0 taken 4783051 times.
✗ Branch 1 not taken.
|
4783051 | if (serialize_callbacks_) { |
361 | 4783051 | MutexLockGuard m(&catalog_callback_lock_); | |
362 |
2/4✓ Branch 1 taken 4798339 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4798339 times.
✗ Branch 5 not taken.
|
4798339 | this->NotifyListeners(job->GetCallbackData()); |
363 | 4798339 | } else { | |
364 | ✗ | this->NotifyListeners(job->GetCallbackData()); | |
365 | } | ||
366 |
2/2✓ Branch 0 taken 31993 times.
✓ Branch 1 taken 4766320 times.
|
4798313 | if (!this->no_close_) { |
367 |
2/4✓ Branch 0 taken 31993 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 31993 times.
|
31993 | if (!this->CloseCatalog(true, job)) { |
368 | ✗ | atomic_inc32(&num_errors_); | |
369 | ✗ | NotifyFinished(); | |
370 | ✗ | return; | |
371 | } | ||
372 | } | ||
373 | 4798313 | FinalizeJob(job); | |
374 | } | ||
375 | |||
376 | 4798451 | void FinalizeJob(CatalogJob *job) { | |
377 | { | ||
378 | 4798451 | MutexLockGuard m(&catalogs_lock_); | |
379 |
1/2✓ Branch 1 taken 4798516 times.
✗ Branch 2 not taken.
|
4798516 | catalogs_processing_.Erase(job->hash); |
380 |
1/2✓ Branch 1 taken 4798516 times.
✗ Branch 2 not taken.
|
4798516 | catalogs_done_.Insert(job->hash, true); |
381 | // No more catalogs to process -> finish | ||
382 |
1/2✓ Branch 2 taken 2820 times.
✗ Branch 3 not taken.
|
4801336 | if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty() |
383 |
5/6✓ Branch 0 taken 2820 times.
✓ Branch 1 taken 4795696 times.
✓ Branch 3 taken 2820 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2820 times.
✓ Branch 6 taken 4795696 times.
|
4801336 | && post_job_queue_.IsEmpty()) { |
384 |
1/2✓ Branch 1 taken 2820 times.
✗ Branch 2 not taken.
|
2820 | NotifyFinished(); |
385 | } | ||
386 | 4798516 | } | |
387 |
2/2✓ Branch 0 taken 2390331 times.
✓ Branch 1 taken 2408068 times.
|
4798399 | if (effective_traversal_type_ == Base::kDepthFirst) { |
388 | 2390331 | job->WakeParents(); | |
389 | } | ||
390 |
1/2✓ Branch 0 taken 4798139 times.
✗ Branch 1 not taken.
|
4798139 | delete job; |
391 | 4797775 | } | |
392 | |||
393 | 2388654 | void OnChildFinished(const int &a, CatalogJob *job) { | |
394 | // atomic_xadd32 returns value before subtraction -> needs to equal 1 | ||
395 |
2/2✓ Branch 1 taken 442644 times.
✓ Branch 2 taken 1946387 times.
|
2388654 | if (atomic_xadd32(&job->children_unprocessed, -1) == 1) { |
396 | 442644 | post_job_queue_.EnqueueFront(job); | |
397 | } | ||
398 | 2389044 | } | |
399 | |||
400 | unsigned int num_threads_; | ||
401 | bool serialize_callbacks_; | ||
402 | |||
403 | uint64_t effective_history_depth_; | ||
404 | time_t effective_timestamp_threshold_; | ||
405 | TraversalType effective_traversal_type_; | ||
406 | |||
407 | pthread_t *threads_process_; | ||
408 | atomic_int32 num_errors_; | ||
409 | |||
410 | Tube<CatalogJob> pre_job_queue_; | ||
411 | Tube<CatalogJob> post_job_queue_; | ||
412 | SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_; | ||
413 | SmallHashDynamic<shash::Any, bool> catalogs_done_; | ||
414 | pthread_mutex_t catalogs_lock_; | ||
415 | |||
416 | pthread_mutex_t catalog_callback_lock_; | ||
417 | }; | ||
418 | |||
419 | } // namespace swissknife | ||
420 | |||
421 | #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
422 |