GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/bundle_mgr.cc
Date: 2026-06-21 02:37:04
Exec Total Coverage
Lines: 57 167 34.1%
Branches: 47 242 19.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "bundle_mgr.h"
6
7 #include <fcntl.h>
8 #include <pthread.h>
9 #include <unistd.h>
10
11 #include <cassert>
12 #include <cerrno>
13 #include <cstdlib>
14 #include <memory>
15 #include <string>
16 #include <vector>
17
18 #include "cache.h"
19 #include "catalog_mgr_client.h"
20 #include "fetch.h"
21 #include "json_document.h"
22 #include "mountpoint.h"
23 #include "options.h"
24 #include "shortstring.h"
25 #include "util/posix.h"
26
27 namespace {
28 constexpr size_t kDefaultBundlePoolSize = 8;
29
30 // Read the .cvmfsbundle.<basename> file via the cvmfs cache
31 78 BundleFileMgr *LoadBundleFromCvmfs(MountPoint *mp,
32 const PathString &bundle_file_path) {
33
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 catalog::DirectoryEntry dirent;
34
2/4
✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 78 times.
78 if (!mp->catalog_mgr()->LookupPath(bundle_file_path, catalog::kLookupDefault,
35 &dirent)) {
36 LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: LookupPath failed for %s",
37 bundle_file_path.ToString().c_str());
38 return nullptr;
39 }
40 78 cvmfs::Fetcher *fetcher = mp->fetcher();
41
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
78 if (fetcher == nullptr) {
42 LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: fetcher is null");
43 return nullptr;
44 }
45
46 78 CacheManager::Label label;
47
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 label.path = bundle_file_path.ToString();
48
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 label.size = dirent.size();
49 78 label.zip_algorithm = dirent.compression_algorithm();
50
2/4
✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 78 times.
✗ Branch 6 not taken.
234 const int fd = fetcher->Fetch(
51
1/2
✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
156 CacheManager::LabeledObject(dirent.checksum(), label));
52
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
78 if (fd < 0) {
53 LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: Fetch returned fd=%d", fd);
54 return nullptr;
55 }
56
57 78 CacheManager *cache_mgr = mp->file_system()->cache_mgr();
58 78 std::string content;
59
2/4
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
78 content.resize(static_cast<size_t>(dirent.size()));
60
2/4
✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 78 times.
✗ Branch 6 not taken.
78 const int64_t n = cache_mgr->Pread(fd, &content[0], content.size(), 0);
61
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 cache_mgr->Close(fd);
62
5/6
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 52 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 26 times.
✓ Branch 5 taken 52 times.
✓ Branch 6 taken 26 times.
78 if (n < 0 || static_cast<size_t>(n) != content.size()) {
63
1/2
✓ Branch 2 taken 52 times.
✗ Branch 3 not taken.
52 LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: Pread returned %ld want %zu",
64 static_cast<long>(n), content.size());
65 52 return nullptr;
66 }
67
68 // The bundle file may start with a "#%CVMFS_BUNDLE version=..." header
69 // line (per file_bundle.h); strip any leading lines beginning with '#'
70 // before handing off to the strict JSON parser.
71 26 size_t json_start = 0;
72
2/8
✗ Branch 1 not taken.
✓ Branch 2 taken 26 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 26 times.
26 while (json_start < content.size() && content[json_start] == '#') {
73 const size_t nl = content.find('\n', json_start);
74 if (nl == std::string::npos) {
75 json_start = content.size();
76 break;
77 }
78 json_start = nl + 1;
79 }
80 const std::string json_text = (json_start == 0) ? content
81
2/6
✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
26 : content.substr(json_start);
82
83
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 JsonDocument *doc = JsonDocument::Create(json_text);
84
1/2
✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
26 if (doc == nullptr) {
85
1/2
✓ Branch 2 taken 26 times.
✗ Branch 3 not taken.
26 LogCvmfs(kLogCvmfs, kLogDebug,
86 "BUNDLE-LOAD: JsonDocument::Create failed (size=%zu)",
87 json_text.size());
88 26 return nullptr;
89 }
90 LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-LOAD: loaded bundle %s (%zu bytes)",
91 bundle_file_path.ToString().c_str(), content.size());
92 return new BundleFileMgr(doc);
93 78 }
94 } // namespace
95
96 78 BundleMgr::BundleMgr(MountPoint *mp, const PathString &path)
97 78 : mount_point_(mp)
98 78 , path_(path)
99 78 , fetcher_threads_()
100 78 , pool_size_(kDefaultBundlePoolSize) {
101
2/4
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
78 fname_ = GetFileName(path_);
102
2/4
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
78 parent_path_ = GetParentPath(path_);
103 // By naming convention, the contents of the bundle are described by a file
104 // called ".cvmfsbundle.<file name>" located in the same parent directory
105
3/6
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 78 times.
✗ Branch 8 not taken.
312 bundle_file_path_ = PathString(parent_path_.ToString() + "/.cvmfsbundle."
106
3/6
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 78 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 78 times.
✗ Branch 8 not taken.
390 + fname_.ToString());
107
108 78 pipe_bm_[0] = pipe_bm_[1] = -1;
109 78 pthread_mutex_init(&worker_read_mutex_, nullptr);
110
111
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 bfm_ = LoadBundleFromCvmfs(mount_point_, bundle_file_path_);
112
1/2
✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
78 if (bfm_ == nullptr) {
113
1/2
✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
78 LogCvmfs(kLogCvmfs, kLogDebug, "BundleMgr: failed to load bundle file %s",
114
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
156 bundle_file_path_.ToString().c_str());
115 78 is_valid_ = false;
116 78 return;
117 }
118
119 // Pool size override via CVMFS_BUNDLE_POOL_SIZE
120 if (mount_point_ != nullptr && mount_point_->file_system() != nullptr
121 && mount_point_->file_system()->options_mgr() != nullptr) {
122 std::string opt;
123 if (mount_point_->file_system()->options_mgr()->GetValue(
124 "CVMFS_BUNDLE_POOL_SIZE", &opt)) {
125 char *end = nullptr;
126 const unsigned long n = std::strtoul(opt.c_str(), &end, 10);
127 if (end != opt.c_str() && n >= 1) {
128 pool_size_ = static_cast<size_t>(n);
129 }
130 }
131 }
132
133 SpawnFetcherPool();
134 }
135
136 26 void BundleMgr::Fetch() {
137
1/2
✓ Branch 0 taken 26 times.
✗ Branch 1 not taken.
26 if (not is_valid_) {
138 26 LogCvmfs(kLogBundleMgr,
139 kLogDebug,
140 "BundleMgr is not in a valid state. Can't fetch!");
141 26 return;
142 }
143
144 while (auto file = bfm_->GetNext()) {
145 // TrySendPath() is retried here as a prophylaxis against a scenario where the
146 // pipe is currently blocked.
147 while (not TrySendPath(back_channel_, file)) {
148 }
149 }
150 }
151
152 78 void BundleMgr::JoinFetcherPool() {
153
1/2
✓ Branch 0 taken 78 times.
✗ Branch 1 not taken.
78 if (pipe_bm_[1] < 0)
154 78 return;
155 // Send one kTerminate per worker. Workers drain all queued kFetch
156 // messages before reaching their kTerminate (FIFO pipe), so we can't
157 // just close the pipe — that would EOF some workers mid-drain.
158 for (size_t i = 0; i < fetcher_threads_.size(); ++i) {
159 Command cmd = Command::kTerminate;
160 while (true) {
161 const ssize_t n = ::write(pipe_bm_[1], &cmd, sizeof(Command));
162 if (n == sizeof(Command))
163 break;
164 if (errno != EAGAIN && errno != EWOULDBLOCK)
165 break;
166 }
167 }
168 // Wait for every worker to drain its share of the queue and exit.
169 for (auto &t : fetcher_threads_) {
170 pthread_join(*t, nullptr);
171 }
172 ClosePipe(pipe_bm_);
173 }
174
175 void BundleMgr::SpawnFetcherPool() {
176 MakePipe(pipe_bm_);
177 back_channel_ = pipe_bm_[1];
178
179 // Non-blocking writes on the work-queue pipe so TrySendPath can poll.
180 // Per pipe(7), writes <= PIPE_BUF are atomic on non-blocking pipes:
181 // they either fully succeed or fail with EAGAIN.
182 const int flags = fcntl(back_channel_, F_GETFL);
183 fcntl(back_channel_, F_SETFL, flags | O_NONBLOCK);
184
185 for (size_t i = 0; i < pool_size_; ++i) {
186 std::unique_ptr<pthread_t> thread(new pthread_t());
187 const int res = pthread_create(thread.get(), nullptr, MainBundleMgrFetcher,
188 this);
189 if (res != 0) {
190 LogCvmfs(kLogBundleMgr, kLogDebug,
191 "Thread creation failed! pool_size_=%zu spawned=%zu", pool_size_,
192 i);
193 is_valid_ = false;
194 return;
195 }
196 fetcher_threads_.emplace_back(std::move(thread));
197 }
198 }
199
200 void BundleMgr::FetchPath(const PathString &path) {
201 catalog::DirectoryEntry dirent;
202 const bool found = mount_point_->catalog_mgr()->LookupPath(
203 path, catalog::kLookupDefault, &dirent);
204 cvmfs::Fetcher *this_fetcher = dirent.IsExternalFile()
205 ? mount_point_->external_fetcher()
206 : mount_point_->fetcher();
207 if (not(found and this_fetcher)) {
208 LogCvmfs(kLogCvmfs, kLogDebug,
209 "BUNDLE-FETCH: lookup failed for %s (found=%d)",
210 path.ToString().c_str(), int(found));
211 return;
212 }
213 LogCvmfs(kLogCvmfs, kLogDebug, "BUNDLE-FETCH: prefetching %s",
214 path.ToString().c_str());
215
216 CacheManager::Label label;
217 label.path = path.ToString();
218 label.size = dirent.size();
219 label.zip_algorithm = dirent.compression_algorithm();
220 if (mount_point_->catalog_mgr()->volatile_flag())
221 label.flags |= CacheManager::kLabelVolatile;
222 if (dirent.IsExternalFile())
223 label.flags |= CacheManager::kLabelExternal;
224 this_fetcher->Fetch(CacheManager::LabeledObject(dirent.checksum(), label));
225 }
226
227 void *BundleMgr::MainBundleMgrFetcher(void *data) {
228 #ifndef __APPLE__
229 pthread_setname_np(pthread_self(), "bm_fetcher");
230 #endif
231 BundleMgr *mgr = static_cast<BundleMgr *>(data);
232 const int rfd = mgr->pipe_bm_[0];
233
234 while (true) {
235 Command cmd = Command::kTerminate;
236 PathString path;
237 bool got_path = false;
238 bool eof = false;
239
240 // Atomically receive cmd + (optional) path payload. The whole receipt
241 // is under worker_read_mutex_ so messages aren't interleaved between
242 // workers reading from the shared pipe.
243 pthread_mutex_lock(&mgr->worker_read_mutex_);
244 const ssize_t n = read(rfd, &cmd, sizeof(Command));
245 if (n != static_cast<ssize_t>(sizeof(Command))) {
246 eof = true;
247 } else if (cmd == Command::kFetch) {
248 path = mgr->ReceivePath(rfd);
249 got_path = true;
250 }
251 pthread_mutex_unlock(&mgr->worker_read_mutex_);
252
253 if (eof)
254 break;
255
256 bool terminate = false;
257 switch (cmd) {
258 case Command::kFetch: {
259 if (!got_path) {
260 terminate = true;
261 break;
262 }
263 mgr->FetchPath(path);
264 } break;
265 case Command::kTerminate:
266 default:
267 terminate = true;
268 break;
269 }
270 if (terminate) {
271 break;
272 }
273 }
274
275 pthread_exit(nullptr);
276 }
277
278 26 PathString BundleMgr::ReceivePath(int fd) const {
279
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 const std::string buffer = BlockingReceive(fd);
280
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 assert(buffer.size() > 0 && "A path can't be empty");
281
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
52 return PathString(buffer);
282 26 }
283
284 bool BundleMgr::TrySendPath(int fd, const PathString &path) const {
285 Command cmd = Command::kFetch;
286 if ((write(fd, &cmd, sizeof(Command))) != sizeof(Command)) {
287 if (not(errno == EAGAIN || errno == EWOULDBLOCK)) {
288 LogCvmfs(kLogBundleMgr,
289 kLogDebug,
290 "write() on back channel failed unexpectedly");
291 }
292 return false;
293 } else {
294 BlockingSend(fd, path);
295 }
296 return true;
297 }
298
299