Directory: | cvmfs/ |
---|---|
File: | cvmfs/catalog_traversal_parallel.h |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 187 | 199 | 94.0% |
Branches: | 159 | 244 | 65.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
6 | #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
7 | |||
8 | #include <stack> | ||
9 | #include <string> | ||
10 | #include <vector> | ||
11 | |||
12 | #include "catalog_traversal.h" | ||
13 | #include "util/atomic.h" | ||
14 | #include "util/exception.h" | ||
15 | #include "util/tube.h" | ||
16 | |||
17 | namespace swissknife { | ||
18 | |||
19 | /** | ||
20 | * This class implements the same functionality as CatalogTraversal, but in | ||
21 | * parallel. For common functionality, see the documentation of | ||
22 | * CatalogTraversal. Differences: | ||
23 | * - can choose number of threads | ||
24 | * - traversal types change meaning: | ||
25 | * - depth-first -> parallelized post-order traversal (parents are processed | ||
26 | * after all children are finished) | ||
27 | * - breadth-first -> same as original, but parallelized | ||
28 | */ | ||
29 | template<class ObjectFetcherT> | ||
30 | class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> { | ||
31 | public: | ||
32 | typedef CatalogTraversalBase<ObjectFetcherT> Base; | ||
33 | typedef ObjectFetcherT ObjectFetcherTN; | ||
34 | typedef typename ObjectFetcherT::CatalogTN CatalogTN; | ||
35 | typedef typename ObjectFetcherT::HistoryTN HistoryTN; | ||
36 | typedef CatalogTraversalData<CatalogTN> CallbackDataTN; | ||
37 | typedef typename CatalogTN::NestedCatalogList NestedCatalogList; | ||
38 | typedef typename Base::Parameters Parameters; | ||
39 | typedef typename Base::TraversalType TraversalType; | ||
40 | typedef std::vector<shash::Any> HashList; | ||
41 | |||
42 | 1759 | explicit CatalogTraversalParallel(const Parameters ¶ms) | |
43 | : CatalogTraversalBase<ObjectFetcherT>(params) | ||
44 | 1759 | , num_threads_(params.num_threads) | |
45 |
4/8✓ Branch 2 taken 1759 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1759 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1759 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 1759 times.
✗ Branch 12 not taken.
|
1759 | , serialize_callbacks_(params.serialize_callbacks) { |
46 | 1759 | atomic_init32(&num_errors_); | |
47 |
1/2✓ Branch 1 taken 1759 times.
✗ Branch 2 not taken.
|
1759 | shash::Any null_hash; |
48 | 1759 | null_hash.SetNull(); | |
49 |
1/2✓ Branch 1 taken 1759 times.
✗ Branch 2 not taken.
|
1759 | catalogs_processing_.Init(1024, null_hash, hasher); |
50 |
1/2✓ Branch 1 taken 1759 times.
✗ Branch 2 not taken.
|
1759 | catalogs_done_.Init(1024, null_hash, hasher); |
51 | 1759 | pthread_mutex_init(&catalog_callback_lock_, NULL); | |
52 | 1759 | pthread_mutex_init(&catalogs_lock_, NULL); | |
53 | 1759 | effective_history_depth_ = this->default_history_depth_; | |
54 | 1759 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
55 | 1759 | } | |
56 | |||
57 | protected: | ||
58 | struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob, | ||
59 | public Observable<int> { | ||
60 | 18037849 | explicit CatalogJob(const std::string &path, | |
61 | const shash::Any &hash, | ||
62 | const unsigned tree_level, | ||
63 | const uint64_t history_depth, | ||
64 | CatalogTN *parent = NULL) | ||
65 | : CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level, | ||
66 | 18037849 | history_depth, parent) { | |
67 | 18037849 | atomic_init32(&children_unprocessed); | |
68 | 18037849 | } | |
69 | |||
70 |
1/2✓ Branch 1 taken 8997565 times.
✗ Branch 2 not taken.
|
8998202 | void WakeParents() { this->NotifyListeners(0); } |
71 | |||
72 | atomic_int32 children_unprocessed; | ||
73 | }; | ||
74 | |||
75 | public: | ||
76 | /** | ||
77 | * Starts the traversal process. | ||
78 | * After calling this methods CatalogTraversal will go through all catalogs | ||
79 | * and call the registered callback methods for each found catalog. | ||
80 | * If something goes wrong in the process, the traversal will be cancelled. | ||
81 | * | ||
82 | * @return true, when all catalogs were successfully processed. On | ||
83 | * failure the traversal is cancelled and false is returned. | ||
84 | */ | ||
85 | 973 | bool Traverse(const TraversalType type = Base::kBreadthFirst) { | |
86 |
1/2✓ Branch 1 taken 973 times.
✗ Branch 2 not taken.
|
973 | const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash(); |
87 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 973 times.
|
973 | if (root_catalog_hash.IsNull()) { |
88 | ✗ | return false; | |
89 | } | ||
90 |
1/2✓ Branch 1 taken 973 times.
✗ Branch 2 not taken.
|
973 | return Traverse(root_catalog_hash, type); |
91 | } | ||
92 | |||
93 | /** | ||
94 | * Starts the traversal process at the catalog pointed to by the given hash | ||
95 | * | ||
96 | * @param root_catalog_hash the entry point into the catalog traversal | ||
97 | * @return true when catalogs were successfully traversed | ||
98 | */ | ||
99 | 1708 | bool Traverse(const shash::Any &root_catalog_hash, | |
100 | const TraversalType type = Base::kBreadthFirst) { | ||
101 | // add the root catalog of the repository as the first element on the job | ||
102 | // stack | ||
103 | 3416 | if (this->no_repeat_history_ | |
104 |
4/6✓ Branch 0 taken 630 times.
✓ Branch 1 taken 1078 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 630 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1708 times.
|
1708 | && catalogs_done_.Contains(root_catalog_hash)) { |
105 | ✗ | return true; | |
106 | } | ||
107 | 1708 | effective_traversal_type_ = type; | |
108 |
3/6✓ Branch 2 taken 1708 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1708 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1708 times.
✗ Branch 9 not taken.
|
1708 | CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0); |
109 | 1708 | PushJob(root_job); | |
110 | 1708 | return DoTraverse(); | |
111 | } | ||
112 | |||
113 | /** | ||
114 | * Start the traversal process from a list of root catalogs. Same as | ||
115 | * TraverseRevision function, TraverseList does not traverse into predecessor | ||
116 | * catalog revisions and ignores TraversalParameter settings. | ||
117 | */ | ||
118 | 427 | bool TraverseList(const HashList &root_catalog_list, | |
119 | const TraversalType type = Base::kBreadthFirst) { | ||
120 | // Push in reverse order for CatalogTraversal-like behavior | ||
121 | 427 | HashList::const_reverse_iterator i = root_catalog_list.rbegin(); | |
122 | 427 | const HashList::const_reverse_iterator iend = root_catalog_list.rend(); | |
123 | 427 | bool has_pushed = false; | |
124 | { | ||
125 | 427 | MutexLockGuard m(&catalogs_lock_); | |
126 |
3/4✓ Branch 2 taken 1614 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1187 times.
✓ Branch 5 taken 427 times.
|
1614 | for (; i != iend; ++i) { |
127 |
7/8✓ Branch 0 taken 746 times.
✓ Branch 1 taken 441 times.
✓ Branch 4 taken 746 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 68 times.
✓ Branch 7 taken 678 times.
✓ Branch 8 taken 68 times.
✓ Branch 9 taken 1119 times.
|
1187 | if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) { |
128 | 68 | continue; | |
129 | } | ||
130 | |||
131 |
3/6✓ Branch 2 taken 1119 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1119 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1119 times.
✗ Branch 10 not taken.
|
1119 | CatalogJob *root_job = new CatalogJob("", *i, 0, 0); |
132 |
1/2✓ Branch 1 taken 1119 times.
✗ Branch 2 not taken.
|
1119 | PushJobUnlocked(root_job); |
133 | 1119 | has_pushed = true; | |
134 | } | ||
135 | 427 | } | |
136 | // noop: no catalogs to traverse | ||
137 |
2/2✓ Branch 0 taken 26 times.
✓ Branch 1 taken 401 times.
|
427 | if (!has_pushed) { |
138 | 26 | return true; | |
139 | } | ||
140 | 401 | effective_traversal_type_ = type; | |
141 | 401 | effective_history_depth_ = Parameters::kNoHistory; | |
142 | 401 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
143 |
1/2✓ Branch 1 taken 401 times.
✗ Branch 2 not taken.
|
401 | bool result = DoTraverse(); |
144 | 401 | effective_history_depth_ = this->default_history_depth_; | |
145 | 401 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
146 | 401 | return result; | |
147 | } | ||
148 | |||
149 | /** | ||
150 | * Starts the traversal process at the catalog pointed to by the given hash | ||
151 | * but doesn't traverse into predecessor catalog revisions. This overrides the | ||
152 | * TraversalParameter settings provided at construction. | ||
153 | * | ||
154 | * @param root_catalog_hash the entry point into the catalog traversal | ||
155 | * @return true when catalogs were successfully traversed | ||
156 | */ | ||
157 | 98 | bool TraverseRevision(const shash::Any &root_catalog_hash, | |
158 | const TraversalType type = Base::kBreadthFirst) { | ||
159 | 98 | effective_history_depth_ = Parameters::kNoHistory; | |
160 | 98 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
161 | 98 | bool result = Traverse(root_catalog_hash, type); | |
162 | 98 | effective_history_depth_ = this->default_history_depth_; | |
163 | 98 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
164 | 98 | return result; | |
165 | } | ||
166 | |||
167 | protected: | ||
168 | 82451997 | static uint32_t hasher(const shash::Any &key) { | |
169 | // Don't start with the first bytes, because == is using them as well | ||
170 | 82451997 | return (uint32_t) * (reinterpret_cast<const uint32_t *>(key.digest) + 1); | |
171 | } | ||
172 | |||
173 | 2109 | bool DoTraverse() { | |
174 | // Optimal number of threads is yet to be determined. The main event loop | ||
175 | // contains a spin-lock, so it should not be more than number of cores. | ||
176 | 2109 | threads_process_ = reinterpret_cast<pthread_t *>( | |
177 | 2109 | smalloc(sizeof(pthread_t) * num_threads_)); | |
178 |
2/2✓ Branch 0 taken 2795 times.
✓ Branch 1 taken 2109 times.
|
4904 | for (unsigned int i = 0; i < num_threads_; ++i) { |
179 | 2795 | int retval = pthread_create(&threads_process_[i], NULL, MainProcessQueue, | |
180 | this); | ||
181 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2795 times.
|
2795 | if (retval != 0) |
182 | ✗ | PANIC(kLogStderr, "failed to create thread"); | |
183 | } | ||
184 | |||
185 |
2/2✓ Branch 0 taken 2795 times.
✓ Branch 1 taken 2109 times.
|
4904 | for (unsigned int i = 0; i < num_threads_; ++i) { |
186 | 2795 | int retval = pthread_join(threads_process_[i], NULL); | |
187 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2795 times.
|
2795 | assert(retval == 0); |
188 | } | ||
189 | 2109 | free(threads_process_); | |
190 | |||
191 |
2/2✓ Branch 1 taken 49 times.
✓ Branch 2 taken 2060 times.
|
2109 | if (atomic_read32(&num_errors_)) |
192 | 49 | return false; | |
193 | |||
194 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2060 times.
|
2060 | assert(catalogs_processing_.size() == 0); |
195 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2060 times.
|
2060 | assert(pre_job_queue_.IsEmpty()); |
196 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2060 times.
|
2060 | assert(post_job_queue_.IsEmpty()); |
197 | 2060 | return true; | |
198 | } | ||
199 | |||
200 | 2795 | static void *MainProcessQueue(void *data) { | |
201 | 2795 | CatalogTraversalParallel<ObjectFetcherT> *traversal = reinterpret_cast< | |
202 | CatalogTraversalParallel<ObjectFetcherT> *>(data); | ||
203 | CatalogJob *current_job; | ||
204 | while (true) { | ||
205 | 19694495 | current_job = traversal->post_job_queue_.TryPopFront(); | |
206 |
2/2✓ Branch 0 taken 1663751 times.
✓ Branch 1 taken 18034272 times.
|
19698023 | if (current_job != NULL) { |
207 | 1663751 | traversal->ProcessJobPost(current_job); | |
208 | } else { | ||
209 | 18034272 | current_job = traversal->pre_job_queue_.PopFront(); | |
210 | // NULL means the master thread tells us to finish | ||
211 |
2/2✓ Branch 1 taken 2697 times.
✓ Branch 2 taken 18034123 times.
|
18037163 | if (current_job->hash.IsNull()) { |
212 |
1/2✓ Branch 0 taken 2697 times.
✗ Branch 1 not taken.
|
2697 | delete current_job; |
213 | 2746 | break; | |
214 | } | ||
215 | 18034123 | traversal->ProcessJobPre(current_job); | |
216 | } | ||
217 | } | ||
218 | 2746 | return NULL; | |
219 | } | ||
220 | |||
221 | 2109 | void NotifyFinished() { | |
222 |
1/2✓ Branch 1 taken 2109 times.
✗ Branch 2 not taken.
|
2109 | shash::Any null_hash; |
223 | 2109 | null_hash.SetNull(); | |
224 |
2/2✓ Branch 0 taken 2795 times.
✓ Branch 1 taken 2109 times.
|
4904 | for (unsigned i = 0; i < num_threads_; ++i) { |
225 |
3/6✓ Branch 2 taken 2795 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2795 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2795 times.
✗ Branch 9 not taken.
|
2795 | CatalogJob *job = new CatalogJob("", null_hash, 0, 0); |
226 |
1/2✓ Branch 1 taken 2795 times.
✗ Branch 2 not taken.
|
2795 | pre_job_queue_.EnqueueFront(job); |
227 | } | ||
228 | 2109 | } | |
229 | |||
230 | 1708 | void PushJob(CatalogJob *job) { | |
231 | 1708 | MutexLockGuard m(&catalogs_lock_); | |
232 |
1/2✓ Branch 1 taken 1708 times.
✗ Branch 2 not taken.
|
1708 | PushJobUnlocked(job); |
233 | 1708 | } | |
234 | |||
235 | 18035054 | void PushJobUnlocked(CatalogJob *job) { | |
236 | 18035054 | catalogs_processing_.Insert(job->hash, job); | |
237 | 18035054 | pre_job_queue_.EnqueueFront(job); | |
238 | 18035054 | } | |
239 | |||
240 | 18033584 | void ProcessJobPre(CatalogJob *job) { | |
241 |
4/6✓ Branch 0 taken 18033584 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 18033045 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 49 times.
✓ Branch 6 taken 18032996 times.
|
18033584 | if (!this->PrepareCatalog(job)) { |
242 | 49 | atomic_inc32(&num_errors_); | |
243 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | NotifyFinished(); |
244 | 16364835 | return; | |
245 | } | ||
246 |
2/2✓ Branch 0 taken 153 times.
✓ Branch 1 taken 18032843 times.
|
18032996 | if (job->ignore) { |
247 |
1/2✓ Branch 1 taken 153 times.
✗ Branch 2 not taken.
|
153 | FinalizeJob(job); |
248 | 153 | return; | |
249 | } | ||
250 |
1/2✓ Branch 1 taken 17990311 times.
✗ Branch 2 not taken.
|
18032843 | NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs(); |
251 | unsigned int num_children; | ||
252 | // Ensure that pushed children won't call ProcessJobPost on this job | ||
253 | // before this function finishes | ||
254 | { | ||
255 | 17990311 | MutexLockGuard m(&catalogs_lock_); | |
256 |
2/2✓ Branch 0 taken 9036507 times.
✓ Branch 1 taken 8998296 times.
|
18034803 | if (effective_traversal_type_ == Base::kBreadthFirst) { |
257 |
1/2✓ Branch 1 taken 9036507 times.
✗ Branch 2 not taken.
|
9036507 | num_children = PushPreviousRevision(job) |
258 |
1/2✓ Branch 1 taken 9036507 times.
✗ Branch 2 not taken.
|
9036507 | + PushNestedCatalogs(job, catalog_list); |
259 | } else { | ||
260 |
1/2✓ Branch 1 taken 8998296 times.
✗ Branch 2 not taken.
|
8998296 | num_children = PushNestedCatalogs(job, catalog_list) |
261 |
1/2✓ Branch 1 taken 8998296 times.
✗ Branch 2 not taken.
|
8998296 | + PushPreviousRevision(job); |
262 | 8998296 | atomic_write32(&job->children_unprocessed, num_children); | |
263 | } | ||
264 |
3/6✓ Branch 0 taken 18034803 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 18034803 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 18034803 times.
|
18034803 | if (!this->CloseCatalog(false, job)) { |
265 | ✗ | atomic_inc32(&num_errors_); | |
266 | ✗ | NotifyFinished(); | |
267 | } | ||
268 | 18034803 | } | |
269 | |||
270 | // breadth-first: can post-process immediately | ||
271 | // depth-first: no children -> can post-process immediately | ||
272 |
4/4✓ Branch 0 taken 8998149 times.
✓ Branch 1 taken 9036213 times.
✓ Branch 2 taken 7334300 times.
✓ Branch 3 taken 1663849 times.
|
18034362 | if (effective_traversal_type_ == Base::kBreadthFirst || num_children == 0) { |
273 |
1/2✓ Branch 1 taken 16364731 times.
✗ Branch 2 not taken.
|
16370513 | ProcessJobPost(job); |
274 | 16364731 | return; | |
275 | } | ||
276 |
2/2✓ Branch 1 taken 1663849 times.
✓ Branch 2 taken 16364633 times.
|
18028580 | } |
277 | |||
278 | 18034803 | unsigned int PushNestedCatalogs(CatalogJob *job, | |
279 | const NestedCatalogList &catalog_list) { | ||
280 | 18034803 | typename NestedCatalogList::const_iterator i = catalog_list.begin(); | |
281 | 18034803 | typename NestedCatalogList::const_iterator iend = catalog_list.end(); | |
282 | 18034803 | unsigned int num_children = 0; | |
283 |
2/2✓ Branch 2 taken 18032787 times.
✓ Branch 3 taken 18034803 times.
|
36067590 | for (; i != iend; ++i) { |
284 |
7/8✓ Branch 0 taken 20779 times.
✓ Branch 1 taken 18012008 times.
✓ Branch 4 taken 20779 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2719 times.
✓ Branch 7 taken 18060 times.
✓ Branch 8 taken 2719 times.
✓ Branch 9 taken 18030068 times.
|
18032787 | if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) { |
285 | 2719 | continue; | |
286 | } | ||
287 | |||
288 | CatalogJob *child; | ||
289 | 36060136 | if (!this->no_repeat_history_ | |
290 |
7/8✓ Branch 0 taken 18060 times.
✓ Branch 1 taken 18012008 times.
✓ Branch 4 taken 18060 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 17668 times.
✓ Branch 7 taken 392 times.
✓ Branch 8 taken 18029676 times.
✓ Branch 9 taken 392 times.
|
18030068 | || !catalogs_processing_.Lookup(i->hash, &child)) { |
291 |
2/2✓ Branch 0 taken 17965213 times.
✓ Branch 1 taken 64463 times.
|
18029676 | CatalogTN *parent = (this->no_close_) ? job->catalog : NULL; |
292 |
1/2✓ Branch 2 taken 18029676 times.
✗ Branch 3 not taken.
|
36059352 | child = new CatalogJob(i->mountpoint.ToString(), |
293 |
1/2✓ Branch 2 taken 18029676 times.
✗ Branch 3 not taken.
|
18029676 | i->hash, |
294 | 18029676 | job->tree_level + 1, | |
295 |
1/2✓ Branch 1 taken 18029676 times.
✗ Branch 2 not taken.
|
18029676 | job->history_depth, |
296 | parent); | ||
297 |
1/2✓ Branch 1 taken 18029676 times.
✗ Branch 2 not taken.
|
18029676 | PushJobUnlocked(child); |
298 | } | ||
299 | |||
300 |
2/2✓ Branch 0 taken 8997366 times.
✓ Branch 1 taken 9032702 times.
|
18030068 | if (effective_traversal_type_ == Base::kDepthFirst) { |
301 |
1/2✓ Branch 1 taken 8997366 times.
✗ Branch 2 not taken.
|
8997366 | child->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
302 | this, job); | ||
303 | } | ||
304 | 18030068 | ++num_children; | |
305 | } | ||
306 | 18034803 | return num_children; | |
307 | } | ||
308 | |||
309 | /** | ||
310 | * Pushes the previous revision of a root catalog. | ||
311 | * @return the number of catalogs pushed on the processing stack | ||
312 | */ | ||
313 | 18034803 | unsigned int PushPreviousRevision(CatalogJob *job) { | |
314 | // only root catalogs are used for entering a previous revision (graph) | ||
315 |
2/2✓ Branch 1 taken 18029576 times.
✓ Branch 2 taken 5227 times.
|
18034803 | if (!job->catalog->IsRoot()) { |
316 | 18029576 | return 0; | |
317 | } | ||
318 | |||
319 |
1/2✓ Branch 1 taken 5227 times.
✗ Branch 2 not taken.
|
5227 | const shash::Any previous_revision = job->catalog->GetPreviousRevision(); |
320 |
2/2✓ Branch 1 taken 367 times.
✓ Branch 2 taken 4860 times.
|
5227 | if (previous_revision.IsNull()) { |
321 | 367 | return 0; | |
322 | } | ||
323 | |||
324 | // check if the next deeper history level is actually requested | ||
325 |
3/4✓ Branch 1 taken 4860 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2309 times.
✓ Branch 4 taken 2551 times.
|
4860 | if (this->IsBelowPruningThresholds(*job, effective_history_depth_, |
326 | effective_timestamp_threshold_)) { | ||
327 | 2309 | return 0; | |
328 | } | ||
329 | |||
330 | 5102 | if (this->no_repeat_history_ | |
331 |
5/8✓ Branch 0 taken 1179 times.
✓ Branch 1 taken 1372 times.
✓ Branch 3 taken 1179 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 1179 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 2551 times.
|
2551 | && catalogs_done_.Contains(previous_revision)) { |
332 | ✗ | return 0; | |
333 | } | ||
334 | |||
335 | CatalogJob *prev_job; | ||
336 | 5102 | if (!this->no_repeat_history_ | |
337 |
5/8✓ Branch 0 taken 1179 times.
✓ Branch 1 taken 1372 times.
✓ Branch 3 taken 1179 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1179 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 2551 times.
✗ Branch 8 not taken.
|
2551 | || !catalogs_processing_.Lookup(previous_revision, &prev_job)) { |
338 |
2/4✓ Branch 2 taken 2551 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2551 times.
✗ Branch 6 not taken.
|
5102 | prev_job = new CatalogJob("", previous_revision, 0, |
339 |
1/2✓ Branch 1 taken 2551 times.
✗ Branch 2 not taken.
|
2551 | job->history_depth + 1); |
340 |
1/2✓ Branch 1 taken 2551 times.
✗ Branch 2 not taken.
|
2551 | PushJobUnlocked(prev_job); |
341 | } | ||
342 | |||
343 |
2/2✓ Branch 0 taken 735 times.
✓ Branch 1 taken 1816 times.
|
2551 | if (effective_traversal_type_ == Base::kDepthFirst) { |
344 |
1/2✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
|
735 | prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
345 | this, job); | ||
346 | } | ||
347 | 2551 | return 1; | |
348 | } | ||
349 | |||
350 | 18032108 | void ProcessJobPost(CatalogJob *job) { | |
351 | // Save time by keeping catalog open when suitable | ||
352 |
1/2✓ Branch 0 taken 18032108 times.
✗ Branch 1 not taken.
|
18032108 | if (job->catalog == NULL) { |
353 |
3/4✓ Branch 0 taken 18032108 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2254 times.
✓ Branch 4 taken 17975611 times.
|
18032108 | if (!this->ReopenCatalog(job)) { |
354 | 2254 | atomic_inc32(&num_errors_); | |
355 | ✗ | NotifyFinished(); | |
356 | ✗ | return; | |
357 | } | ||
358 | } | ||
359 |
1/2✓ Branch 0 taken 17975611 times.
✗ Branch 1 not taken.
|
17975611 | if (serialize_callbacks_) { |
360 | 17975611 | MutexLockGuard m(&catalog_callback_lock_); | |
361 |
2/4✓ Branch 1 taken 18034803 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18034803 times.
✗ Branch 5 not taken.
|
18034803 | this->NotifyListeners(job->GetCallbackData()); |
362 | 18034803 | } else { | |
363 | ✗ | this->NotifyListeners(job->GetCallbackData()); | |
364 | } | ||
365 |
2/2✓ Branch 0 taken 69345 times.
✓ Branch 1 taken 17965409 times.
|
18034754 | if (!this->no_close_) { |
366 |
2/4✓ Branch 0 taken 69345 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 69345 times.
|
69345 | if (!this->CloseCatalog(true, job)) { |
367 | ✗ | atomic_inc32(&num_errors_); | |
368 | ✗ | NotifyFinished(); | |
369 | ✗ | return; | |
370 | } | ||
371 | } | ||
372 | 18034754 | FinalizeJob(job); | |
373 | } | ||
374 | |||
375 | 18034711 | void FinalizeJob(CatalogJob *job) { | |
376 | { | ||
377 | 18034711 | MutexLockGuard m(&catalogs_lock_); | |
378 |
1/2✓ Branch 1 taken 18034956 times.
✗ Branch 2 not taken.
|
18034956 | catalogs_processing_.Erase(job->hash); |
379 |
1/2✓ Branch 1 taken 18034956 times.
✗ Branch 2 not taken.
|
18034956 | catalogs_done_.Insert(job->hash, true); |
380 | // No more catalogs to process -> finish | ||
381 |
1/2✓ Branch 2 taken 2060 times.
✗ Branch 3 not taken.
|
18037016 | if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty() |
382 |
5/6✓ Branch 0 taken 2060 times.
✓ Branch 1 taken 18032896 times.
✓ Branch 3 taken 2060 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2060 times.
✓ Branch 6 taken 18032896 times.
|
18037016 | && post_job_queue_.IsEmpty()) { |
383 |
1/2✓ Branch 1 taken 2060 times.
✗ Branch 2 not taken.
|
2060 | NotifyFinished(); |
384 | } | ||
385 | 18034956 | } | |
386 |
2/2✓ Branch 0 taken 8998202 times.
✓ Branch 1 taken 9036362 times.
|
18034564 | if (effective_traversal_type_ == Base::kDepthFirst) { |
387 | 8998202 | job->WakeParents(); | |
388 | } | ||
389 |
1/2✓ Branch 0 taken 18033682 times.
✗ Branch 1 not taken.
|
18033682 | delete job; |
390 | 18031526 | } | |
391 | |||
392 | 8996337 | void OnChildFinished(const int &a, CatalogJob *job) { | |
393 | // atomic_xadd32 returns value before subtraction -> needs to equal 1 | ||
394 |
2/2✓ Branch 1 taken 1663849 times.
✓ Branch 2 taken 7333958 times.
|
8996337 | if (atomic_xadd32(&job->children_unprocessed, -1) == 1) { |
395 | 1663849 | post_job_queue_.EnqueueFront(job); | |
396 | } | ||
397 | 8997758 | } | |
398 | |||
399 | unsigned int num_threads_; | ||
400 | bool serialize_callbacks_; | ||
401 | |||
402 | uint64_t effective_history_depth_; | ||
403 | time_t effective_timestamp_threshold_; | ||
404 | TraversalType effective_traversal_type_; | ||
405 | |||
406 | pthread_t *threads_process_; | ||
407 | atomic_int32 num_errors_; | ||
408 | |||
409 | Tube<CatalogJob> pre_job_queue_; | ||
410 | Tube<CatalogJob> post_job_queue_; | ||
411 | SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_; | ||
412 | SmallHashDynamic<shash::Any, bool> catalogs_done_; | ||
413 | pthread_mutex_t catalogs_lock_; | ||
414 | |||
415 | pthread_mutex_t catalog_callback_lock_; | ||
416 | }; | ||
417 | |||
418 | } // namespace swissknife | ||
419 | |||
420 | #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
421 |