Directory: | cvmfs/ |
---|---|
File: | cvmfs/cache_stream.cc |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 155 | 253 | 61.3% |
Branches: | 106 | 296 | 35.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | |||
6 | #include "cache_stream.h" | ||
7 | |||
8 | #include <algorithm> | ||
9 | #include <cstdlib> | ||
10 | #include <cstring> | ||
11 | #include <string> | ||
12 | |||
13 | #include "clientctx.h" | ||
14 | #include "network/download.h" | ||
15 | #include "network/sink.h" | ||
16 | #include "quota.h" | ||
17 | #include "statistics.h" | ||
18 | #include "util/mutex.h" | ||
19 | #include "util/platform.h" | ||
20 | #include "util/smalloc.h" | ||
21 | |||
22 | |||
23 | namespace { | ||
24 | |||
25 | class StreamingSink : public cvmfs::Sink { | ||
26 | public: | ||
27 | 147 | StreamingSink(void *buf, uint64_t size, uint64_t offset, | |
28 | unsigned char *object) | ||
29 | 147 | : Sink(false /* is_owner */) | |
30 | 147 | , pos_(0) | |
31 | 147 | , window_buf_(buf) | |
32 | 147 | , window_size_(size) | |
33 | 147 | , window_offset_(offset) | |
34 | 147 | , object_(object) { } | |
35 | |||
36 | 294 | virtual ~StreamingSink() { } | |
37 | |||
38 | 147 | virtual int64_t Write(const void *buf, uint64_t sz) { | |
39 |
2/2✓ Branch 0 taken 49 times.
✓ Branch 1 taken 98 times.
|
147 | if (object_) { |
40 | 49 | memcpy(object_ + pos_, buf, sz); | |
41 | } | ||
42 | |||
43 | 147 | const uint64_t old_pos = pos_; | |
44 | 147 | pos_ += sz; | |
45 | |||
46 |
2/2✓ Branch 0 taken 98 times.
✓ Branch 1 taken 49 times.
|
147 | if (!window_buf_) |
47 | 98 | return static_cast<int64_t>(sz); | |
48 | |||
49 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | if (pos_ < window_offset_) |
50 | ✗ | return static_cast<int64_t>(sz); | |
51 | |||
52 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
|
49 | if (old_pos >= (window_offset_ + window_size_)) |
53 | ✗ | return static_cast<int64_t>(sz); | |
54 | |||
55 | 49 | const uint64_t copy_offset = std::max(old_pos, window_offset_); | |
56 | 49 | const uint64_t inbuf_offset = copy_offset - old_pos; | |
57 | 49 | const uint64_t outbuf_offset = copy_offset - window_offset_; | |
58 | const uint64_t copy_size = | ||
59 | 49 | std::min(sz - inbuf_offset, window_size_ - outbuf_offset); | |
60 | |||
61 | 49 | memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset, | |
62 | reinterpret_cast<const unsigned char *>(buf) + inbuf_offset, | ||
63 | copy_size); | ||
64 | |||
65 | 49 | return static_cast<int64_t>(sz); | |
66 | } | ||
67 | |||
68 | ✗ | virtual int Reset() { | |
69 | ✗ | pos_ = 0; | |
70 | ✗ | return 0; | |
71 | } | ||
72 | |||
73 | ✗ | virtual int Purge() { return Reset(); } | |
74 | 147 | virtual bool IsValid() { | |
75 |
3/4✓ Branch 0 taken 98 times.
✓ Branch 1 taken 49 times.
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
|
147 | return (window_buf_ != NULL) || (window_size_ == 0); |
76 | } | ||
77 | 147 | virtual int Flush() { return 0; } | |
78 | ✗ | virtual bool Reserve(size_t /* size */) { return true; } | |
79 | 588 | virtual bool RequiresReserve() { return false; } | |
80 | ✗ | virtual std::string Describe() { | |
81 | ✗ | std::string result = "Streaming sink that is "; | |
82 | ✗ | result += IsValid() ? "valid" : "invalid"; | |
83 | ✗ | return result; | |
84 | } | ||
85 | |||
86 | 294 | int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); } | |
87 | |||
88 | private: | ||
89 | uint64_t pos_; | ||
90 | void *window_buf_; | ||
91 | uint64_t window_size_; | ||
92 | uint64_t window_offset_; | ||
93 | unsigned char *object_; | ||
94 | }; // class StreamingSink | ||
95 | |||
96 | 245 | static inline uint32_t hasher_any(const shash::Any &key) { | |
97 | 245 | return *const_cast<uint32_t *>(reinterpret_cast<const uint32_t *>(key.digest) | |
98 | 245 | + 1); | |
99 | } | ||
100 | |||
101 | } // anonymous namespace | ||
102 | |||
103 | |||
104 | const size_t StreamingCacheManager::kDefaultBufferSize = 64 * 1024 * 1024; | ||
105 | |||
106 | |||
107 | 98 | StreamingCacheManager::Counters::Counters(perf::Statistics *statistics) { | |
108 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | sz_transferred_bytes = statistics->Register( |
109 | "streaming_cache_mgr.sz_transferred_bytes", | ||
110 | "Number of bytes downloaded by the streaming cache manager"); | ||
111 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | sz_transfer_ms = statistics->Register( |
112 | "streaming_cache_mgr.sz_transfer_ms", | ||
113 | "Time spent downloading data by the streaming cache manager"); | ||
114 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | n_downloads = statistics->Register("streaming_cache_mgr.n_downloads", |
115 | "Number of objects requested remotely"); | ||
116 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | n_buffer_hits = statistics->Register( |
117 | "streaming_cache_mgr.n_buffer_hits", | ||
118 | "Number of requests served from the buffer"); | ||
119 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | n_buffer_evicts = statistics->Register( |
120 | "streaming_cache_mgr.n_buffer_evicts", | ||
121 | "Number of objects evicted from the buffer"); | ||
122 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | n_buffer_objects = statistics->Register( |
123 | "streaming_cache_mgr.n_buffer_objects", | ||
124 | "Number of objects in the buffer"); | ||
125 |
3/6✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 98 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 98 times.
✗ Branch 10 not taken.
|
98 | n_buffer_obstacles = statistics->Register( |
126 | "streaming_cache_mgr.n_buffer_obstacles", | ||
127 | "Number of objects that could not be stored in the buffer " | ||
128 | "(e.g., too large)"); | ||
129 | 98 | } | |
130 | |||
131 | |||
132 | 147 | download::DownloadManager *StreamingCacheManager::SelectDownloadManager( | |
133 | const FdInfo &info) { | ||
134 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 147 times.
|
147 | if (info.label.IsExternal()) |
135 | ✗ | return external_download_mgr_; | |
136 | 147 | return regular_download_mgr_; | |
137 | } | ||
138 | |||
139 | |||
140 | 196 | int64_t StreamingCacheManager::Stream(const FdInfo &info, | |
141 | void *buf, | ||
142 | uint64_t size, | ||
143 | uint64_t offset) { | ||
144 | // Note: objects stored in the ring buffer are prepended by their hash | ||
145 | |||
146 | { | ||
147 | 196 | const MutexLockGuard _(lock_buffer_); | |
148 | RingBuffer::ObjectHandle_t handle; | ||
149 |
3/4✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✓ Branch 4 taken 147 times.
|
196 | if (buffered_objects_.Lookup(info.object_id, &handle)) { |
150 | 49 | perf::Inc(counters_->n_buffer_hits); | |
151 |
1/2✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
|
49 | buffer_->CopySlice(handle, size, offset + sizeof(shash::Any), buf); |
152 |
1/2✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
|
49 | return buffer_->GetObjectSize(handle) - sizeof(shash::Any); |
153 | } | ||
154 |
2/2✓ Branch 1 taken 147 times.
✓ Branch 2 taken 49 times.
|
196 | } |
155 | |||
156 | 147 | unsigned char *object = NULL; | |
157 | 147 | size_t nbytes_in_buffer = 0; | |
158 | 294 | if ((info.label.size != CacheManager::kSizeUnknown) | |
159 |
5/6✓ Branch 0 taken 49 times.
✓ Branch 1 taken 98 times.
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 49 times.
✓ Branch 5 taken 98 times.
|
196 | && (info.label.size + sizeof(shash::Any) |
160 | 49 | <= buffer_->GetMaxObjectSize())) { | |
161 | 49 | nbytes_in_buffer = sizeof(shash::Any) + info.label.size; | |
162 | 49 | object = reinterpret_cast<unsigned char *>(smalloc(nbytes_in_buffer)); | |
163 | } else { | ||
164 | 98 | perf::Inc(counters_->n_buffer_obstacles); | |
165 | } | ||
166 | |||
167 | StreamingSink sink(buf, size, offset, | ||
168 |
2/2✓ Branch 0 taken 49 times.
✓ Branch 1 taken 98 times.
|
147 | object ? (object + sizeof(shash::Any)) : NULL); |
169 | 147 | std::string url; | |
170 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 147 times.
|
147 | if (info.label.IsExternal()) { |
171 | ✗ | url = info.label.path; | |
172 | } else { | ||
173 |
2/4✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 147 times.
✗ Branch 5 not taken.
|
147 | url = "/data/" + info.object_id.MakePath(); |
174 | } | ||
175 | 147 | const bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault; | |
176 | |||
177 | download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */, | ||
178 |
1/2✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
|
147 | &info.object_id, &sink); |
179 | 147 | download_job.SetExtraInfo(&info.label.path); | |
180 | 147 | download_job.SetRangeOffset(info.label.range_offset); | |
181 | 147 | download_job.SetRangeSize(static_cast<int64_t>(info.label.size)); | |
182 |
1/2✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
|
147 | ClientCtx *ctx = ClientCtx::GetInstance(); |
183 |
2/4✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 147 times.
|
147 | if (ctx->IsSet()) { |
184 | ✗ | ctx->Get(download_job.GetUidPtr(), | |
185 | download_job.GetGidPtr(), | ||
186 | download_job.GetPidPtr(), | ||
187 | download_job.GetInterruptCuePtr()); | ||
188 | } | ||
189 | |||
190 | { | ||
191 | 147 | const uint64_t timestamp = platform_monotonic_time_ns(); | |
192 |
1/2✓ Branch 2 taken 147 times.
✗ Branch 3 not taken.
|
147 | SelectDownloadManager(info)->Fetch(&download_job); |
193 | 147 | perf::Xadd(counters_->sz_transfer_ms, | |
194 | 147 | (platform_monotonic_time_ns() - timestamp) / (1000 * 1000)); | |
195 | } | ||
196 | |||
197 | 147 | perf::Inc(counters_->n_downloads); | |
198 | 147 | perf::Xadd(counters_->sz_transferred_bytes, sink.GetNBytesStreamed()); | |
199 | |||
200 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 147 times.
|
147 | if (download_job.error_code() != download::kFailOk) { |
201 | ✗ | free(object); | |
202 | ✗ | return -EIO; | |
203 | } | ||
204 | |||
205 |
2/2✓ Branch 0 taken 49 times.
✓ Branch 1 taken 98 times.
|
147 | if (object) { |
206 | 49 | memcpy(object, &info.object_id, sizeof(shash::Any)); | |
207 | 49 | const MutexLockGuard _(lock_buffer_); | |
208 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
|
49 | while (!buffer_->HasSpaceFor(nbytes_in_buffer)) { |
209 | ✗ | const RingBuffer::ObjectHandle_t deleted_handle = buffer_->RemoveBack(); | |
210 | // As long as we don't add any new objects, the deleted_handle can still | ||
211 | // be accessed | ||
212 | ✗ | shash::Any deleted_hash; | |
213 | ✗ | buffer_->CopySlice(deleted_handle, sizeof(shash::Any), 0, &deleted_hash); | |
214 | ✗ | buffered_objects_.Erase(deleted_hash); | |
215 | ✗ | perf::Inc(counters_->n_buffer_evicts); | |
216 | ✗ | perf::Dec(counters_->n_buffer_objects); | |
217 | } | ||
218 | const RingBuffer::ObjectHandle_t handle = | ||
219 |
1/2✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
|
49 | buffer_->PushFront(object, nbytes_in_buffer); |
220 |
1/2✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
|
49 | buffered_objects_.Insert(info.object_id, handle); |
221 | 49 | perf::Inc(counters_->n_buffer_objects); | |
222 | 49 | } | |
223 | 147 | free(object); | |
224 | |||
225 | 147 | return sink.GetNBytesStreamed(); | |
226 | 147 | } | |
227 | |||
228 | |||
229 | 98 | StreamingCacheManager::StreamingCacheManager( | |
230 | unsigned max_open_fds, | ||
231 | CacheManager *cache_mgr, | ||
232 | download::DownloadManager *regular_download_mgr, | ||
233 | download::DownloadManager *external_download_mgr, | ||
234 | size_t buffer_size, | ||
235 | 98 | perf::Statistics *statistics) | |
236 | 98 | : cache_mgr_(cache_mgr) | |
237 | 98 | , regular_download_mgr_(regular_download_mgr) | |
238 | 98 | , external_download_mgr_(external_download_mgr) | |
239 |
2/4✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 98 times.
✗ Branch 5 not taken.
|
98 | , fd_table_(max_open_fds, FdInfo()) |
240 |
6/12✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 98 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 98 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 98 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 98 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 98 times.
✗ Branch 18 not taken.
|
196 | , counters_(new Counters(statistics)) { |
241 | 98 | lock_fd_table_ = reinterpret_cast<pthread_mutex_t *>( | |
242 | 98 | smalloc(sizeof(pthread_mutex_t))); | |
243 | 98 | int retval = pthread_mutex_init(lock_fd_table_, NULL); | |
244 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | assert(retval == 0); |
245 | |||
246 |
1/2✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
|
98 | delete quota_mgr_; |
247 | 98 | quota_mgr_ = cache_mgr_->quota_mgr(); | |
248 | |||
249 |
3/6✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 98 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 98 times.
✗ Branch 8 not taken.
|
98 | buffer_ = new RingBuffer(buffer_size); |
250 |
2/4✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 98 times.
✗ Branch 5 not taken.
|
98 | buffered_objects_.Init(16, shash::Any(), hasher_any); |
251 | 98 | lock_buffer_ = reinterpret_cast<pthread_mutex_t *>( | |
252 | 98 | smalloc(sizeof(pthread_mutex_t))); | |
253 | 98 | retval = pthread_mutex_init(lock_buffer_, NULL); | |
254 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | assert(retval == 0); |
255 | 98 | } | |
256 | |||
257 | 392 | StreamingCacheManager::~StreamingCacheManager() { | |
258 | 196 | pthread_mutex_destroy(lock_buffer_); | |
259 | 196 | free(lock_buffer_); | |
260 | 196 | pthread_mutex_destroy(lock_fd_table_); | |
261 | 196 | free(lock_fd_table_); | |
262 | 196 | quota_mgr_ = NULL; // gets deleted by cache_mgr_ | |
263 | 392 | } | |
264 | |||
265 | ✗ | std::string StreamingCacheManager::Describe() { | |
266 | ✗ | return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe(); | |
267 | } | ||
268 | |||
269 | ✗ | bool StreamingCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) { | |
270 | ✗ | const bool result = cache_mgr_->AcquireQuotaManager(quota_mgr); | |
271 | ✗ | if (result) | |
272 | ✗ | quota_mgr_ = cache_mgr_->quota_mgr(); | |
273 | ✗ | return result; | |
274 | } | ||
275 | |||
276 | 98 | int StreamingCacheManager::Open(const LabeledObject &object) { | |
277 |
1/2✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
|
98 | const int fd_in_cache_mgr = cache_mgr_->Open(object); |
278 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | if (fd_in_cache_mgr >= 0) { |
279 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
280 | ✗ | return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr)); | |
281 | } | ||
282 | |||
283 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | if (fd_in_cache_mgr != -ENOENT) |
284 | ✗ | return fd_in_cache_mgr; | |
285 | |||
286 |
1/2✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
|
196 | if (object.label.IsCatalog() || object.label.IsPinned() |
287 |
3/6✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 98 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 98 times.
|
196 | || object.label.IsCertificate()) { |
288 | ✗ | return -ENOENT; | |
289 | } | ||
290 | |||
291 | 98 | const MutexLockGuard lock_guard(lock_fd_table_); | |
292 |
2/4✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 98 times.
✗ Branch 5 not taken.
|
98 | return fd_table_.OpenFd(FdInfo(object)); |
293 | 98 | } | |
294 | |||
295 | ✗ | int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) { | |
296 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
297 | ✗ | return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr)); | |
298 | } | ||
299 | |||
300 | 98 | int64_t StreamingCacheManager::GetSize(int fd) { | |
301 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | FdInfo info; |
302 | { | ||
303 | 98 | const MutexLockGuard lock_guard(lock_fd_table_); | |
304 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | info = fd_table_.GetHandle(fd); |
305 | 98 | } | |
306 | |||
307 |
2/4✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 98 times.
|
98 | if (!info.IsValid()) |
308 | ✗ | return -EBADF; | |
309 | |||
310 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | if (info.fd_in_cache_mgr >= 0) |
311 | ✗ | return cache_mgr_->GetSize(info.fd_in_cache_mgr); | |
312 | |||
313 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | return Stream(info, NULL, 0, 0); |
314 | 98 | } | |
315 | |||
316 | ✗ | int StreamingCacheManager::Dup(int fd) { | |
317 | ✗ | FdInfo info; | |
318 | |||
319 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
320 | ✗ | info = fd_table_.GetHandle(fd); | |
321 | |||
322 | ✗ | if (!info.IsValid()) | |
323 | ✗ | return -EBADF; | |
324 | |||
325 | ✗ | if (info.fd_in_cache_mgr >= 0) { | |
326 | ✗ | const int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr); | |
327 | ✗ | if (dup_fd < 0) | |
328 | ✗ | return dup_fd; | |
329 | ✗ | return fd_table_.OpenFd(FdInfo(dup_fd)); | |
330 | } | ||
331 | |||
332 | ✗ | return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label))); | |
333 | } | ||
334 | |||
335 | 98 | int StreamingCacheManager::Close(int fd) { | |
336 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | FdInfo info; |
337 | { | ||
338 | 98 | const MutexLockGuard lock_guard(lock_fd_table_); | |
339 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | info = fd_table_.GetHandle(fd); |
340 |
2/4✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 98 times.
|
98 | if (!info.IsValid()) |
341 | ✗ | return -EBADF; | |
342 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | fd_table_.CloseFd(fd); |
343 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | } |
344 | |||
345 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | if (info.fd_in_cache_mgr >= 0) |
346 | ✗ | return cache_mgr_->Close(info.fd_in_cache_mgr); | |
347 | |||
348 | 98 | return 0; | |
349 | 98 | } | |
350 | |||
351 | 98 | int64_t StreamingCacheManager::Pread(int fd, void *buf, uint64_t size, | |
352 | uint64_t offset) { | ||
353 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | FdInfo info; |
354 | { | ||
355 | 98 | const MutexLockGuard lock_guard(lock_fd_table_); | |
356 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | info = fd_table_.GetHandle(fd); |
357 | 98 | } | |
358 | |||
359 |
2/4✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 98 times.
|
98 | if (!info.IsValid()) |
360 | ✗ | return -EBADF; | |
361 | |||
362 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | if (info.fd_in_cache_mgr >= 0) |
363 | ✗ | return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset); | |
364 | |||
365 |
1/2✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
|
98 | const int64_t nbytes_streamed = Stream(info, buf, size, offset); |
366 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | if (nbytes_streamed < 0) |
367 | ✗ | return nbytes_streamed; | |
368 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
|
98 | if (static_cast<uint64_t>(nbytes_streamed) < offset) |
369 | ✗ | return 0; | |
370 |
1/2✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
|
98 | if (static_cast<uint64_t>(nbytes_streamed) > (offset + size)) |
371 | 98 | return size; | |
372 | ✗ | return nbytes_streamed - offset; | |
373 | 98 | } | |
374 | |||
375 | ✗ | int StreamingCacheManager::Readahead(int fd) { | |
376 | ✗ | FdInfo info; | |
377 | { | ||
378 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
379 | ✗ | info = fd_table_.GetHandle(fd); | |
380 | } | ||
381 | |||
382 | ✗ | if (!info.IsValid()) | |
383 | ✗ | return -EBADF; | |
384 | |||
385 | ✗ | if (info.fd_in_cache_mgr >= 0) | |
386 | ✗ | return cache_mgr_->Readahead(info.fd_in_cache_mgr); | |
387 | |||
388 | ✗ | return 0; | |
389 | } | ||
390 | |||
391 | ✗ | int StreamingCacheManager::OpenFromTxn(void *txn) { | |
392 | ✗ | const int fd = cache_mgr_->OpenFromTxn(txn); | |
393 | ✗ | if (fd < 0) | |
394 | ✗ | return fd; | |
395 | |||
396 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
397 | ✗ | return fd_table_.OpenFd(FdInfo(fd)); | |
398 | } | ||
399 | |||
400 | ✗ | void *StreamingCacheManager::DoSaveState() { | |
401 | ✗ | SavedState *state = new SavedState(); | |
402 | ✗ | state->fd_table = fd_table_.Clone(); | |
403 | ✗ | state->state_backing_cachemgr = cache_mgr_->SaveState(-1); | |
404 | ✗ | return state; | |
405 | } | ||
406 | |||
407 | ✗ | int StreamingCacheManager::DoRestoreState(void *data) { | |
408 | // When DoRestoreState is called, we have fd 0 assigned to the root file | ||
409 | // catalog | ||
410 | ✗ | FdInfo handle_root = fd_table_.GetHandle(0); | |
411 | |||
412 | ✗ | SavedState *state = reinterpret_cast<SavedState *>(data); | |
413 | |||
414 | const int new_backing_root_fd = | ||
415 | ✗ | cache_mgr_->RestoreState(-1, state->state_backing_cachemgr); | |
416 | ✗ | fd_table_.AssignFrom(*state->fd_table); | |
417 | |||
418 | ✗ | int new_root_fd = -1; | |
419 | ✗ | if (handle_root.IsValid()) { | |
420 | ✗ | if (new_backing_root_fd >= 0) | |
421 | ✗ | handle_root.fd_in_cache_mgr = new_backing_root_fd; | |
422 | ✗ | new_root_fd = fd_table_.OpenFd(handle_root); | |
423 | // There must be a free file descriptor because the root file catalog gets | ||
424 | // closed before a reload | ||
425 | ✗ | assert(new_root_fd >= 0); | |
426 | } | ||
427 | ✗ | return new_root_fd; | |
428 | } | ||
429 | |||
430 | ✗ | bool StreamingCacheManager::DoFreeState(void *data) { | |
431 | ✗ | SavedState *state = reinterpret_cast<SavedState *>(data); | |
432 | ✗ | cache_mgr_->FreeState(-1, state->state_backing_cachemgr); | |
433 | ✗ | delete state->fd_table; | |
434 | ✗ | delete state; | |
435 | ✗ | return true; | |
436 | } | ||
437 | |||
438 | ✗ | CacheManager *StreamingCacheManager::MoveOutBackingCacheMgr(int *root_fd) { | |
439 | ✗ | *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr; | |
440 | ✗ | return cache_mgr_.Release(); | |
441 | } | ||
442 |