GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_stream.cc
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 101 190 53.2%
Branches: 49 186 26.3%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "cvmfs_config.h"
6 #include "cache_stream.h"
7
8 #include <algorithm>
9 #include <cstdlib>
10 #include <cstring>
11 #include <string>
12
13 #include "network/download.h"
14 #include "network/sink.h"
15 #include "quota.h"
16 #include "util/mutex.h"
17 #include "util/smalloc.h"
18
19
20 namespace {
21
22 class StreamingSink : public cvmfs::Sink {
23 public:
24 2 StreamingSink(void *buf, uint64_t size, uint64_t offset)
25 2 : Sink(false /* is_owner */)
26 2 , pos_(0)
27 2 , window_buf_(buf)
28 2 , window_size_(size)
29 2 , window_offset_(offset)
30 2 { }
31
32 4 virtual ~StreamingSink() {}
33
34 2 virtual int64_t Write(const void *buf, uint64_t sz) {
35 2 uint64_t old_pos = pos_;
36 2 pos_ += sz;
37
38
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
2 if (!window_buf_)
39 1 return static_cast<int64_t>(sz);
40
41
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (pos_ < window_offset_)
42 return static_cast<int64_t>(sz);
43
44
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (old_pos >= (window_offset_ + window_size_))
45 return static_cast<int64_t>(sz);
46
47 1 uint64_t copy_offset = std::max(old_pos, window_offset_);
48 1 uint64_t inbuf_offset = copy_offset - old_pos;
49 1 uint64_t outbuf_offset = copy_offset - window_offset_;
50 uint64_t copy_size =
51 1 std::min(sz - inbuf_offset, window_size_ - outbuf_offset);
52
53 1 memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
54 reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
55 copy_size);
56
57 1 return static_cast<int64_t>(sz);
58 }
59
60 virtual int Reset() {
61 pos_ = 0;
62 return 0;
63 }
64
65 virtual int Purge() { return Reset(); }
66 2 virtual bool IsValid() {
67
3/4
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
2 return (window_buf_ != NULL) || (window_size_ == 0);
68 }
69 2 virtual int Flush() { return 0; }
70 virtual bool Reserve(size_t /* size */) { return true; }
71 8 virtual bool RequiresReserve() { return false; }
72 virtual std::string Describe() {
73 std::string result = "Streaming sink that is ";
74 result += IsValid() ? "valid" : "invalid";
75 return result;
76 }
77
78 2 int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); }
79
80 private:
81 uint64_t pos_;
82 void *window_buf_;
83 uint64_t window_size_;
84 uint64_t window_offset_;
85 }; // class StreamingSink
86
87 } // anonymous namespace
88
89
90 2 download::DownloadManager *StreamingCacheManager::SelectDownloadManager(
91 const FdInfo &info)
92 {
93
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (info.label.IsExternal())
94 return external_download_mgr_;
95 2 return regular_download_mgr_;
96 }
97
98 2 int64_t StreamingCacheManager::Stream(
99 const FdInfo &info,
100 void *buf,
101 uint64_t size,
102 uint64_t offset)
103 {
104 2 StreamingSink sink(buf, size, offset);
105 2 std::string url;
106
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (info.label.IsExternal()) {
107 url = info.label.path;
108 } else {
109
2/4
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 url = "/data/" + info.object_id.MakePath();
110 }
111 2 bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault;
112
113 download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */,
114
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 &info.object_id, &sink);
115 2 download_job.SetExtraInfo(&info.label.path);
116 2 download_job.SetRangeOffset(info.label.range_offset);
117 2 download_job.SetRangeSize(static_cast<int64_t>(info.label.size));
118
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
2 SelectDownloadManager(info)->Fetch(&download_job);
119
120
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (download_job.error_code() != download::kFailOk) {
121 return -EIO;
122 }
123
124 2 return sink.GetNBytesStreamed();
125 2 }
126
127
128 1 StreamingCacheManager::StreamingCacheManager(
129 unsigned max_open_fds,
130 CacheManager *cache_mgr,
131 download::DownloadManager *regular_download_mgr,
132 1 download::DownloadManager *external_download_mgr)
133 1 : cache_mgr_(cache_mgr)
134 1 , regular_download_mgr_(regular_download_mgr)
135 1 , external_download_mgr_(external_download_mgr)
136
3/6
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1 times.
✗ Branch 9 not taken.
1 , fd_table_(max_open_fds, FdInfo())
137 {
138 1 lock_fd_table_ =
139 1 reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
140 1 int retval = pthread_mutex_init(lock_fd_table_, NULL);
141
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 assert(retval == 0);
142
143
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 delete quota_mgr_;
144 1 quota_mgr_ = cache_mgr_->quota_mgr();
145 1 }
146
147 4 StreamingCacheManager::~StreamingCacheManager() {
148 2 pthread_mutex_destroy(lock_fd_table_);
149 2 free(lock_fd_table_);
150 2 quota_mgr_ = NULL; // gets deleted by cache_mgr_
151 4 }
152
153 std::string StreamingCacheManager::Describe() {
154 return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe();
155 }
156
157 bool StreamingCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) {
158 bool result = cache_mgr_->AcquireQuotaManager(quota_mgr);
159 if (result)
160 quota_mgr_ = cache_mgr_->quota_mgr();
161 return result;
162 }
163
164 1 int StreamingCacheManager::Open(const LabeledObject &object) {
165
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 int fd_in_cache_mgr = cache_mgr_->Open(object);
166
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (fd_in_cache_mgr >= 0) {
167 MutexLockGuard lock_guard(lock_fd_table_);
168 return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
169 }
170
171
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (fd_in_cache_mgr != -ENOENT)
172 return fd_in_cache_mgr;
173
174
4/8
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 1 times.
2 if (object.label.IsCatalog() || object.label.IsPinned() ||
175 1 object.label.IsCertificate())
176 {
177 return -ENOENT;
178 }
179
180 1 MutexLockGuard lock_guard(lock_fd_table_);
181
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
1 return fd_table_.OpenFd(FdInfo(object));
182 1 }
183
184 int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) {
185 MutexLockGuard lock_guard(lock_fd_table_);
186 return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
187 }
188
189 1 int64_t StreamingCacheManager::GetSize(int fd) {
190
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 FdInfo info;
191 {
192 1 MutexLockGuard lock_guard(lock_fd_table_);
193
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 info = fd_table_.GetHandle(fd);
194 1 }
195
196
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
1 if (!info.IsValid())
197 return -EBADF;
198
199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (info.fd_in_cache_mgr >= 0)
200 return cache_mgr_->GetSize(info.fd_in_cache_mgr);
201
202
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 return Stream(info, NULL, 0, 0);
203 1 }
204
205 int StreamingCacheManager::Dup(int fd) {
206 FdInfo info;
207
208 MutexLockGuard lock_guard(lock_fd_table_);
209 info = fd_table_.GetHandle(fd);
210
211 if (!info.IsValid())
212 return -EBADF;
213
214 if (info.fd_in_cache_mgr >= 0) {
215 int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr);
216 if (dup_fd < 0)
217 return dup_fd;
218 return fd_table_.OpenFd(FdInfo(dup_fd));
219 }
220
221 return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label)));
222 }
223
224 1 int StreamingCacheManager::Close(int fd) {
225
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 FdInfo info;
226 {
227 1 MutexLockGuard lock_guard(lock_fd_table_);
228
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 info = fd_table_.GetHandle(fd);
229
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
1 if (!info.IsValid())
230 return -EBADF;
231
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 fd_table_.CloseFd(fd);
232
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 }
233
234
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (info.fd_in_cache_mgr >= 0)
235 return cache_mgr_->Close(info.fd_in_cache_mgr);
236
237 1 return 0;
238 1 }
239
240 1 int64_t StreamingCacheManager::Pread(
241 int fd, void *buf, uint64_t size, uint64_t offset)
242 {
243
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 FdInfo info;
244 {
245 1 MutexLockGuard lock_guard(lock_fd_table_);
246
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 info = fd_table_.GetHandle(fd);
247 1 }
248
249
2/4
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
1 if (!info.IsValid())
250 return -EBADF;
251
252
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (info.fd_in_cache_mgr >= 0)
253 return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset);
254
255
1/2
✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
1 uint64_t nbytes_streamed = Stream(info, buf, size, offset);
256
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
1 if (nbytes_streamed < offset)
257 return 0;
258
1/2
✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
1 if (nbytes_streamed > (offset + size))
259 1 return static_cast<int64_t>(size);
260 return static_cast<int64_t>(nbytes_streamed - offset);
261 1 }
262
263 int StreamingCacheManager::Readahead(int fd) {
264 FdInfo info;
265 {
266 MutexLockGuard lock_guard(lock_fd_table_);
267 info = fd_table_.GetHandle(fd);
268 }
269
270 if (!info.IsValid())
271 return -EBADF;
272
273 if (info.fd_in_cache_mgr >= 0)
274 return cache_mgr_->Readahead(info.fd_in_cache_mgr);
275
276 return 0;
277 }
278
279 int StreamingCacheManager::OpenFromTxn(void *txn) {
280 int fd = cache_mgr_->OpenFromTxn(txn);
281 if (fd < 0)
282 return fd;
283
284 MutexLockGuard lock_guard(lock_fd_table_);
285 return fd_table_.OpenFd(FdInfo(fd));
286 }
287
288 void *StreamingCacheManager::DoSaveState() {
289 SavedState *state = new SavedState();
290 state->fd_table = fd_table_.Clone();
291 state->state_backing_cachemgr = cache_mgr_->SaveState(-1);
292 return state;
293 }
294
295 int StreamingCacheManager::DoRestoreState(void *data) {
296 // When DoRestoreState is called, we have fd 0 assigned to the root file
297 // catalog
298 FdInfo handle_root = fd_table_.GetHandle(0);
299
300 SavedState *state = reinterpret_cast<SavedState *>(data);
301
302 int new_backing_root_fd =
303 cache_mgr_->RestoreState(-1, state->state_backing_cachemgr);
304 fd_table_.AssignFrom(*state->fd_table);
305
306 int new_root_fd = -1;
307 if (handle_root.IsValid()) {
308 if (new_backing_root_fd >= 0)
309 handle_root.fd_in_cache_mgr = new_backing_root_fd;
310 new_root_fd = fd_table_.OpenFd(handle_root);
311 // There must be a free file descriptor because the root file catalog gets
312 // closed before a reload
313 assert(new_root_fd >= 0);
314 }
315 return new_root_fd;
316 }
317
318 bool StreamingCacheManager::DoFreeState(void *data) {
319 SavedState *state = reinterpret_cast<SavedState *>(data);
320 cache_mgr_->FreeState(-1, state->state_backing_cachemgr);
321 delete state->fd_table;
322 delete state;
323 return true;
324 }
325
326 CacheManager *StreamingCacheManager::MoveOutBackingCacheMgr(int *root_fd) {
327 *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr;
328 return cache_mgr_.Release();
329 }
330