Directory: | cvmfs/ |
---|---|
File: | cvmfs/catalog_traversal_parallel.h |
Date: | 2025-04-20 02:34:28 |
| | Exec | Total | Coverage |
---|---|---|---|
Lines: | 188 | 200 | 94.0% |
Branches: | 159 | 248 | 64.1% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
6 | #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
7 | |||
8 | #include <stack> | ||
9 | #include <string> | ||
10 | #include <vector> | ||
11 | |||
12 | #include "catalog_traversal.h" | ||
13 | #include "util/atomic.h" | ||
14 | #include "util/exception.h" | ||
15 | #include "util/tube.h" | ||
16 | |||
17 | namespace swissknife { | ||
18 | |||
19 | /** | ||
20 | * This class implements the same functionality as CatalogTraversal, but in | ||
21 | * parallel. For common functionality, see the documentation of | ||
22 | * CatalogTraversal. Differences: | ||
23 | * - can choose number of threads | ||
24 | * - traversal types change meaning: | ||
25 | * - depth-first -> parallelized post-order traversal (parents are processed | ||
26 | * after all children are finished) | ||
27 | * - breadth-first -> same as original, but parallelized | ||
28 | */ | ||
29 | template<class ObjectFetcherT> | ||
30 | class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> { | ||
31 | public: | ||
32 | typedef CatalogTraversalBase<ObjectFetcherT> Base; | ||
33 | typedef ObjectFetcherT ObjectFetcherTN; | ||
34 | typedef typename ObjectFetcherT::CatalogTN CatalogTN; | ||
35 | typedef typename ObjectFetcherT::HistoryTN HistoryTN; | ||
36 | typedef CatalogTraversalData<CatalogTN> CallbackDataTN; | ||
37 | typedef typename CatalogTN::NestedCatalogList NestedCatalogList; | ||
38 | typedef typename Base::Parameters Parameters; | ||
39 | typedef typename Base::TraversalType TraversalType; | ||
40 | typedef std::vector<shash::Any> HashList; | ||
41 | |||
42 | 57 | explicit CatalogTraversalParallel(const Parameters &params) | |
43 | : CatalogTraversalBase<ObjectFetcherT>(params) | ||
44 | 57 | , num_threads_(params.num_threads) | |
45 |
4/8✓ Branch 2 taken 57 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 57 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 57 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 57 times.
✗ Branch 12 not taken.
|
57 | , serialize_callbacks_(params.serialize_callbacks) { |
46 | 57 | atomic_init32(&num_errors_); | |
47 |
1/2✓ Branch 1 taken 57 times.
✗ Branch 2 not taken.
|
57 | shash::Any null_hash; |
48 | 57 | null_hash.SetNull(); | |
49 |
1/2✓ Branch 1 taken 57 times.
✗ Branch 2 not taken.
|
57 | catalogs_processing_.Init(1024, null_hash, hasher); |
50 |
1/2✓ Branch 1 taken 57 times.
✗ Branch 2 not taken.
|
57 | catalogs_done_.Init(1024, null_hash, hasher); |
51 | 57 | pthread_mutex_init(&catalog_callback_lock_, NULL); | |
52 | 57 | pthread_mutex_init(&catalogs_lock_, NULL); | |
53 | 57 | effective_history_depth_ = this->default_history_depth_; | |
54 | 57 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
55 | 57 | } | |
56 | |||
57 | protected: | ||
58 | struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob, | ||
59 | public Observable<int> { | ||
60 | 368457 | explicit CatalogJob(const std::string &path, | |
61 | const shash::Any &hash, | ||
62 | const unsigned tree_level, | ||
63 | const uint64_t history_depth, | ||
64 | CatalogTN *parent = NULL) : | ||
65 | CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level, | ||
66 | 368457 | history_depth, parent) { | |
67 | 368457 | atomic_init32(&children_unprocessed); | |
68 | 368457 | } | |
69 | |||
70 | 183703 | void WakeParents() { | |
71 |
1/2✓ Branch 1 taken 183685 times.
✗ Branch 2 not taken.
|
183703 | this->NotifyListeners(0); |
72 | 183685 | } | |
73 | |||
74 | atomic_int32 children_unprocessed; | ||
75 | }; | ||
76 | |||
77 | public: | ||
78 | /** | ||
79 | * Starts the traversal process. | ||
80 | * After calling this method, CatalogTraversal will go through all catalogs | ||
81 | * and call the registered callback methods for each found catalog. | ||
82 | * If something goes wrong in the process, the traversal will be cancelled. | ||
83 | * | ||
84 | * @return true, when all catalogs were successfully processed. On | ||
85 | * failure the traversal is cancelled and false is returned. | ||
86 | */ | ||
87 | 40 | bool Traverse(const TraversalType type = Base::kBreadthFirst) { | |
88 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash(); |
89 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 40 times.
|
40 | if (root_catalog_hash.IsNull()) { |
90 | ✗ | return false; | |
91 | } | ||
92 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | return Traverse(root_catalog_hash, type); |
93 | } | ||
94 | |||
95 | /** | ||
96 | * Starts the traversal process at the catalog pointed to by the given hash | ||
97 | * | ||
98 | * @param root_catalog_hash the entry point into the catalog traversal | ||
99 | * @return true when catalogs were successfully traversed | ||
100 | */ | ||
101 | 55 | bool Traverse(const shash::Any &root_catalog_hash, | |
102 | const TraversalType type = Base::kBreadthFirst) { | ||
103 | // add the root catalog of the repository as the first element on the job | ||
104 | // stack | ||
105 |
3/6✓ Branch 0 taken 33 times.
✓ Branch 1 taken 22 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 55 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
88 | if (this->no_repeat_history_ && |
106 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 33 times.
|
33 | catalogs_done_.Contains(root_catalog_hash)) { |
107 | ✗ | return true; | |
108 | } | ||
109 | 55 | effective_traversal_type_ = type; | |
110 |
3/6✓ Branch 2 taken 55 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 55 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 55 times.
✗ Branch 9 not taken.
|
55 | CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0); |
111 | 55 | PushJob(root_job); | |
112 | 55 | return DoTraverse(); | |
113 | } | ||
114 | |||
115 | /** | ||
116 | * Start the traversal process from a list of root catalogs. Same as | ||
117 | * TraverseRevision function, TraverseList does not traverse into predecessor | ||
118 | * catalog revisions and ignores TraversalParameter settings. | ||
119 | */ | ||
120 | 49 | bool TraverseList(const HashList &root_catalog_list, | |
121 | const TraversalType type = Base::kBreadthFirst) { | ||
122 | // Push in reverse order for CatalogTraversal-like behavior | ||
123 | 49 | HashList::const_reverse_iterator i = root_catalog_list.rbegin(); | |
124 | 49 | const HashList::const_reverse_iterator iend = root_catalog_list.rend(); | |
125 | 49 | bool has_pushed = false; | |
126 | { | ||
127 | 49 | MutexLockGuard m(&catalogs_lock_); | |
128 |
3/4✓ Branch 2 taken 149 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 100 times.
✓ Branch 5 taken 49 times.
|
149 | for (; i != iend; ++i) { |
129 |
7/8✓ Branch 0 taken 91 times.
✓ Branch 1 taken 9 times.
✓ Branch 4 taken 91 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 34 times.
✓ Branch 7 taken 57 times.
✓ Branch 8 taken 34 times.
✓ Branch 9 taken 66 times.
|
100 | if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) { |
130 | 34 | continue; | |
131 | } | ||
132 | |||
133 |
3/6✓ Branch 2 taken 66 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 66 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 66 times.
✗ Branch 10 not taken.
|
66 | CatalogJob *root_job = new CatalogJob("", *i, 0, 0); |
134 |
1/2✓ Branch 1 taken 66 times.
✗ Branch 2 not taken.
|
66 | PushJobUnlocked(root_job); |
135 | 66 | has_pushed = true; | |
136 | } | ||
137 | 49 | } | |
138 | // noop: no catalogs to traverse | ||
139 |
2/2✓ Branch 0 taken 13 times.
✓ Branch 1 taken 36 times.
|
49 | if (!has_pushed) { |
140 | 13 | return true; | |
141 | } | ||
142 | 36 | effective_traversal_type_ = type; | |
143 | 36 | effective_history_depth_ = Parameters::kNoHistory; | |
144 | 36 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
145 |
1/2✓ Branch 1 taken 36 times.
✗ Branch 2 not taken.
|
36 | bool result = DoTraverse(); |
146 | 36 | effective_history_depth_ = this->default_history_depth_; | |
147 | 36 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
148 | 36 | return result; | |
149 | } | ||
150 | |||
151 | /** | ||
152 | * Starts the traversal process at the catalog pointed to by the given hash | ||
153 | * but doesn't traverse into predecessor catalog revisions. This overrides the | ||
154 | * TraversalParameter settings provided at construction. | ||
155 | * | ||
156 | * @param root_catalog_hash the entry point into the catalog traversal | ||
157 | * @return true when catalogs were successfully traversed | ||
158 | */ | ||
159 | 2 | bool TraverseRevision( | |
160 | const shash::Any &root_catalog_hash, | ||
161 | const TraversalType type = Base::kBreadthFirst) | ||
162 | { | ||
163 | 2 | effective_history_depth_ = Parameters::kNoHistory; | |
164 | 2 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
165 | 2 | bool result = Traverse(root_catalog_hash, type); | |
166 | 2 | effective_history_depth_ = this->default_history_depth_; | |
167 | 2 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
168 | 2 | return result; | |
169 | } | ||
170 | |||
171 | protected: | ||
172 | 1689355 | static uint32_t hasher(const shash::Any &key) { | |
173 | // Don't start with the first bytes, because == is using them as well | ||
174 | 1689355 | return (uint32_t) *(reinterpret_cast<const uint32_t *>(key.digest) + 1); | |
175 | } | ||
176 | |||
177 | 91 | bool DoTraverse() { | |
178 | // Optimal number of threads is yet to be determined. The main event loop | ||
179 | // contains a spin-lock, so it should not be more than the number of cores. | ||
180 | 91 | threads_process_ = reinterpret_cast<pthread_t *> | |
181 | 91 | (smalloc(sizeof(pthread_t)*num_threads_)); | |
182 |
2/2✓ Branch 0 taken 105 times.
✓ Branch 1 taken 91 times.
|
196 | for (unsigned int i = 0; i < num_threads_; ++i) { |
183 | 105 | int retval = pthread_create(&threads_process_[i], NULL, | |
184 | MainProcessQueue, this); | ||
185 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 105 times.
|
105 | if (retval != 0) PANIC(kLogStderr, "failed to create thread"); |
186 | } | ||
187 | |||
188 |
2/2✓ Branch 0 taken 105 times.
✓ Branch 1 taken 91 times.
|
196 | for (unsigned int i = 0; i < num_threads_; ++i) { |
189 | 105 | int retval = pthread_join(threads_process_[i], NULL); | |
190 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 105 times.
|
105 | assert(retval == 0); |
191 | } | ||
192 | 91 | free(threads_process_); | |
193 | |||
194 |
2/2✓ Branch 1 taken 1 times.
✓ Branch 2 taken 90 times.
|
91 | if (atomic_read32(&num_errors_)) |
195 | 1 | return false; | |
196 | |||
197 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 90 times.
|
90 | assert(catalogs_processing_.size() == 0); |
198 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 90 times.
|
90 | assert(pre_job_queue_.IsEmpty()); |
199 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 90 times.
|
90 | assert(post_job_queue_.IsEmpty()); |
200 | 90 | return true; | |
201 | } | ||
202 | |||
203 | 105 | static void *MainProcessQueue(void *data) { | |
204 | 105 | CatalogTraversalParallel<ObjectFetcherT> *traversal = | |
205 | reinterpret_cast<CatalogTraversalParallel<ObjectFetcherT> *>(data); | ||
206 | CatalogJob *current_job; | ||
207 | while (true) { | ||
208 | 402332 | current_job = traversal->post_job_queue_.TryPopFront(); | |
209 |
2/2✓ Branch 0 taken 33982 times.
✓ Branch 1 taken 368423 times.
|
402405 | if (current_job != NULL) { |
210 | 33982 | traversal->ProcessJobPost(current_job); | |
211 | } else { | ||
212 | 368423 | current_job = traversal->pre_job_queue_.PopFront(); | |
213 | // A job with a null hash is the sentinel (see NotifyFinished) telling this worker to finish | ||
214 |
2/2✓ Branch 1 taken 105 times.
✓ Branch 2 taken 368336 times.
|
368447 | if (current_job->hash.IsNull()) { |
215 |
1/2✓ Branch 0 taken 105 times.
✗ Branch 1 not taken.
|
105 | delete current_job; |
216 | 105 | break; | |
217 | } | ||
218 | 368336 | traversal->ProcessJobPre(current_job); | |
219 | } | ||
220 | } | ||
221 | 105 | return NULL; | |
222 | } | ||
223 | |||
224 | 91 | void NotifyFinished() { | |
225 |
1/2✓ Branch 1 taken 91 times.
✗ Branch 2 not taken.
|
91 | shash::Any null_hash; |
226 | 91 | null_hash.SetNull(); | |
227 |
2/2✓ Branch 0 taken 105 times.
✓ Branch 1 taken 91 times.
|
196 | for (unsigned i = 0; i < num_threads_; ++i) { |
228 |
3/6✓ Branch 2 taken 105 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 105 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 105 times.
✗ Branch 9 not taken.
|
105 | CatalogJob *job = new CatalogJob("", null_hash, 0, 0); |
229 |
1/2✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
|
105 | pre_job_queue_.EnqueueFront(job); |
230 | } | ||
231 | 91 | } | |
232 | |||
233 | 55 | void PushJob(CatalogJob *job) { | |
234 | 55 | MutexLockGuard m(&catalogs_lock_); | |
235 |
1/2✓ Branch 1 taken 55 times.
✗ Branch 2 not taken.
|
55 | PushJobUnlocked(job); |
236 | 55 | } | |
237 | |||
238 | 368352 | void PushJobUnlocked(CatalogJob *job) { | |
239 | 368352 | catalogs_processing_.Insert(job->hash, job); | |
240 | 368352 | pre_job_queue_.EnqueueFront(job); | |
241 | 368352 | } | |
242 | |||
243 | 368330 | void ProcessJobPre(CatalogJob *job) { | |
244 |
4/6✓ Branch 0 taken 368330 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 368294 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 times.
✓ Branch 6 taken 368293 times.
|
368330 | if (!this->PrepareCatalog(job)) { |
245 | 1 | atomic_inc32(&num_errors_); | |
246 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | NotifyFinished(); |
247 | 334310 | return; | |
248 | } | ||
249 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 368287 times.
|
368293 | if (job->ignore) { |
250 |
1/2✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
|
6 | FinalizeJob(job); |
251 | 6 | return; | |
252 | } | ||
253 |
1/2✓ Branch 1 taken 367651 times.
✗ Branch 2 not taken.
|
368287 | NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs(); |
254 | unsigned int num_children; | ||
255 | // Ensure that pushed children won't call ProcessJobPost on this job | ||
256 | // before this function finishes | ||
257 | { | ||
258 | 367651 | MutexLockGuard m(&catalogs_lock_); | |
259 |
2/2✓ Branch 0 taken 184642 times.
✓ Branch 1 taken 183702 times.
|
368344 | if (effective_traversal_type_ == Base::kBreadthFirst) { |
260 |
1/2✓ Branch 1 taken 184642 times.
✗ Branch 2 not taken.
|
184642 | num_children = PushPreviousRevision(job) + |
261 |
1/2✓ Branch 1 taken 184642 times.
✗ Branch 2 not taken.
|
184642 | PushNestedCatalogs(job, catalog_list); |
262 | } else { | ||
263 |
1/2✓ Branch 1 taken 183702 times.
✗ Branch 2 not taken.
|
183702 | num_children = PushNestedCatalogs(job, catalog_list) + |
264 |
1/2✓ Branch 1 taken 183702 times.
✗ Branch 2 not taken.
|
183702 | PushPreviousRevision(job); |
265 | 183702 | atomic_write32(&job->children_unprocessed, num_children); | |
266 | } | ||
267 |
3/6✓ Branch 0 taken 368344 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 368344 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 368344 times.
|
368344 | if (!this->CloseCatalog(false, job)) { |
268 | ✗ | atomic_inc32(&num_errors_); | |
269 | ✗ | NotifyFinished(); | |
270 | } | ||
271 | 368344 | } | |
272 | |||
273 | // breadth-first: can post-process immediately | ||
274 | // depth-first: no children -> can post-process immediately | ||
275 |
4/4✓ Branch 0 taken 183695 times.
✓ Branch 1 taken 184648 times.
✓ Branch 2 taken 149713 times.
✓ Branch 3 taken 33982 times.
|
368343 | if (effective_traversal_type_ == Base::kBreadthFirst || |
276 | num_children == 0) { | ||
277 |
1/2✓ Branch 1 taken 334274 times.
✗ Branch 2 not taken.
|
334361 | ProcessJobPost(job); |
278 | 334274 | return; | |
279 | } | ||
280 |
2/2✓ Branch 1 taken 33982 times.
✓ Branch 2 taken 334303 times.
|
368256 | } |
281 | |||
282 | 368344 | unsigned int PushNestedCatalogs(CatalogJob *job, | |
283 | const NestedCatalogList &catalog_list) { | ||
284 | 368344 | typename NestedCatalogList::const_iterator i = catalog_list.begin(); | |
285 | 368344 | typename NestedCatalogList::const_iterator iend = catalog_list.end(); | |
286 | 368344 | unsigned int num_children = 0; | |
287 |
2/2✓ Branch 2 taken 368229 times.
✓ Branch 3 taken 368344 times.
|
736573 | for (; i != iend; ++i) { |
288 |
7/8✓ Branch 0 taken 637 times.
✓ Branch 1 taken 367592 times.
✓ Branch 4 taken 637 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 67 times.
✓ Branch 7 taken 570 times.
✓ Branch 8 taken 67 times.
✓ Branch 9 taken 368162 times.
|
368229 | if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) { |
289 | 67 | continue; | |
290 | } | ||
291 | |||
292 | CatalogJob *child; | ||
293 |
4/4✓ Branch 0 taken 570 times.
✓ Branch 1 taken 367592 times.
✓ Branch 2 taken 368154 times.
✓ Branch 3 taken 8 times.
|
368732 | if (!this->no_repeat_history_ || |
294 |
3/4✓ Branch 2 taken 570 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 562 times.
✓ Branch 5 taken 8 times.
|
570 | !catalogs_processing_.Lookup(i->hash, &child)) { |
295 |
2/2✓ Branch 0 taken 366637 times.
✓ Branch 1 taken 1517 times.
|
368154 | CatalogTN *parent = (this->no_close_) ? job->catalog : NULL; |
296 |
1/2✓ Branch 2 taken 368154 times.
✗ Branch 3 not taken.
|
736308 | child = new CatalogJob(i->mountpoint.ToString(), |
297 |
1/2✓ Branch 2 taken 368154 times.
✗ Branch 3 not taken.
|
368154 | i->hash, |
298 | 368154 | job->tree_level + 1, | |
299 |
1/2✓ Branch 1 taken 368154 times.
✗ Branch 2 not taken.
|
368154 | job->history_depth, |
300 | parent); | ||
301 |
1/2✓ Branch 1 taken 368154 times.
✗ Branch 2 not taken.
|
368154 | PushJobUnlocked(child); |
302 | } | ||
303 | |||
304 |
2/2✓ Branch 0 taken 183660 times.
✓ Branch 1 taken 184502 times.
|
368162 | if (effective_traversal_type_ == Base::kDepthFirst) { |
305 |
1/2✓ Branch 1 taken 183660 times.
✗ Branch 2 not taken.
|
183660 | child->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
306 | this, job); | ||
307 | } | ||
308 | 368162 | ++num_children; | |
309 | } | ||
310 | 368344 | return num_children; | |
311 | } | ||
312 | |||
313 | /** | ||
314 | * Pushes the previous revision of a root catalog. | ||
315 | * @return the number of catalogs pushed on the processing stack | ||
316 | */ | ||
317 | 368344 | unsigned int PushPreviousRevision(CatalogJob *job) { | |
318 | // only root catalogs are used for entering a previous revision (graph) | ||
319 |
2/2✓ Branch 1 taken 368151 times.
✓ Branch 2 taken 193 times.
|
368344 | if (!job->catalog->IsRoot()) { |
320 | 368151 | return 0; | |
321 | } | ||
322 | |||
323 |
1/2✓ Branch 1 taken 193 times.
✗ Branch 2 not taken.
|
193 | const shash::Any previous_revision = job->catalog->GetPreviousRevision(); |
324 |
2/2✓ Branch 1 taken 19 times.
✓ Branch 2 taken 174 times.
|
193 | if (previous_revision.IsNull()) { |
325 | 19 | return 0; | |
326 | } | ||
327 | |||
328 | // check if the next deeper history level is actually requested | ||
329 |
3/4✓ Branch 1 taken 174 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 97 times.
✓ Branch 4 taken 77 times.
|
174 | if (this->IsBelowPruningThresholds(*job, effective_history_depth_, |
330 | effective_timestamp_threshold_)) { | ||
331 | 97 | return 0; | |
332 | } | ||
333 | |||
334 |
3/6✓ Branch 0 taken 49 times.
✓ Branch 1 taken 28 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 77 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
|
126 | if (this->no_repeat_history_ && |
335 |
2/4✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 49 times.
|
49 | catalogs_done_.Contains(previous_revision)) { |
336 | ✗ | return 0; | |
337 | } | ||
338 | |||
339 | CatalogJob *prev_job; | ||
340 |
3/4✓ Branch 0 taken 49 times.
✓ Branch 1 taken 28 times.
✓ Branch 2 taken 77 times.
✗ Branch 3 not taken.
|
126 | if (!this->no_repeat_history_ || |
341 |
2/4✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
|
49 | !catalogs_processing_.Lookup(previous_revision, &prev_job)) { |
342 |
1/2✓ Branch 2 taken 77 times.
✗ Branch 3 not taken.
|
154 | prev_job = |
343 |
2/4✓ Branch 1 taken 77 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 77 times.
✗ Branch 5 not taken.
|
77 | new CatalogJob("", previous_revision, 0, job->history_depth + 1); |
344 |
1/2✓ Branch 1 taken 77 times.
✗ Branch 2 not taken.
|
77 | PushJobUnlocked(prev_job); |
345 | } | ||
346 | |||
347 |
2/2✓ Branch 0 taken 15 times.
✓ Branch 1 taken 62 times.
|
77 | if (effective_traversal_type_ == Base::kDepthFirst) { |
348 |
1/2✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
|
15 | prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
349 | this, job); | ||
350 | } | ||
351 | 77 | return 1; | |
352 | } | ||
353 | |||
354 | 368308 | void ProcessJobPost(CatalogJob *job) { | |
355 | // Save time by keeping catalog open when suitable | ||
356 |
1/2✓ Branch 0 taken 368308 times.
✗ Branch 1 not taken.
|
368308 | if (job->catalog == NULL) { |
357 |
2/4✓ Branch 0 taken 368308 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 367589 times.
|
368308 | if (!this->ReopenCatalog(job)) { |
358 | ✗ | atomic_inc32(&num_errors_); | |
359 | ✗ | NotifyFinished(); | |
360 | ✗ | return; | |
361 | } | ||
362 | } | ||
363 |
1/2✓ Branch 0 taken 367589 times.
✗ Branch 1 not taken.
|
367589 | if (serialize_callbacks_) { |
364 | 367589 | MutexLockGuard m(&catalog_callback_lock_); | |
365 |
2/4✓ Branch 1 taken 368344 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 368344 times.
✗ Branch 5 not taken.
|
368344 | this->NotifyListeners(job->GetCallbackData()); |
366 | 368344 | } else { | |
367 | ✗ | this->NotifyListeners(job->GetCallbackData()); | |
368 | } | ||
369 |
2/2✓ Branch 0 taken 1702 times.
✓ Branch 1 taken 366640 times.
|
368342 | if (!this->no_close_) { |
370 |
2/4✓ Branch 0 taken 1702 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1702 times.
|
1702 | if (!this->CloseCatalog(true, job)) { |
371 | ✗ | atomic_inc32(&num_errors_); | |
372 | ✗ | NotifyFinished(); | |
373 | ✗ | return; | |
374 | } | ||
375 | } | ||
376 | 368342 | FinalizeJob(job); | |
377 | } | ||
378 | |||
379 | 368343 | void FinalizeJob(CatalogJob *job) { | |
380 | { | ||
381 | 368343 | MutexLockGuard m(&catalogs_lock_); | |
382 |
1/2✓ Branch 1 taken 368350 times.
✗ Branch 2 not taken.
|
368350 | catalogs_processing_.Erase(job->hash); |
383 |
1/2✓ Branch 1 taken 368350 times.
✗ Branch 2 not taken.
|
368350 | catalogs_done_.Insert(job->hash, true); |
384 | // No more catalogs to process -> finish | ||
385 |
6/8✓ Branch 1 taken 90 times.
✓ Branch 2 taken 368260 times.
✓ Branch 4 taken 90 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 90 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 90 times.
✓ Branch 9 taken 368260 times.
|
368440 | if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty() && |
386 | 90 | post_job_queue_.IsEmpty()) { | |
387 |
1/2✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
|
90 | NotifyFinished(); |
388 | } | ||
389 | 368350 | } | |
390 |
2/2✓ Branch 0 taken 183704 times.
✓ Branch 1 taken 184646 times.
|
368350 | if (effective_traversal_type_ == Base::kDepthFirst) { |
391 | 183704 | job->WakeParents(); | |
392 | } | ||
393 |
2/2✓ Branch 0 taken 368323 times.
✓ Branch 1 taken 3 times.
|
368326 | delete job; |
394 | 368261 | } | |
395 | |||
396 | 183648 | void OnChildFinished(const int &a, CatalogJob *job) { | |
397 | // atomic_xadd32 returns value before subtraction -> needs to equal 1 | ||
398 |
2/2✓ Branch 1 taken 33982 times.
✓ Branch 2 taken 149684 times.
|
183648 | if (atomic_xadd32(&job->children_unprocessed, -1) == 1) { |
399 | 33982 | post_job_queue_.EnqueueFront(job); | |
400 | } | ||
401 | 183666 | } | |
402 | |||
403 | unsigned int num_threads_; | ||
404 | bool serialize_callbacks_; | ||
405 | |||
406 | uint64_t effective_history_depth_; | ||
407 | time_t effective_timestamp_threshold_; | ||
408 | TraversalType effective_traversal_type_; | ||
409 | |||
410 | pthread_t *threads_process_; | ||
411 | atomic_int32 num_errors_; | ||
412 | |||
413 | Tube<CatalogJob> pre_job_queue_; | ||
414 | Tube<CatalogJob> post_job_queue_; | ||
415 | SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_; | ||
416 | SmallHashDynamic<shash::Any, bool> catalogs_done_; | ||
417 | pthread_mutex_t catalogs_lock_; | ||
418 | |||
419 | pthread_mutex_t catalog_callback_lock_; | ||
420 | }; | ||
421 | |||
422 | } // namespace swissknife | ||
423 | |||
424 | #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
425 |