| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/bundle_mgr.cc |
| Date: | 2026-06-21 02:37:04 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 57 | 167 | 34.1% |
| Branches: | 47 | 242 | 19.4% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #include "bundle_mgr.h" | ||
| 6 | |||
| 7 | #include <fcntl.h> | ||
| 8 | #include <pthread.h> | ||
| 9 | #include <unistd.h> | ||
| 10 | |||
| 11 | #include <cassert> | ||
| 12 | #include <cerrno> | ||
| 13 | #include <cstdlib> | ||
| 14 | #include <memory> | ||
| 15 | #include <string> | ||
| 16 | #include <vector> | ||
| 17 | |||
| 18 | #include "cache.h" | ||
| 19 | #include "catalog_mgr_client.h" | ||
| 20 | #include "fetch.h" | ||
| 21 | #include "json_document.h" | ||
| 22 | #include "mountpoint.h" | ||
| 23 | #include "options.h" | ||
| 24 | #include "shortstring.h" | ||
| 25 | #include "util/posix.h" | ||
| 26 | |||
| 27 | namespace { | ||
| 28 | constexpr size_t kDefaultBundlePoolSize = 8; | ||
| 29 | |||
| 30 | // Read the .cvmfsbundle.<basename> file via the cvmfs cache | ||
| 31 | 78 | BundleFileMgr *LoadBundleFromCvmfs(MountPoint *mp, | |
| 32 | const PathString &bundle_file_path) { | ||
| 33 |
1/2✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
|
78 | catalog::DirectoryEntry dirent; |
| 34 |
2/4✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 78 times.
|
78 | if (!mp->catalog_mgr()->LookupPath(bundle_file_path, catalog::kLookupDefault, |
| 35 | &dirent)) { | ||
| 36 | ✗ | LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: LookupPath failed for %s", | |
| 37 | ✗ | bundle_file_path.ToString().c_str()); | |
| 38 | ✗ | return nullptr; | |
| 39 | } | ||
| 40 | 78 | cvmfs::Fetcher *fetcher = mp->fetcher(); | |
| 41 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
|
78 | if (fetcher == nullptr) { |
| 42 | ✗ | LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: fetcher is null"); | |
| 43 | ✗ | return nullptr; | |
| 44 | } | ||
| 45 | |||
| 46 | 78 | CacheManager::Label label; | |
| 47 |
1/2✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
|
78 | label.path = bundle_file_path.ToString(); |
| 48 |
1/2✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
|
78 | label.size = dirent.size(); |
| 49 | 78 | label.zip_algorithm = dirent.compression_algorithm(); | |
| 50 |
2/4✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 78 times.
✗ Branch 6 not taken.
|
234 | const int fd = fetcher->Fetch( |
| 51 |
1/2✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
|
156 | CacheManager::LabeledObject(dirent.checksum(), label)); |
| 52 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
|
78 | if (fd < 0) { |
| 53 | ✗ | LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: Fetch returned fd=%d", fd); | |
| 54 | ✗ | return nullptr; | |
| 55 | } | ||
| 56 | |||
| 57 | 78 | CacheManager *cache_mgr = mp->file_system()->cache_mgr(); | |
| 58 | 78 | std::string content; | |
| 59 |
2/4✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
|
78 | content.resize(static_cast<size_t>(dirent.size())); |
| 60 |
2/4✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 78 times.
✗ Branch 6 not taken.
|
78 | const int64_t n = cache_mgr->Pread(fd, &content[0], content.size(), 0); |
| 61 |
1/2✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
|
78 | cache_mgr->Close(fd); |
| 62 |
5/6✓ Branch 0 taken 26 times.
✓ Branch 1 taken 52 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 26 times.
✓ Branch 5 taken 52 times.
✓ Branch 6 taken 26 times.
|
78 | if (n < 0 || static_cast<size_t>(n) != content.size()) { |
| 63 |
1/2✓ Branch 2 taken 52 times.
✗ Branch 3 not taken.
|
52 | LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: Pread returned %ld want %zu", |
| 64 | static_cast<long>(n), content.size()); | ||
| 65 | 52 | return nullptr; | |
| 66 | } | ||
| 67 | |||
| 68 | // The bundle file may start with a "#%CVMFS_BUNDLE version=..." header | ||
| 69 | // line (per file_bundle.h); strip any leading lines beginning with '#' | ||
| 70 | // before handing off to the strict JSON parser. | ||
| 71 | 26 | size_t json_start = 0; | |
| 72 |
2/8✗ Branch 1 not taken.
✓ Branch 2 taken 26 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 26 times.
|
26 | while (json_start < content.size() && content[json_start] == '#') { |
| 73 | ✗ | const size_t nl = content.find('\n', json_start); | |
| 74 | ✗ | if (nl == std::string::npos) { | |
| 75 | ✗ | json_start = content.size(); | |
| 76 | ✗ | break; | |
| 77 | } | ||
| 78 | ✗ | json_start = nl + 1; | |
| 79 | } | ||
| 80 | const std::string json_text = (json_start == 0) ? content | ||
| 81 |
2/6✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
|
26 | : content.substr(json_start); |
| 82 | |||
| 83 |
1/2✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
|
26 | JsonDocument *doc = JsonDocument::Create(json_text); |
| 84 |
1/2✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
|
26 | if (doc == nullptr) { |
| 85 |
1/2✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
|
26 | LogCvmfs(kLogCvmfs, kLogDebug, |
| 86 | "BUNDLE-LOAD: JsonDocument::Create failed (size=%zu)", | ||
| 87 | json_text.size()); | ||
| 88 | 26 | return nullptr; | |
| 89 | } | ||
| 90 | ✗ | LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: loaded bundle %s (%zu bytes)", | |
| 91 | ✗ | bundle_file_path.ToString().c_str(), content.size()); | |
| 92 | ✗ | return new BundleFileMgr(doc); | |
| 93 | 78 | } | |
| 94 | } // namespace | ||
| 95 | |||
| 96 | 78 | BundleMgr::BundleMgr(MountPoint *mp, const PathString &path) | |
| 97 | 78 | : mount_point_(mp) | |
| 98 | 78 | , path_(path) | |
| 99 | 78 | , fetcher_threads_() | |
| 100 | 78 | , pool_size_(kDefaultBundlePoolSize) { | |
| 101 |
2/4✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
|
78 | fname_ = GetFileName(path_); |
| 102 |
2/4✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
|
78 | parent_path_ = GetParentPath(path_); |
| 103 | // There is a naming convention regarding the name of the file with the | ||
| 104 | // contents of the bundle | ||
| 105 |
3/6✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 78 times.
✗ Branch 8 not taken.
|
312 | bundle_file_path_ = PathString(parent_path_.ToString() + "/.cvmfsbundle." |
| 106 |
3/6✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 78 times.
✗ Branch 8 not taken.
|
390 | + fname_.ToString()); |
| 107 | |||
| 108 | 78 | pipe_bm_[0] = pipe_bm_[1] = -1; | |
| 109 | 78 | pthread_mutex_init(&worker_read_mutex_, nullptr); | |
| 110 | |||
| 111 |
1/2✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
|
78 | bfm_ = LoadBundleFromCvmfs(mount_point_, bundle_file_path_); |
| 112 |
1/2✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
|
78 | if (bfm_ == nullptr) { |
| 113 |
1/2✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
|
78 | LogCvmfs(kLogCvmfs, kLogDebug, "BundleMgr: failed to load bundle file %s", |
| 114 |
1/2✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
|
156 | bundle_file_path_.ToString().c_str()); |
| 115 | 78 | is_valid_ = false; | |
| 116 | 78 | return; | |
| 117 | } | ||
| 118 | |||
| 119 | // Pool size override via CVMFS_BUNDLE_POOL_SIZE | ||
| 120 | ✗ | if (mount_point_ != nullptr && mount_point_->file_system() != nullptr | |
| 121 | ✗ | && mount_point_->file_system()->options_mgr() != nullptr) { | |
| 122 | ✗ | std::string opt; | |
| 123 | ✗ | if (mount_point_->file_system()->options_mgr()->GetValue( | |
| 124 | "CVMFS_BUNDLE_POOL_SIZE", &opt)) { | ||
| 125 | ✗ | char *end = nullptr; | |
| 126 | ✗ | const unsigned long n = std::strtoul(opt.c_str(), &end, 10); | |
| 127 | ✗ | if (end != opt.c_str() && n >= 1) { | |
| 128 | ✗ | pool_size_ = static_cast<size_t>(n); | |
| 129 | } | ||
| 130 | } | ||
| 131 | } | ||
| 132 | |||
| 133 | ✗ | SpawnFetcherPool(); | |
| 134 | } | ||
| 135 | |||
| 136 | 26 | void BundleMgr::Fetch() { | |
| 137 |
1/2✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
|
26 | if (not is_valid_) { |
| 138 | 26 | LogCvmfs(kLogBundleMgr, | |
| 139 | kLogDebug, | ||
| 140 | "BundleMgr is not in a valid state. Can't fetch!"); | ||
| 141 | 26 | return; | |
| 142 | } | ||
| 143 | |||
| 144 | ✗ | while (auto file = bfm_->GetNext()) { | |
| 145 | // A TrySendPath() here is used as a profylaxis to a scenario where the pipe | ||
| 146 | // is currently blocked. | ||
| 147 | ✗ | while (not TrySendPath(back_channel_, file)) { | |
| 148 | } | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | 78 | void BundleMgr::JoinFetcherPool() { | |
| 153 |
1/2✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
|
78 | if (pipe_bm_[1] < 0) |
| 154 | 78 | return; | |
| 155 | // Send one kTerminate per worker. Workers drain all queued kFetch | ||
| 156 | // messages before reaching their kTerminate (FIFO pipe), so we can't | ||
| 157 | // just close the pipe — that would EOF some workers mid-drain. | ||
| 158 | ✗ | for (size_t i = 0; i < fetcher_threads_.size(); ++i) { | |
| 159 | ✗ | Command cmd = Command::kTerminate; | |
| 160 | while (true) { | ||
| 161 | ✗ | const ssize_t n = ::write(pipe_bm_[1], &cmd, sizeof(Command)); | |
| 162 | ✗ | if (n == sizeof(Command)) | |
| 163 | ✗ | break; | |
| 164 | ✗ | if (errno != EAGAIN && errno != EWOULDBLOCK) | |
| 165 | ✗ | break; | |
| 166 | } | ||
| 167 | } | ||
| 168 | // Wait for every worker to drain its share of the queue and exit. | ||
| 169 | ✗ | for (auto &t : fetcher_threads_) { | |
| 170 | ✗ | pthread_join(*t, nullptr); | |
| 171 | } | ||
| 172 | ✗ | ClosePipe(pipe_bm_); | |
| 173 | } | ||
| 174 | |||
| 175 | ✗ | void BundleMgr::SpawnFetcherPool() { | |
| 176 | ✗ | MakePipe(pipe_bm_); | |
| 177 | ✗ | back_channel_ = pipe_bm_[1]; | |
| 178 | |||
| 179 | // Non-blocking writes on the work-queue pipe so TrySendPath can poll. | ||
| 180 | // Per pipe(7), writes <= PIPE_BUF are atomic on non-blocking pipes: | ||
| 181 | // they either fully succeed or fail with EAGAIN. | ||
| 182 | ✗ | const int flags = fcntl(back_channel_, F_GETFL); | |
| 183 | ✗ | fcntl(back_channel_, F_SETFL, flags | O_NONBLOCK); | |
| 184 | |||
| 185 | ✗ | for (size_t i = 0; i < pool_size_; ++i) { | |
| 186 | ✗ | std::unique_ptr<pthread_t> thread(new pthread_t()); | |
| 187 | ✗ | const int res = pthread_create(thread.get(), nullptr, MainBundleMgrFetcher, | |
| 188 | this); | ||
| 189 | ✗ | if (res != 0) { | |
| 190 | ✗ | LogCvmfs(kLogBundleMgr, kLogDebug, | |
| 191 | "Thread creation failed! pool_size_=%zu spawned=%zu", pool_size_, | ||
| 192 | i); | ||
| 193 | ✗ | is_valid_ = false; | |
| 194 | ✗ | return; | |
| 195 | } | ||
| 196 | ✗ | fetcher_threads_.emplace_back(std::move(thread)); | |
| 197 | } | ||
| 198 | } | ||
| 199 | |||
| 200 | ✗ | void BundleMgr::FetchPath(const PathString &path) { | |
| 201 | ✗ | catalog::DirectoryEntry dirent; | |
| 202 | ✗ | const bool found = mount_point_->catalog_mgr()->LookupPath( | |
| 203 | path, catalog::kLookupDefault, &dirent); | ||
| 204 | ✗ | cvmfs::Fetcher *this_fetcher = dirent.IsExternalFile() | |
| 205 | ✗ | ? mount_point_->external_fetcher() | |
| 206 | ✗ | : mount_point_->fetcher(); | |
| 207 | ✗ | if (not(found and this_fetcher)) { | |
| 208 | ✗ | LogCvmfs(kLogCvmfs, kLogDebug, | |
| 209 | "BUNDLE-FETCH: lookup failed for %s (found=%d)", | ||
| 210 | ✗ | path.ToString().c_str(), int(found)); | |
| 211 | ✗ | return; | |
| 212 | } | ||
| 213 | ✗ | LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-FETCH: prefetching %s", | |
| 214 | ✗ | path.ToString().c_str()); | |
| 215 | |||
| 216 | ✗ | CacheManager::Label label; | |
| 217 | ✗ | label.path = path.ToString(); | |
| 218 | ✗ | label.size = dirent.size(); | |
| 219 | ✗ | label.zip_algorithm = dirent.compression_algorithm(); | |
| 220 | ✗ | if (mount_point_->catalog_mgr()->volatile_flag()) | |
| 221 | ✗ | label.flags |= CacheManager::kLabelVolatile; | |
| 222 | ✗ | if (dirent.IsExternalFile()) | |
| 223 | ✗ | label.flags |= CacheManager::kLabelExternal; | |
| 224 | ✗ | this_fetcher->Fetch(CacheManager::LabeledObject(dirent.checksum(), label)); | |
| 225 | } | ||
| 226 | |||
| 227 | ✗ | void *BundleMgr::MainBundleMgrFetcher(void *data) { | |
| 228 | #ifndef __APPLE__ | ||
| 229 | ✗ | pthread_setname_np(pthread_self(), "bm_fetcher"); | |
| 230 | #endif | ||
| 231 | ✗ | BundleMgr *mgr = static_cast<BundleMgr *>(data); | |
| 232 | ✗ | const int rfd = mgr->pipe_bm_[0]; | |
| 233 | |||
| 234 | while (true) { | ||
| 235 | ✗ | Command cmd = Command::kTerminate; | |
| 236 | ✗ | PathString path; | |
| 237 | ✗ | bool got_path = false; | |
| 238 | ✗ | bool eof = false; | |
| 239 | |||
| 240 | // Atomically receive cmd + (optional) path payload. The whole receipt | ||
| 241 | // is under worker_read_mutex_ so messages aren't interleaved between | ||
| 242 | // workers reading from the shared pipe. | ||
| 243 | ✗ | pthread_mutex_lock(&mgr->worker_read_mutex_); | |
| 244 | ✗ | const ssize_t n = read(rfd, &cmd, sizeof(Command)); | |
| 245 | ✗ | if (n != static_cast<ssize_t>(sizeof(Command))) { | |
| 246 | ✗ | eof = true; | |
| 247 | ✗ | } else if (cmd == Command::kFetch) { | |
| 248 | ✗ | path = mgr->ReceivePath(rfd); | |
| 249 | ✗ | got_path = true; | |
| 250 | } | ||
| 251 | ✗ | pthread_mutex_unlock(&mgr->worker_read_mutex_); | |
| 252 | |||
| 253 | ✗ | if (eof) | |
| 254 | ✗ | break; | |
| 255 | |||
| 256 | ✗ | bool terminate = false; | |
| 257 | ✗ | switch (cmd) { | |
| 258 | ✗ | case Command::kFetch: { | |
| 259 | ✗ | if (!got_path) { | |
| 260 | ✗ | terminate = true; | |
| 261 | ✗ | break; | |
| 262 | } | ||
| 263 | ✗ | mgr->FetchPath(path); | |
| 264 | ✗ | } break; | |
| 265 | ✗ | case Command::kTerminate: | |
| 266 | default: | ||
| 267 | ✗ | terminate = true; | |
| 268 | ✗ | break; | |
| 269 | } | ||
| 270 | ✗ | if (terminate) { | |
| 271 | ✗ | break; | |
| 272 | } | ||
| 273 | } | ||
| 274 | |||
| 275 | ✗ | pthread_exit(nullptr); | |
| 276 | } | ||
| 277 | |||
| 278 | 26 | PathString BundleMgr::ReceivePath(int fd) const { | |
| 279 |
1/2✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
|
26 | const std::string buffer = BlockingReceive(fd); |
| 280 |
1/2✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
|
26 | assert(buffer.size() > 0 && "A path can't be empty"); |
| 281 |
1/2✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
|
52 | return PathString(buffer); |
| 282 | 26 | } | |
| 283 | |||
| 284 | ✗ | bool BundleMgr::TrySendPath(int fd, const PathString &path) const { | |
| 285 | ✗ | Command cmd = Command::kFetch; | |
| 286 | ✗ | if ((write(fd, &cmd, sizeof(Command))) != sizeof(Command)) { | |
| 287 | ✗ | if (not(errno == EAGAIN || errno == EWOULDBLOCK)) { | |
| 288 | ✗ | LogCvmfs(kLogBundleMgr, | |
| 289 | kLogDebug, | ||
| 290 | "write() on back channel failed unexpectedly"); | ||
| 291 | } | ||
| 292 | ✗ | return false; | |
| 293 | } else { | ||
| 294 | ✗ | BlockingSend(fd, path); | |
| 295 | } | ||
| 296 | ✗ | return true; | |
| 297 | } | ||
| 298 | |||
| 299 |