| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/catalog_traversal_parallel.h |
| Date: | 2025-11-02 02:35:35 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 186 | 199 | 93.5% |
| Branches: | 159 | 244 | 65.2% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
| 6 | #define CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
| 7 | |||
| 8 | #include <stack> | ||
| 9 | #include <string> | ||
| 10 | #include <vector> | ||
| 11 | |||
| 12 | #include "catalog_traversal.h" | ||
| 13 | #include "util/atomic.h" | ||
| 14 | #include "util/exception.h" | ||
| 15 | #include "util/tube.h" | ||
| 16 | |||
| 17 | namespace swissknife { | ||
| 18 | |||
| 19 | /** | ||
| 20 | * This class implements the same functionality as CatalogTraversal, but in | ||
| 21 | * parallel. For common functionality, see the documentation of | ||
| 22 | * CatalogTraversal. Differences: | ||
| 23 | * - can choose number of threads | ||
| 24 | * - traversal types change meaning: | ||
| 25 | * - depth-first -> parallelized post-order traversal (parents are processed | ||
| 26 | * after all children are finished) | ||
| 27 | * - breadth-first -> same as original, but parallelized | ||
| 28 | */ | ||
| 29 | template<class ObjectFetcherT> | ||
| 30 | class CatalogTraversalParallel : public CatalogTraversalBase<ObjectFetcherT> { | ||
| 31 | public: | ||
| 32 | typedef CatalogTraversalBase<ObjectFetcherT> Base; | ||
| 33 | typedef ObjectFetcherT ObjectFetcherTN; | ||
| 34 | typedef typename ObjectFetcherT::CatalogTN CatalogTN; | ||
| 35 | typedef typename ObjectFetcherT::HistoryTN HistoryTN; | ||
| 36 | typedef CatalogTraversalData<CatalogTN> CallbackDataTN; | ||
| 37 | typedef typename CatalogTN::NestedCatalogList NestedCatalogList; | ||
| 38 | typedef typename Base::Parameters Parameters; | ||
| 39 | typedef typename Base::TraversalType TraversalType; | ||
| 40 | typedef std::vector<shash::Any> HashList; | ||
| 41 | |||
| 42 | 2529 | explicit CatalogTraversalParallel(const Parameters ¶ms) | |
| 43 | : CatalogTraversalBase<ObjectFetcherT>(params) | ||
| 44 | 2529 | , num_threads_(params.num_threads) | |
| 45 |
4/8✓ Branch 2 taken 2529 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2529 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2529 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 2529 times.
✗ Branch 12 not taken.
|
2529 | , serialize_callbacks_(params.serialize_callbacks) { |
| 46 | 2529 | atomic_init32(&num_errors_); | |
| 47 |
1/2✓ Branch 1 taken 2529 times.
✗ Branch 2 not taken.
|
2529 | shash::Any null_hash; |
| 48 | 2529 | null_hash.SetNull(); | |
| 49 |
1/2✓ Branch 1 taken 2529 times.
✗ Branch 2 not taken.
|
2529 | catalogs_processing_.Init(1024, null_hash, hasher); |
| 50 |
1/2✓ Branch 1 taken 2529 times.
✗ Branch 2 not taken.
|
2529 | catalogs_done_.Init(1024, null_hash, hasher); |
| 51 | 2529 | pthread_mutex_init(&catalog_callback_lock_, NULL); | |
| 52 | 2529 | pthread_mutex_init(&catalogs_lock_, NULL); | |
| 53 | 2529 | effective_history_depth_ = this->default_history_depth_; | |
| 54 | 2529 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
| 55 | 2529 | } | |
| 56 | |||
| 57 | protected: | ||
| 58 | struct CatalogJob : public CatalogTraversal<ObjectFetcherT>::CatalogJob, | ||
| 59 | public Observable<int> { | ||
| 60 | 18050169 | explicit CatalogJob(const std::string &path, | |
| 61 | const shash::Any &hash, | ||
| 62 | const unsigned tree_level, | ||
| 63 | const uint64_t history_depth, | ||
| 64 | CatalogTN *parent = NULL) | ||
| 65 | : CatalogTraversal<ObjectFetcherT>::CatalogJob(path, hash, tree_level, | ||
| 66 | 18050169 | history_depth, parent) { | |
| 67 | 18050169 | atomic_init32(&children_unprocessed); | |
| 68 | 18050169 | } | |
| 69 | |||
| 70 |
1/2✓ Branch 1 taken 8999798 times.
✗ Branch 2 not taken.
|
9000239 | void WakeParents() { this->NotifyListeners(0); } |
| 71 | |||
| 72 | atomic_int32 children_unprocessed; | ||
| 73 | }; | ||
| 74 | |||
| 75 | public: | ||
| 76 | /** | ||
| 77 | * Starts the traversal process. | ||
| 78 | * After calling this methods CatalogTraversal will go through all catalogs | ||
| 79 | * and call the registered callback methods for each found catalog. | ||
| 80 | * If something goes wrong in the process, the traversal will be cancelled. | ||
| 81 | * | ||
| 82 | * @return true, when all catalogs were successfully processed. On | ||
| 83 | * failure the traversal is cancelled and false is returned. | ||
| 84 | */ | ||
| 85 | 1708 | bool Traverse(const TraversalType type = Base::kBreadthFirst) { | |
| 86 |
1/2✓ Branch 1 taken 1708 times.
✗ Branch 2 not taken.
|
1708 | const shash::Any root_catalog_hash = this->GetRepositoryRootCatalogHash(); |
| 87 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1708 times.
|
1708 | if (root_catalog_hash.IsNull()) { |
| 88 | ✗ | return false; | |
| 89 | } | ||
| 90 |
1/2✓ Branch 1 taken 1708 times.
✗ Branch 2 not taken.
|
1708 | return Traverse(root_catalog_hash, type); |
| 91 | } | ||
| 92 | |||
| 93 | /** | ||
| 94 | * Starts the traversal process at the catalog pointed to by the given hash | ||
| 95 | * | ||
| 96 | * @param root_catalog_hash the entry point into the catalog traversal | ||
| 97 | * @return true when catalogs were successfully traversed | ||
| 98 | */ | ||
| 99 | 2443 | bool Traverse(const shash::Any &root_catalog_hash, | |
| 100 | const TraversalType type = Base::kBreadthFirst) { | ||
| 101 | // add the root catalog of the repository as the first element on the job | ||
| 102 | // stack | ||
| 103 | 4886 | if (this->no_repeat_history_ | |
| 104 |
4/6✓ Branch 0 taken 1365 times.
✓ Branch 1 taken 1078 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1365 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 2443 times.
|
2443 | && catalogs_done_.Contains(root_catalog_hash)) { |
| 105 | ✗ | return true; | |
| 106 | } | ||
| 107 | 2443 | effective_traversal_type_ = type; | |
| 108 |
3/6✓ Branch 2 taken 2443 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2443 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2443 times.
✗ Branch 9 not taken.
|
2443 | CatalogJob *root_job = new CatalogJob("", root_catalog_hash, 0, 0); |
| 109 | 2443 | PushJob(root_job); | |
| 110 | 2443 | return DoTraverse(); | |
| 111 | } | ||
| 112 | |||
| 113 | /** | ||
| 114 | * Start the traversal process from a list of root catalogs. Same as | ||
| 115 | * TraverseRevision function, TraverseList does not traverse into predecessor | ||
| 116 | * catalog revisions and ignores TraversalParameter settings. | ||
| 117 | */ | ||
| 118 | 1897 | bool TraverseList(const HashList &root_catalog_list, | |
| 119 | const TraversalType type = Base::kBreadthFirst) { | ||
| 120 | // Push in reverse order for CatalogTraversal-like behavior | ||
| 121 | 1897 | HashList::const_reverse_iterator i = root_catalog_list.rbegin(); | |
| 122 | 1897 | const HashList::const_reverse_iterator iend = root_catalog_list.rend(); | |
| 123 | 1897 | bool has_pushed = false; | |
| 124 | { | ||
| 125 | 1897 | MutexLockGuard m(&catalogs_lock_); | |
| 126 |
3/4✓ Branch 2 taken 5849 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 3952 times.
✓ Branch 5 taken 1897 times.
|
5849 | for (; i != iend; ++i) { |
| 127 |
7/8✓ Branch 0 taken 3511 times.
✓ Branch 1 taken 441 times.
✓ Branch 4 taken 3511 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1258 times.
✓ Branch 7 taken 2253 times.
✓ Branch 8 taken 1258 times.
✓ Branch 9 taken 2694 times.
|
3952 | if (this->no_repeat_history_ && catalogs_done_.Contains(*i)) { |
| 128 | 1258 | continue; | |
| 129 | } | ||
| 130 | |||
| 131 |
3/6✓ Branch 2 taken 2694 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2694 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2694 times.
✗ Branch 10 not taken.
|
2694 | CatalogJob *root_job = new CatalogJob("", *i, 0, 0); |
| 132 |
1/2✓ Branch 1 taken 2694 times.
✗ Branch 2 not taken.
|
2694 | PushJobUnlocked(root_job); |
| 133 | 2694 | has_pushed = true; | |
| 134 | } | ||
| 135 | 1897 | } | |
| 136 | // noop: no catalogs to traverse | ||
| 137 |
2/2✓ Branch 0 taken 481 times.
✓ Branch 1 taken 1416 times.
|
1897 | if (!has_pushed) { |
| 138 | 481 | return true; | |
| 139 | } | ||
| 140 | 1416 | effective_traversal_type_ = type; | |
| 141 | 1416 | effective_history_depth_ = Parameters::kNoHistory; | |
| 142 | 1416 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
| 143 |
1/2✓ Branch 1 taken 1416 times.
✗ Branch 2 not taken.
|
1416 | bool result = DoTraverse(); |
| 144 | 1416 | effective_history_depth_ = this->default_history_depth_; | |
| 145 | 1416 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
| 146 | 1416 | return result; | |
| 147 | } | ||
| 148 | |||
| 149 | /** | ||
| 150 | * Starts the traversal process at the catalog pointed to by the given hash | ||
| 151 | * but doesn't traverse into predecessor catalog revisions. This overrides the | ||
| 152 | * TraversalParameter settings provided at construction. | ||
| 153 | * | ||
| 154 | * @param root_catalog_hash the entry point into the catalog traversal | ||
| 155 | * @return true when catalogs were successfully traversed | ||
| 156 | */ | ||
| 157 | 98 | bool TraverseRevision(const shash::Any &root_catalog_hash, | |
| 158 | const TraversalType type = Base::kBreadthFirst) { | ||
| 159 | 98 | effective_history_depth_ = Parameters::kNoHistory; | |
| 160 | 98 | effective_timestamp_threshold_ = Parameters::kNoTimestampThreshold; | |
| 161 | 98 | bool result = Traverse(root_catalog_hash, type); | |
| 162 | 98 | effective_history_depth_ = this->default_history_depth_; | |
| 163 | 98 | effective_timestamp_threshold_ = this->default_timestamp_threshold_; | |
| 164 | 98 | return result; | |
| 165 | } | ||
| 166 | |||
| 167 | protected: | ||
| 168 | 82718963 | static uint32_t hasher(const shash::Any &key) { | |
| 169 | // Don't start with the first bytes, because == is using them as well | ||
| 170 | return static_cast<uint32_t>( | ||
| 171 | 82718963 | *(reinterpret_cast<const uint32_t *>(key.digest) + 1)); | |
| 172 | } | ||
| 173 | |||
| 174 | 3859 | bool DoTraverse() { | |
| 175 | // Optimal number of threads is yet to be determined. The main event loop | ||
| 176 | // contains a spin-lock, so it should not be more than number of cores. | ||
| 177 | 3859 | threads_process_ = reinterpret_cast<pthread_t *>( | |
| 178 | 3859 | smalloc(sizeof(pthread_t) * num_threads_)); | |
| 179 |
2/2✓ Branch 0 taken 4545 times.
✓ Branch 1 taken 3859 times.
|
8404 | for (unsigned int i = 0; i < num_threads_; ++i) { |
| 180 | 4545 | int retval = pthread_create(&threads_process_[i], NULL, MainProcessQueue, | |
| 181 | this); | ||
| 182 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4545 times.
|
4545 | if (retval != 0) |
| 183 | ✗ | PANIC(kLogStderr, "failed to create thread"); | |
| 184 | } | ||
| 185 | |||
| 186 |
2/2✓ Branch 0 taken 4545 times.
✓ Branch 1 taken 3859 times.
|
8404 | for (unsigned int i = 0; i < num_threads_; ++i) { |
| 187 | 4545 | int retval = pthread_join(threads_process_[i], NULL); | |
| 188 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4545 times.
|
4545 | assert(retval == 0); |
| 189 | } | ||
| 190 | 3859 | free(threads_process_); | |
| 191 | |||
| 192 |
2/2✓ Branch 1 taken 49 times.
✓ Branch 2 taken 3810 times.
|
3859 | if (atomic_read32(&num_errors_)) |
| 193 | 49 | return false; | |
| 194 | |||
| 195 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3810 times.
|
3810 | assert(catalogs_processing_.size() == 0); |
| 196 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3810 times.
|
3810 | assert(pre_job_queue_.IsEmpty()); |
| 197 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3810 times.
|
3810 | assert(post_job_queue_.IsEmpty()); |
| 198 | 3810 | return true; | |
| 199 | } | ||
| 200 | |||
| 201 | 4545 | static void *MainProcessQueue(void *data) { | |
| 202 | 4545 | CatalogTraversalParallel<ObjectFetcherT> *traversal = reinterpret_cast< | |
| 203 | CatalogTraversalParallel<ObjectFetcherT> *>(data); | ||
| 204 | CatalogJob *current_job; | ||
| 205 | while (true) { | ||
| 206 | 19709328 | current_job = traversal->post_job_queue_.TryPopFront(); | |
| 207 |
2/2✓ Branch 0 taken 1664794 times.
✓ Branch 1 taken 18049238 times.
|
19714032 | if (current_job != NULL) { |
| 208 | 1664794 | traversal->ProcessJobPost(current_job); | |
| 209 | } else { | ||
| 210 | 18049238 | current_job = traversal->pre_job_queue_.PopFront(); | |
| 211 | // NULL means the master thread tells us to finish | ||
| 212 |
2/2✓ Branch 1 taken 4545 times.
✓ Branch 2 taken 18045085 times.
|
18049826 | if (current_job->hash.IsNull()) { |
| 213 |
1/2✓ Branch 0 taken 4545 times.
✗ Branch 1 not taken.
|
4545 | delete current_job; |
| 214 | 4496 | break; | |
| 215 | } | ||
| 216 | 18045085 | traversal->ProcessJobPre(current_job); | |
| 217 | } | ||
| 218 | } | ||
| 219 | 4496 | return NULL; | |
| 220 | } | ||
| 221 | |||
| 222 | 3859 | void NotifyFinished() { | |
| 223 |
1/2✓ Branch 1 taken 3859 times.
✗ Branch 2 not taken.
|
3859 | shash::Any null_hash; |
| 224 | 3859 | null_hash.SetNull(); | |
| 225 |
2/2✓ Branch 0 taken 4545 times.
✓ Branch 1 taken 3859 times.
|
8404 | for (unsigned i = 0; i < num_threads_; ++i) { |
| 226 |
3/6✓ Branch 2 taken 4545 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4545 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 4545 times.
✗ Branch 9 not taken.
|
4545 | CatalogJob *job = new CatalogJob("", null_hash, 0, 0); |
| 227 |
1/2✓ Branch 1 taken 4545 times.
✗ Branch 2 not taken.
|
4545 | pre_job_queue_.EnqueueFront(job); |
| 228 | } | ||
| 229 | 3859 | } | |
| 230 | |||
| 231 | 2443 | void PushJob(CatalogJob *job) { | |
| 232 | 2443 | MutexLockGuard m(&catalogs_lock_); | |
| 233 |
1/2✓ Branch 1 taken 2443 times.
✗ Branch 2 not taken.
|
2443 | PushJobUnlocked(job); |
| 234 | 2443 | } | |
| 235 | |||
| 236 | 18045624 | void PushJobUnlocked(CatalogJob *job) { | |
| 237 | 18045624 | catalogs_processing_.Insert(job->hash, job); | |
| 238 | 18045624 | pre_job_queue_.EnqueueFront(job); | |
| 239 | 18045624 | } | |
| 240 | |||
| 241 | 18044693 | void ProcessJobPre(CatalogJob *job) { | |
| 242 |
4/6✓ Branch 0 taken 18044693 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 18039107 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 49 times.
✓ Branch 6 taken 18039058 times.
|
18044693 | if (!this->PrepareCatalog(job)) { |
| 243 | 49 | atomic_inc32(&num_errors_); | |
| 244 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | NotifyFinished(); |
| 245 | 16376518 | return; | |
| 246 | } | ||
| 247 |
2/2✓ Branch 0 taken 258 times.
✓ Branch 1 taken 18038800 times.
|
18039058 | if (job->ignore) { |
| 248 |
1/2✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
|
258 | FinalizeJob(job); |
| 249 | 258 | return; | |
| 250 | } | ||
| 251 |
1/2✓ Branch 1 taken 17991711 times.
✗ Branch 2 not taken.
|
18038800 | NestedCatalogList catalog_list = job->catalog->ListOwnNestedCatalogs(); |
| 252 | unsigned int num_children; | ||
| 253 | // Ensure that pushed children won't call ProcessJobPost on this job | ||
| 254 | // before this function finishes | ||
| 255 | { | ||
| 256 | 17991711 | MutexLockGuard m(&catalogs_lock_); | |
| 257 |
2/2✓ Branch 0 taken 9044662 times.
✓ Branch 1 taken 9000606 times.
|
18045268 | if (effective_traversal_type_ == Base::kBreadthFirst) { |
| 258 |
1/2✓ Branch 1 taken 9044662 times.
✗ Branch 2 not taken.
|
9044662 | num_children = PushPreviousRevision(job) |
| 259 |
1/2✓ Branch 1 taken 9044662 times.
✗ Branch 2 not taken.
|
9044662 | + PushNestedCatalogs(job, catalog_list); |
| 260 | } else { | ||
| 261 |
1/2✓ Branch 1 taken 9000606 times.
✗ Branch 2 not taken.
|
9000606 | num_children = PushNestedCatalogs(job, catalog_list) |
| 262 |
1/2✓ Branch 1 taken 9000606 times.
✗ Branch 2 not taken.
|
9000606 | + PushPreviousRevision(job); |
| 263 | 9000606 | atomic_write32(&job->children_unprocessed, num_children); | |
| 264 | } | ||
| 265 |
3/6✓ Branch 0 taken 18045268 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 18045268 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 18045268 times.
|
18045268 | if (!this->CloseCatalog(false, job)) { |
| 266 | ✗ | atomic_inc32(&num_errors_); | |
| 267 | ✗ | NotifyFinished(); | |
| 268 | } | ||
| 269 | 18045268 | } | |
| 270 | |||
| 271 | // breadth-first: can post-process immediately | ||
| 272 | // depth-first: no children -> can post-process immediately | ||
| 273 |
4/4✓ Branch 0 taken 9000459 times.
✓ Branch 1 taken 9044613 times.
✓ Branch 2 taken 7335665 times.
✓ Branch 3 taken 1664794 times.
|
18045072 | if (effective_traversal_type_ == Base::kBreadthFirst || num_children == 0) { |
| 274 |
1/2✓ Branch 1 taken 16375868 times.
✗ Branch 2 not taken.
|
16380278 | ProcessJobPost(job); |
| 275 | 16375868 | return; | |
| 276 | } | ||
| 277 |
2/2✓ Branch 1 taken 1664794 times.
✓ Branch 2 taken 16376211 times.
|
18040662 | } |
| 278 | |||
| 279 | 18045268 | unsigned int PushNestedCatalogs(CatalogJob *job, | |
| 280 | const NestedCatalogList &catalog_list) { | ||
| 281 | 18045268 | typename NestedCatalogList::const_iterator i = catalog_list.begin(); | |
| 282 | 18045268 | typename NestedCatalogList::const_iterator iend = catalog_list.end(); | |
| 283 | 18045268 | unsigned int num_children = 0; | |
| 284 |
2/2✓ Branch 2 taken 18040557 times.
✓ Branch 3 taken 18045268 times.
|
36085825 | for (; i != iend; ++i) { |
| 285 |
7/8✓ Branch 0 taken 28549 times.
✓ Branch 1 taken 18012008 times.
✓ Branch 4 taken 28549 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 3139 times.
✓ Branch 7 taken 25410 times.
✓ Branch 8 taken 3139 times.
✓ Branch 9 taken 18037418 times.
|
18040557 | if (this->no_repeat_history_ && catalogs_done_.Contains(i->hash)) { |
| 286 | 3139 | continue; | |
| 287 | } | ||
| 288 | |||
| 289 | CatalogJob *child; | ||
| 290 | 36074836 | if (!this->no_repeat_history_ | |
| 291 |
7/8✓ Branch 0 taken 25410 times.
✓ Branch 1 taken 18012008 times.
✓ Branch 4 taken 25410 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 25018 times.
✓ Branch 7 taken 392 times.
✓ Branch 8 taken 18037026 times.
✓ Branch 9 taken 392 times.
|
18037418 | || !catalogs_processing_.Lookup(i->hash, &child)) { |
| 292 |
2/2✓ Branch 0 taken 17965213 times.
✓ Branch 1 taken 71813 times.
|
18037026 | CatalogTN *parent = (this->no_close_) ? job->catalog : NULL; |
| 293 |
1/2✓ Branch 2 taken 18037026 times.
✗ Branch 3 not taken.
|
36074052 | child = new CatalogJob(i->mountpoint.ToString(), |
| 294 |
1/2✓ Branch 2 taken 18037026 times.
✗ Branch 3 not taken.
|
18037026 | i->hash, |
| 295 | 18037026 | job->tree_level + 1, | |
| 296 |
1/2✓ Branch 1 taken 18037026 times.
✗ Branch 2 not taken.
|
18037026 | job->history_depth, |
| 297 | parent); | ||
| 298 |
1/2✓ Branch 1 taken 18037026 times.
✗ Branch 2 not taken.
|
18037026 | PushJobUnlocked(child); |
| 299 | } | ||
| 300 | |||
| 301 |
2/2✓ Branch 0 taken 8998836 times.
✓ Branch 1 taken 9038582 times.
|
18037418 | if (effective_traversal_type_ == Base::kDepthFirst) { |
| 302 |
1/2✓ Branch 1 taken 8998836 times.
✗ Branch 2 not taken.
|
8998836 | child->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
| 303 | this, job); | ||
| 304 | } | ||
| 305 | 18037418 | ++num_children; | |
| 306 | } | ||
| 307 | 18045268 | return num_children; | |
| 308 | } | ||
| 309 | |||
| 310 | /** | ||
| 311 | * Pushes the previous revision of a root catalog. | ||
| 312 | * @return the number of catalogs pushed on the processing stack | ||
| 313 | */ | ||
| 314 | 18045268 | unsigned int PushPreviousRevision(CatalogJob *job) { | |
| 315 | // only root catalogs are used for entering a previous revision (graph) | ||
| 316 |
2/2✓ Branch 1 taken 18036891 times.
✓ Branch 2 taken 8377 times.
|
18045268 | if (!job->catalog->IsRoot()) { |
| 317 | 18036891 | return 0; | |
| 318 | } | ||
| 319 | |||
| 320 |
1/2✓ Branch 1 taken 8377 times.
✗ Branch 2 not taken.
|
8377 | const shash::Any previous_revision = job->catalog->GetPreviousRevision(); |
| 321 |
2/2✓ Branch 1 taken 787 times.
✓ Branch 2 taken 7590 times.
|
8377 | if (previous_revision.IsNull()) { |
| 322 | 787 | return 0; | |
| 323 | } | ||
| 324 | |||
| 325 | // check if the next deeper history level is actually requested | ||
| 326 |
3/4✓ Branch 1 taken 7590 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 4129 times.
✓ Branch 4 taken 3461 times.
|
7590 | if (this->IsBelowPruningThresholds(*job, effective_history_depth_, |
| 327 | effective_timestamp_threshold_)) { | ||
| 328 | 4129 | return 0; | |
| 329 | } | ||
| 330 | |||
| 331 | 6922 | if (this->no_repeat_history_ | |
| 332 |
5/8✓ Branch 0 taken 2089 times.
✓ Branch 1 taken 1372 times.
✓ Branch 3 taken 2089 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 2089 times.
✗ Branch 7 not taken.
✓ Branch 8 taken 3461 times.
|
3461 | && catalogs_done_.Contains(previous_revision)) { |
| 333 | ✗ | return 0; | |
| 334 | } | ||
| 335 | |||
| 336 | CatalogJob *prev_job; | ||
| 337 | 6922 | if (!this->no_repeat_history_ | |
| 338 |
5/8✓ Branch 0 taken 2089 times.
✓ Branch 1 taken 1372 times.
✓ Branch 3 taken 2089 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2089 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 3461 times.
✗ Branch 8 not taken.
|
3461 | || !catalogs_processing_.Lookup(previous_revision, &prev_job)) { |
| 339 |
2/4✓ Branch 2 taken 3461 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3461 times.
✗ Branch 6 not taken.
|
6922 | prev_job = new CatalogJob("", previous_revision, 0, |
| 340 |
1/2✓ Branch 1 taken 3461 times.
✗ Branch 2 not taken.
|
3461 | job->history_depth + 1); |
| 341 |
1/2✓ Branch 1 taken 3461 times.
✗ Branch 2 not taken.
|
3461 | PushJobUnlocked(prev_job); |
| 342 | } | ||
| 343 | |||
| 344 |
2/2✓ Branch 0 taken 735 times.
✓ Branch 1 taken 2726 times.
|
3461 | if (effective_traversal_type_ == Base::kDepthFirst) { |
| 345 |
1/2✓ Branch 1 taken 735 times.
✗ Branch 2 not taken.
|
735 | prev_job->RegisterListener(&CatalogTraversalParallel::OnChildFinished, |
| 346 | this, job); | ||
| 347 | } | ||
| 348 | 3461 | return 1; | |
| 349 | } | ||
| 350 | |||
| 351 | 18042769 | void ProcessJobPost(CatalogJob *job) { | |
| 352 | // Save time by keeping catalog open when suitable | ||
| 353 |
1/2✓ Branch 0 taken 18042769 times.
✗ Branch 1 not taken.
|
18042769 | if (job->catalog == NULL) { |
| 354 |
2/4✓ Branch 0 taken 18042769 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 17990437 times.
|
18042769 | if (!this->ReopenCatalog(job)) { |
| 355 | ✗ | atomic_inc32(&num_errors_); | |
| 356 | ✗ | NotifyFinished(); | |
| 357 | ✗ | return; | |
| 358 | } | ||
| 359 | } | ||
| 360 |
1/2✓ Branch 0 taken 17990437 times.
✗ Branch 1 not taken.
|
17990437 | if (serialize_callbacks_) { |
| 361 | 17990437 | MutexLockGuard m(&catalog_callback_lock_); | |
| 362 |
2/4✓ Branch 1 taken 18045268 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18045268 times.
✗ Branch 5 not taken.
|
18045268 | this->NotifyListeners(job->GetCallbackData()); |
| 363 | 18045268 | } else { | |
| 364 | ✗ | this->NotifyListeners(job->GetCallbackData()); | |
| 365 | } | ||
| 366 |
2/2✓ Branch 0 taken 79810 times.
✓ Branch 1 taken 17965409 times.
|
18045219 | if (!this->no_close_) { |
| 367 |
2/4✓ Branch 0 taken 79810 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 79810 times.
|
79810 | if (!this->CloseCatalog(true, job)) { |
| 368 | ✗ | atomic_inc32(&num_errors_); | |
| 369 | ✗ | NotifyFinished(); | |
| 370 | ✗ | return; | |
| 371 | } | ||
| 372 | } | ||
| 373 | 18045219 | FinalizeJob(job); | |
| 374 | } | ||
| 375 | |||
| 376 | 18045379 | void FinalizeJob(CatalogJob *job) { | |
| 377 | { | ||
| 378 | 18045379 | MutexLockGuard m(&catalogs_lock_); | |
| 379 |
1/2✓ Branch 1 taken 18045526 times.
✗ Branch 2 not taken.
|
18045526 | catalogs_processing_.Erase(job->hash); |
| 380 |
1/2✓ Branch 1 taken 18045526 times.
✗ Branch 2 not taken.
|
18045526 | catalogs_done_.Insert(job->hash, true); |
| 381 | // No more catalogs to process -> finish | ||
| 382 |
1/2✓ Branch 2 taken 3810 times.
✗ Branch 3 not taken.
|
18049336 | if (catalogs_processing_.size() == 0 && pre_job_queue_.IsEmpty() |
| 383 |
5/6✓ Branch 0 taken 3810 times.
✓ Branch 1 taken 18041716 times.
✓ Branch 3 taken 3810 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 3810 times.
✓ Branch 6 taken 18041716 times.
|
18049336 | && post_job_queue_.IsEmpty()) { |
| 384 |
1/2✓ Branch 1 taken 3810 times.
✗ Branch 2 not taken.
|
3810 | NotifyFinished(); |
| 385 | } | ||
| 386 | 18045526 | } | |
| 387 |
2/2✓ Branch 0 taken 9000337 times.
✓ Branch 1 taken 9045042 times.
|
18045379 | if (effective_traversal_type_ == Base::kDepthFirst) { |
| 388 | 9000337 | job->WakeParents(); | |
| 389 | } | ||
| 390 |
2/2✓ Branch 0 taken 18044105 times.
✓ Branch 1 taken 490 times.
|
18044595 | delete job; |
| 391 | 18042292 | } | |
| 392 | |||
| 393 | 8998787 | void OnChildFinished(const int &a, CatalogJob *job) { | |
| 394 | // atomic_xadd32 returns value before subtraction -> needs to equal 1 | ||
| 395 |
2/2✓ Branch 1 taken 1664794 times.
✓ Branch 2 taken 7334728 times.
|
8998787 | if (atomic_xadd32(&job->children_unprocessed, -1) == 1) { |
| 396 | 1664794 | post_job_queue_.EnqueueFront(job); | |
| 397 | } | ||
| 398 | 8999522 | } | |
| 399 | |||
| 400 | unsigned int num_threads_; | ||
| 401 | bool serialize_callbacks_; | ||
| 402 | |||
| 403 | uint64_t effective_history_depth_; | ||
| 404 | time_t effective_timestamp_threshold_; | ||
| 405 | TraversalType effective_traversal_type_; | ||
| 406 | |||
| 407 | pthread_t *threads_process_; | ||
| 408 | atomic_int32 num_errors_; | ||
| 409 | |||
| 410 | Tube<CatalogJob> pre_job_queue_; | ||
| 411 | Tube<CatalogJob> post_job_queue_; | ||
| 412 | SmallHashDynamic<shash::Any, CatalogJob *> catalogs_processing_; | ||
| 413 | SmallHashDynamic<shash::Any, bool> catalogs_done_; | ||
| 414 | pthread_mutex_t catalogs_lock_; | ||
| 415 | |||
| 416 | pthread_mutex_t catalog_callback_lock_; | ||
| 417 | }; | ||
| 418 | |||
| 419 | } // namespace swissknife | ||
| 420 | |||
| 421 | #endif // CVMFS_CATALOG_TRAVERSAL_PARALLEL_H_ | ||
| 422 |