Directory: | cvmfs/ |
---|---|
File: | cvmfs/cache_stream.cc |
Date: | 2024-04-28 02:33:07 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 101 | 190 | 53.2% |
Branches: | 49 | 186 | 26.3% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "cvmfs_config.h" | ||
6 | #include "cache_stream.h" | ||
7 | |||
8 | #include <algorithm> | ||
9 | #include <cstdlib> | ||
10 | #include <cstring> | ||
11 | #include <string> | ||
12 | |||
13 | #include "network/download.h" | ||
14 | #include "network/sink.h" | ||
15 | #include "quota.h" | ||
16 | #include "util/mutex.h" | ||
17 | #include "util/smalloc.h" | ||
18 | |||
19 | |||
20 | namespace { | ||
21 | |||
22 | class StreamingSink : public cvmfs::Sink { | ||
23 | public: | ||
24 | 2 | StreamingSink(void *buf, uint64_t size, uint64_t offset) | |
25 | 2 | : Sink(false /* is_owner */) | |
26 | 2 | , pos_(0) | |
27 | 2 | , window_buf_(buf) | |
28 | 2 | , window_size_(size) | |
29 | 2 | , window_offset_(offset) | |
30 | 2 | { } | |
31 | |||
32 | 4 | virtual ~StreamingSink() {} | |
33 | |||
34 | 2 | virtual int64_t Write(const void *buf, uint64_t sz) { | |
35 | 2 | uint64_t old_pos = pos_; | |
36 | 2 | pos_ += sz; | |
37 | |||
38 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
|
2 | if (!window_buf_) |
39 | 1 | return static_cast<int64_t>(sz); | |
40 | |||
41 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (pos_ < window_offset_) |
42 | ✗ | return static_cast<int64_t>(sz); | |
43 | |||
44 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (old_pos >= (window_offset_ + window_size_)) |
45 | ✗ | return static_cast<int64_t>(sz); | |
46 | |||
47 | 1 | uint64_t copy_offset = std::max(old_pos, window_offset_); | |
48 | 1 | uint64_t inbuf_offset = copy_offset - old_pos; | |
49 | 1 | uint64_t outbuf_offset = copy_offset - window_offset_; | |
50 | uint64_t copy_size = | ||
51 | 1 | std::min(sz - inbuf_offset, window_size_ - outbuf_offset); | |
52 | |||
53 | 1 | memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset, | |
54 | reinterpret_cast<const unsigned char *>(buf) + inbuf_offset, | ||
55 | copy_size); | ||
56 | |||
57 | 1 | return static_cast<int64_t>(sz); | |
58 | } | ||
59 | |||
60 | ✗ | virtual int Reset() { | |
61 | ✗ | pos_ = 0; | |
62 | ✗ | return 0; | |
63 | } | ||
64 | |||
65 | ✗ | virtual int Purge() { return Reset(); } | |
66 | 2 | virtual bool IsValid() { | |
67 |
3/4✓ Branch 0 taken 1 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
2 | return (window_buf_ != NULL) || (window_size_ == 0); |
68 | } | ||
69 | 2 | virtual int Flush() { return 0; } | |
70 | ✗ | virtual bool Reserve(size_t /* size */) { return true; } | |
71 | 8 | virtual bool RequiresReserve() { return false; } | |
72 | ✗ | virtual std::string Describe() { | |
73 | ✗ | std::string result = "Streaming sink that is "; | |
74 | ✗ | result += IsValid() ? "valid" : "invalid"; | |
75 | ✗ | return result; | |
76 | } | ||
77 | |||
78 | 2 | int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); } | |
79 | |||
80 | private: | ||
81 | uint64_t pos_; | ||
82 | void *window_buf_; | ||
83 | uint64_t window_size_; | ||
84 | uint64_t window_offset_; | ||
85 | }; // class StreamingSink | ||
86 | |||
87 | } // anonymous namespace | ||
88 | |||
89 | |||
90 | 2 | download::DownloadManager *StreamingCacheManager::SelectDownloadManager( | |
91 | const FdInfo &info) | ||
92 | { | ||
93 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | if (info.label.IsExternal()) |
94 | ✗ | return external_download_mgr_; | |
95 | 2 | return regular_download_mgr_; | |
96 | } | ||
97 | |||
98 | 2 | int64_t StreamingCacheManager::Stream( | |
99 | const FdInfo &info, | ||
100 | void *buf, | ||
101 | uint64_t size, | ||
102 | uint64_t offset) | ||
103 | { | ||
104 | 2 | StreamingSink sink(buf, size, offset); | |
105 | 2 | std::string url; | |
106 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | if (info.label.IsExternal()) { |
107 | ✗ | url = info.label.path; | |
108 | } else { | ||
109 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | url = "/data/" + info.object_id.MakePath(); |
110 | } | ||
111 | 2 | bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault; | |
112 | |||
113 | download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */, | ||
114 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | &info.object_id, &sink); |
115 | 2 | download_job.SetExtraInfo(&info.label.path); | |
116 | 2 | download_job.SetRangeOffset(info.label.range_offset); | |
117 | 2 | download_job.SetRangeSize(static_cast<int64_t>(info.label.size)); | |
118 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | SelectDownloadManager(info)->Fetch(&download_job); |
119 | |||
120 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | if (download_job.error_code() != download::kFailOk) { |
121 | ✗ | return -EIO; | |
122 | } | ||
123 | |||
124 | 2 | return sink.GetNBytesStreamed(); | |
125 | 2 | } | |
126 | |||
127 | |||
128 | 1 | StreamingCacheManager::StreamingCacheManager( | |
129 | unsigned max_open_fds, | ||
130 | CacheManager *cache_mgr, | ||
131 | download::DownloadManager *regular_download_mgr, | ||
132 | 1 | download::DownloadManager *external_download_mgr) | |
133 | 1 | : cache_mgr_(cache_mgr) | |
134 | 1 | , regular_download_mgr_(regular_download_mgr) | |
135 | 1 | , external_download_mgr_(external_download_mgr) | |
136 |
3/6✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 1 times.
✗ Branch 9 not taken.
|
1 | , fd_table_(max_open_fds, FdInfo()) |
137 | { | ||
138 | 1 | lock_fd_table_ = | |
139 | 1 | reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t))); | |
140 | 1 | int retval = pthread_mutex_init(lock_fd_table_, NULL); | |
141 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | assert(retval == 0); |
142 | |||
143 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | delete quota_mgr_; |
144 | 1 | quota_mgr_ = cache_mgr_->quota_mgr(); | |
145 | 1 | } | |
146 | |||
147 | 4 | StreamingCacheManager::~StreamingCacheManager() { | |
148 | 2 | pthread_mutex_destroy(lock_fd_table_); | |
149 | 2 | free(lock_fd_table_); | |
150 | 2 | quota_mgr_ = NULL; // gets deleted by cache_mgr_ | |
151 | 4 | } | |
152 | |||
153 | ✗ | std::string StreamingCacheManager::Describe() { | |
154 | ✗ | return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe(); | |
155 | } | ||
156 | |||
157 | ✗ | bool StreamingCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) { | |
158 | ✗ | bool result = cache_mgr_->AcquireQuotaManager(quota_mgr); | |
159 | ✗ | if (result) | |
160 | ✗ | quota_mgr_ = cache_mgr_->quota_mgr(); | |
161 | ✗ | return result; | |
162 | } | ||
163 | |||
164 | 1 | int StreamingCacheManager::Open(const LabeledObject &object) { | |
165 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | int fd_in_cache_mgr = cache_mgr_->Open(object); |
166 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (fd_in_cache_mgr >= 0) { |
167 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
168 | ✗ | return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr)); | |
169 | } | ||
170 | |||
171 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (fd_in_cache_mgr != -ENOENT) |
172 | ✗ | return fd_in_cache_mgr; | |
173 | |||
174 |
4/8✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 1 times.
|
2 | if (object.label.IsCatalog() || object.label.IsPinned() || |
175 | 1 | object.label.IsCertificate()) | |
176 | { | ||
177 | ✗ | return -ENOENT; | |
178 | } | ||
179 | |||
180 | 1 | MutexLockGuard lock_guard(lock_fd_table_); | |
181 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | return fd_table_.OpenFd(FdInfo(object)); |
182 | 1 | } | |
183 | |||
184 | ✗ | int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) { | |
185 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
186 | ✗ | return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr)); | |
187 | } | ||
188 | |||
189 | 1 | int64_t StreamingCacheManager::GetSize(int fd) { | |
190 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | FdInfo info; |
191 | { | ||
192 | 1 | MutexLockGuard lock_guard(lock_fd_table_); | |
193 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | info = fd_table_.GetHandle(fd); |
194 | 1 | } | |
195 | |||
196 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
|
1 | if (!info.IsValid()) |
197 | ✗ | return -EBADF; | |
198 | |||
199 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (info.fd_in_cache_mgr >= 0) |
200 | ✗ | return cache_mgr_->GetSize(info.fd_in_cache_mgr); | |
201 | |||
202 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | return Stream(info, NULL, 0, 0); |
203 | 1 | } | |
204 | |||
205 | ✗ | int StreamingCacheManager::Dup(int fd) { | |
206 | ✗ | FdInfo info; | |
207 | |||
208 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
209 | ✗ | info = fd_table_.GetHandle(fd); | |
210 | |||
211 | ✗ | if (!info.IsValid()) | |
212 | ✗ | return -EBADF; | |
213 | |||
214 | ✗ | if (info.fd_in_cache_mgr >= 0) { | |
215 | ✗ | int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr); | |
216 | ✗ | if (dup_fd < 0) | |
217 | ✗ | return dup_fd; | |
218 | ✗ | return fd_table_.OpenFd(FdInfo(dup_fd)); | |
219 | } | ||
220 | |||
221 | ✗ | return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label))); | |
222 | } | ||
223 | |||
224 | 1 | int StreamingCacheManager::Close(int fd) { | |
225 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | FdInfo info; |
226 | { | ||
227 | 1 | MutexLockGuard lock_guard(lock_fd_table_); | |
228 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | info = fd_table_.GetHandle(fd); |
229 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
|
1 | if (!info.IsValid()) |
230 | ✗ | return -EBADF; | |
231 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | fd_table_.CloseFd(fd); |
232 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | } |
233 | |||
234 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (info.fd_in_cache_mgr >= 0) |
235 | ✗ | return cache_mgr_->Close(info.fd_in_cache_mgr); | |
236 | |||
237 | 1 | return 0; | |
238 | 1 | } | |
239 | |||
240 | 1 | int64_t StreamingCacheManager::Pread( | |
241 | int fd, void *buf, uint64_t size, uint64_t offset) | ||
242 | { | ||
243 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | FdInfo info; |
244 | { | ||
245 | 1 | MutexLockGuard lock_guard(lock_fd_table_); | |
246 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | info = fd_table_.GetHandle(fd); |
247 | 1 | } | |
248 | |||
249 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
|
1 | if (!info.IsValid()) |
250 | ✗ | return -EBADF; | |
251 | |||
252 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (info.fd_in_cache_mgr >= 0) |
253 | ✗ | return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset); | |
254 | |||
255 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | uint64_t nbytes_streamed = Stream(info, buf, size, offset); |
256 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (nbytes_streamed < offset) |
257 | ✗ | return 0; | |
258 |
1/2✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
|
1 | if (nbytes_streamed > (offset + size)) |
259 | 1 | return static_cast<int64_t>(size); | |
260 | ✗ | return static_cast<int64_t>(nbytes_streamed - offset); | |
261 | 1 | } | |
262 | |||
263 | ✗ | int StreamingCacheManager::Readahead(int fd) { | |
264 | ✗ | FdInfo info; | |
265 | { | ||
266 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
267 | ✗ | info = fd_table_.GetHandle(fd); | |
268 | } | ||
269 | |||
270 | ✗ | if (!info.IsValid()) | |
271 | ✗ | return -EBADF; | |
272 | |||
273 | ✗ | if (info.fd_in_cache_mgr >= 0) | |
274 | ✗ | return cache_mgr_->Readahead(info.fd_in_cache_mgr); | |
275 | |||
276 | ✗ | return 0; | |
277 | } | ||
278 | |||
279 | ✗ | int StreamingCacheManager::OpenFromTxn(void *txn) { | |
280 | ✗ | int fd = cache_mgr_->OpenFromTxn(txn); | |
281 | ✗ | if (fd < 0) | |
282 | ✗ | return fd; | |
283 | |||
284 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
285 | ✗ | return fd_table_.OpenFd(FdInfo(fd)); | |
286 | } | ||
287 | |||
288 | ✗ | void *StreamingCacheManager::DoSaveState() { | |
289 | ✗ | SavedState *state = new SavedState(); | |
290 | ✗ | state->fd_table = fd_table_.Clone(); | |
291 | ✗ | state->state_backing_cachemgr = cache_mgr_->SaveState(-1); | |
292 | ✗ | return state; | |
293 | } | ||
294 | |||
295 | ✗ | int StreamingCacheManager::DoRestoreState(void *data) { | |
296 | // When DoRestoreState is called, we have fd 0 assigned to the root file | ||
297 | // catalog | ||
298 | ✗ | FdInfo handle_root = fd_table_.GetHandle(0); | |
299 | |||
300 | ✗ | SavedState *state = reinterpret_cast<SavedState *>(data); | |
301 | |||
302 | int new_backing_root_fd = | ||
303 | ✗ | cache_mgr_->RestoreState(-1, state->state_backing_cachemgr); | |
304 | ✗ | fd_table_.AssignFrom(*state->fd_table); | |
305 | |||
306 | ✗ | int new_root_fd = -1; | |
307 | ✗ | if (handle_root.IsValid()) { | |
308 | ✗ | if (new_backing_root_fd >= 0) | |
309 | ✗ | handle_root.fd_in_cache_mgr = new_backing_root_fd; | |
310 | ✗ | new_root_fd = fd_table_.OpenFd(handle_root); | |
311 | // There must be a free file descriptor because the root file catalog gets | ||
312 | // closed before a reload | ||
313 | ✗ | assert(new_root_fd >= 0); | |
314 | } | ||
315 | ✗ | return new_root_fd; | |
316 | } | ||
317 | |||
318 | ✗ | bool StreamingCacheManager::DoFreeState(void *data) { | |
319 | ✗ | SavedState *state = reinterpret_cast<SavedState *>(data); | |
320 | ✗ | cache_mgr_->FreeState(-1, state->state_backing_cachemgr); | |
321 | ✗ | delete state->fd_table; | |
322 | ✗ | delete state; | |
323 | ✗ | return true; | |
324 | } | ||
325 | |||
326 | ✗ | CacheManager *StreamingCacheManager::MoveOutBackingCacheMgr(int *root_fd) { | |
327 | ✗ | *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr; | |
328 | ✗ | return cache_mgr_.Release(); | |
329 | } | ||
330 |