Directory: | cvmfs/ |
---|---|
File: | cvmfs/cache_stream.cc |
Date: | 2025-02-09 02:34:19 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 155 | 253 | 61.3% |
Branches: | 106 | 296 | 35.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | |||
6 | #include "cache_stream.h" | ||
7 | |||
8 | #include <algorithm> | ||
9 | #include <cstdlib> | ||
10 | #include <cstring> | ||
11 | #include <string> | ||
12 | |||
13 | #include "clientctx.h" | ||
14 | #include "network/download.h" | ||
15 | #include "network/sink.h" | ||
16 | #include "quota.h" | ||
17 | #include "statistics.h" | ||
18 | #include "util/mutex.h" | ||
19 | #include "util/platform.h" | ||
20 | #include "util/smalloc.h" | ||
21 | |||
22 | |||
23 | namespace { | ||
24 | |||
25 | class StreamingSink : public cvmfs::Sink { | ||
26 | public: | ||
27 | 3 | StreamingSink(void *buf, uint64_t size, uint64_t offset, | |
28 | unsigned char *object) | ||
29 | 3 | : Sink(false /* is_owner */) | |
30 | 3 | , pos_(0) | |
31 | 3 | , window_buf_(buf) | |
32 | 3 | , window_size_(size) | |
33 | 3 | , window_offset_(offset) | |
34 | 3 | , object_(object) | |
35 | 3 | { } | |
36 | |||
37 | 6 | virtual ~StreamingSink() {} | |
38 | |||
39 | 3 | virtual int64_t Write(const void *buf, uint64_t sz) { | |
40 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
|
3 | if (object_) { |
41 | 1 | memcpy(object_ + pos_, buf, sz); | |
42 | } | ||
43 | |||
44 | 3 | uint64_t old_pos = pos_; | |
45 | 3 | pos_ += sz; | |
46 | |||
47 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
|
3 | if (!window_buf_) |
48 | 2 | return static_cast<int64_t>(sz); | |
49 | |||
50 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (pos_ < window_offset_) |
51 | ✗ | return static_cast<int64_t>(sz); | |
52 | |||
53 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (old_pos >= (window_offset_ + window_size_)) |
54 | ✗ | return static_cast<int64_t>(sz); | |
55 | |||
56 | 1 | uint64_t copy_offset = std::max(old_pos, window_offset_); | |
57 | 1 | uint64_t inbuf_offset = copy_offset - old_pos; | |
58 | 1 | uint64_t outbuf_offset = copy_offset - window_offset_; | |
59 | uint64_t copy_size = | ||
60 | 1 | std::min(sz - inbuf_offset, window_size_ - outbuf_offset); | |
61 | |||
62 | 1 | memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset, | |
63 | reinterpret_cast<const unsigned char *>(buf) + inbuf_offset, | ||
64 | copy_size); | ||
65 | |||
66 | 1 | return static_cast<int64_t>(sz); | |
67 | } | ||
68 | |||
69 | ✗ | virtual int Reset() { | |
70 | ✗ | pos_ = 0; | |
71 | ✗ | return 0; | |
72 | } | ||
73 | |||
74 | ✗ | virtual int Purge() { return Reset(); } | |
75 | 3 | virtual bool IsValid() { | |
76 |
3/4✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
3 | return (window_buf_ != NULL) || (window_size_ == 0); |
77 | } | ||
78 | 3 | virtual int Flush() { return 0; } | |
79 | ✗ | virtual bool Reserve(size_t /* size */) { return true; } | |
80 | 12 | virtual bool RequiresReserve() { return false; } | |
81 | ✗ | virtual std::string Describe() { | |
82 | ✗ | std::string result = "Streaming sink that is "; | |
83 | ✗ | result += IsValid() ? "valid" : "invalid"; | |
84 | ✗ | return result; | |
85 | } | ||
86 | |||
87 | 6 | int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); } | |
88 | |||
89 | private: | ||
90 | uint64_t pos_; | ||
91 | void *window_buf_; | ||
92 | uint64_t window_size_; | ||
93 | uint64_t window_offset_; | ||
94 | unsigned char *object_; | ||
95 | }; // class StreamingSink | ||
96 | |||
97 | 5 | static inline uint32_t hasher_any(const shash::Any &key) { | |
98 | 5 | return *const_cast<uint32_t *>( | |
99 | 5 | reinterpret_cast<const uint32_t *>(key.digest) + 1); | |
100 | } | ||
101 | |||
102 | } // anonymous namespace | ||
103 | |||
104 | |||
105 | const size_t StreamingCacheManager::kDefaultBufferSize = 64 * 1024 * 1024; | ||
106 | |||
107 | |||
108 | 2 | StreamingCacheManager::Counters::Counters(perf::Statistics *statistics) { | |
109 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | sz_transferred_bytes = statistics->Register( |
110 | "streaming_cache_mgr.sz_transferred_bytes", | ||
111 | "Number of bytes downloaded by the streaming cache manager"); | ||
112 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | sz_transfer_ms = statistics->Register( |
113 | "streaming_cache_mgr.sz_transfer_ms", | ||
114 | "Time spent downloading data by the streaming cache manager"); | ||
115 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | n_downloads = statistics->Register( |
116 | "streaming_cache_mgr.n_downloads", "Number of objects requested remotely"); | ||
117 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | n_buffer_hits = statistics->Register( |
118 | "streaming_cache_mgr.n_buffer_hits", | ||
119 | "Number of requests served from the buffer"); | ||
120 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | n_buffer_evicts = statistics->Register( |
121 | "streaming_cache_mgr.n_buffer_evicts", | ||
122 | "Number of objects evicted from the buffer"); | ||
123 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | n_buffer_objects = statistics->Register( |
124 | "streaming_cache_mgr.n_buffer_objects", "Number of objects in the buffer"); | ||
125 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | n_buffer_obstacles = statistics->Register( |
126 | "streaming_cache_mgr.n_buffer_obstacles", | ||
127 | "Number of objects that could not be stored in the buffer " | ||
128 | "(e.g., too large)"); | ||
129 | 2 | } | |
130 | |||
131 | |||
132 | 3 | download::DownloadManager *StreamingCacheManager::SelectDownloadManager( | |
133 | const FdInfo &info) | ||
134 | { | ||
135 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | if (info.label.IsExternal()) |
136 | ✗ | return external_download_mgr_; | |
137 | 3 | return regular_download_mgr_; | |
138 | } | ||
139 | |||
140 | |||
141 | 4 | int64_t StreamingCacheManager::Stream( | |
142 | const FdInfo &info, | ||
143 | void *buf, | ||
144 | uint64_t size, | ||
145 | uint64_t offset) | ||
146 | { | ||
147 | // Note: objects stored in the ring buffer are prepended by their hash | ||
148 | |||
149 | { | ||
150 | 4 | MutexLockGuard _(lock_buffer_); | |
151 | RingBuffer::ObjectHandle_t handle; | ||
152 |
3/4✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
✓ Branch 4 taken 3 times.
|
4 | if (buffered_objects_.Lookup(info.object_id, &handle)) { |
153 | 1 | perf::Inc(counters_->n_buffer_hits); | |
154 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | buffer_->CopySlice(handle, size, offset + sizeof(shash::Any), buf); |
155 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | return buffer_->GetObjectSize(handle) - sizeof(shash::Any); |
156 | } | ||
157 |
2/2✓ Branch 1 taken 3 times.
✓ Branch 2 taken 1 times.
|
4 | } |
158 | |||
159 | 3 | unsigned char *object = NULL; | |
160 | 3 | size_t nbytes_in_buffer = 0; | |
161 |
5/6✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 2 times.
|
4 | if ((info.label.size != CacheManager::kSizeUnknown) && |
162 | 1 | (info.label.size + sizeof(shash::Any) <= buffer_->GetMaxObjectSize())) | |
163 | { | ||
164 | 1 | nbytes_in_buffer = sizeof(shash::Any) + info.label.size; | |
165 | 1 | object = reinterpret_cast<unsigned char *>(smalloc(nbytes_in_buffer)); | |
166 | } else { | ||
167 | 2 | perf::Inc(counters_->n_buffer_obstacles); | |
168 | } | ||
169 | |||
170 | StreamingSink sink(buf, size, offset, | ||
171 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
|
3 | object ? (object + sizeof(shash::Any)) : NULL); |
172 | 3 | std::string url; | |
173 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | if (info.label.IsExternal()) { |
174 | ✗ | url = info.label.path; | |
175 | } else { | ||
176 |
2/4✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
|
3 | url = "/data/" + info.object_id.MakePath(); |
177 | } | ||
178 | 3 | bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault; | |
179 | |||
180 | download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */, | ||
181 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | &info.object_id, &sink); |
182 | 3 | download_job.SetExtraInfo(&info.label.path); | |
183 | 3 | download_job.SetRangeOffset(info.label.range_offset); | |
184 | 3 | download_job.SetRangeSize(static_cast<int64_t>(info.label.size)); | |
185 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | ClientCtx *ctx = ClientCtx::GetInstance(); |
186 |
2/4✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 3 times.
|
3 | if (ctx->IsSet()) { |
187 | ✗ | ctx->Get(download_job.GetUidPtr(), | |
188 | download_job.GetGidPtr(), | ||
189 | download_job.GetPidPtr(), | ||
190 | download_job.GetInterruptCuePtr()); | ||
191 | } | ||
192 | |||
193 | { | ||
194 | 3 | uint64_t timestamp = platform_monotonic_time_ns(); | |
195 |
1/2✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
|
3 | SelectDownloadManager(info)->Fetch(&download_job); |
196 | 3 | perf::Xadd(counters_->sz_transfer_ms, | |
197 | 3 | (platform_monotonic_time_ns() - timestamp) / (1000 * 1000)); | |
198 | } | ||
199 | |||
200 | 3 | perf::Inc(counters_->n_downloads); | |
201 | 3 | perf::Xadd(counters_->sz_transferred_bytes, sink.GetNBytesStreamed()); | |
202 | |||
203 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
|
3 | if (download_job.error_code() != download::kFailOk) { |
204 | ✗ | free(object); | |
205 | ✗ | return -EIO; | |
206 | } | ||
207 | |||
208 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 2 times.
|
3 | if (object) { |
209 | 1 | memcpy(object, &info.object_id, sizeof(shash::Any)); | |
210 | 1 | MutexLockGuard _(lock_buffer_); | |
211 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
|
1 | while (!buffer_->HasSpaceFor(nbytes_in_buffer)) { |
212 | ✗ | RingBuffer::ObjectHandle_t deleted_handle = buffer_->RemoveBack(); | |
213 | // As long as we don't add any new objects, the deleted_handle can still | ||
214 | // be accessed | ||
215 | ✗ | shash::Any deleted_hash; | |
216 | ✗ | buffer_->CopySlice(deleted_handle, sizeof(shash::Any), 0, &deleted_hash); | |
217 | ✗ | buffered_objects_.Erase(deleted_hash); | |
218 | ✗ | perf::Inc(counters_->n_buffer_evicts); | |
219 | ✗ | perf::Dec(counters_->n_buffer_objects); | |
220 | } | ||
221 | RingBuffer::ObjectHandle_t handle = | ||
222 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | buffer_->PushFront(object, nbytes_in_buffer); |
223 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | buffered_objects_.Insert(info.object_id, handle); |
224 | 1 | perf::Inc(counters_->n_buffer_objects); | |
225 | 1 | } | |
226 | 3 | free(object); | |
227 | |||
228 | 3 | return sink.GetNBytesStreamed(); | |
229 | 3 | } | |
230 | |||
231 | |||
232 | 2 | StreamingCacheManager::StreamingCacheManager( | |
233 | unsigned max_open_fds, | ||
234 | CacheManager *cache_mgr, | ||
235 | download::DownloadManager *regular_download_mgr, | ||
236 | download::DownloadManager *external_download_mgr, | ||
237 | size_t buffer_size, | ||
238 | 2 | perf::Statistics *statistics) | |
239 | 2 | : cache_mgr_(cache_mgr) | |
240 | 2 | , regular_download_mgr_(regular_download_mgr) | |
241 | 2 | , external_download_mgr_(external_download_mgr) | |
242 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | , fd_table_(max_open_fds, FdInfo()) |
243 |
6/12✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 2 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 2 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 2 times.
✗ Branch 18 not taken.
|
4 | , counters_(new Counters(statistics)) |
244 | { | ||
245 | 2 | lock_fd_table_ = | |
246 | 2 | reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t))); | |
247 | 2 | int retval = pthread_mutex_init(lock_fd_table_, NULL); | |
248 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | assert(retval == 0); |
249 | |||
250 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | delete quota_mgr_; |
251 | 2 | quota_mgr_ = cache_mgr_->quota_mgr(); | |
252 | |||
253 |
3/6✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | buffer_ = new RingBuffer(buffer_size); |
254 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | buffered_objects_.Init(16, shash::Any(), hasher_any); |
255 | 2 | lock_buffer_ = | |
256 | 2 | reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t))); | |
257 | 2 | retval = pthread_mutex_init(lock_buffer_, NULL); | |
258 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | assert(retval == 0); |
259 | 2 | } | |
260 | |||
261 | 8 | StreamingCacheManager::~StreamingCacheManager() { | |
262 | 4 | pthread_mutex_destroy(lock_buffer_); | |
263 | 4 | free(lock_buffer_); | |
264 | 4 | pthread_mutex_destroy(lock_fd_table_); | |
265 | 4 | free(lock_fd_table_); | |
266 | 4 | quota_mgr_ = NULL; // gets deleted by cache_mgr_ | |
267 | 8 | } | |
268 | |||
269 | ✗ | std::string StreamingCacheManager::Describe() { | |
270 | ✗ | return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe(); | |
271 | } | ||
272 | |||
273 | ✗ | bool StreamingCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) { | |
274 | ✗ | bool result = cache_mgr_->AcquireQuotaManager(quota_mgr); | |
275 | ✗ | if (result) | |
276 | ✗ | quota_mgr_ = cache_mgr_->quota_mgr(); | |
277 | ✗ | return result; | |
278 | } | ||
279 | |||
280 | 2 | int StreamingCacheManager::Open(const LabeledObject &object) { | |
281 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | int fd_in_cache_mgr = cache_mgr_->Open(object); |
282 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (fd_in_cache_mgr >= 0) { |
283 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
284 | ✗ | return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr)); | |
285 | } | ||
286 | |||
287 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (fd_in_cache_mgr != -ENOENT) |
288 | ✗ | return fd_in_cache_mgr; | |
289 | |||
290 |
4/8✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 2 times.
|
4 | if (object.label.IsCatalog() || object.label.IsPinned() || |
291 | 2 | object.label.IsCertificate()) | |
292 | { | ||
293 | ✗ | return -ENOENT; | |
294 | } | ||
295 | |||
296 | 2 | MutexLockGuard lock_guard(lock_fd_table_); | |
297 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | return fd_table_.OpenFd(FdInfo(object)); |
298 | 2 | } | |
299 | |||
300 | ✗ | int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) { | |
301 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
302 | ✗ | return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr)); | |
303 | } | ||
304 | |||
305 | 2 | int64_t StreamingCacheManager::GetSize(int fd) { | |
306 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | FdInfo info; |
307 | { | ||
308 | 2 | MutexLockGuard lock_guard(lock_fd_table_); | |
309 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | info = fd_table_.GetHandle(fd); |
310 | 2 | } | |
311 | |||
312 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
|
2 | if (!info.IsValid()) |
313 | ✗ | return -EBADF; | |
314 | |||
315 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (info.fd_in_cache_mgr >= 0) |
316 | ✗ | return cache_mgr_->GetSize(info.fd_in_cache_mgr); | |
317 | |||
318 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | return Stream(info, NULL, 0, 0); |
319 | 2 | } | |
320 | |||
321 | ✗ | int StreamingCacheManager::Dup(int fd) { | |
322 | ✗ | FdInfo info; | |
323 | |||
324 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
325 | ✗ | info = fd_table_.GetHandle(fd); | |
326 | |||
327 | ✗ | if (!info.IsValid()) | |
328 | ✗ | return -EBADF; | |
329 | |||
330 | ✗ | if (info.fd_in_cache_mgr >= 0) { | |
331 | ✗ | int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr); | |
332 | ✗ | if (dup_fd < 0) | |
333 | ✗ | return dup_fd; | |
334 | ✗ | return fd_table_.OpenFd(FdInfo(dup_fd)); | |
335 | } | ||
336 | |||
337 | ✗ | return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label))); | |
338 | } | ||
339 | |||
340 | 2 | int StreamingCacheManager::Close(int fd) { | |
341 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | FdInfo info; |
342 | { | ||
343 | 2 | MutexLockGuard lock_guard(lock_fd_table_); | |
344 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | info = fd_table_.GetHandle(fd); |
345 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
|
2 | if (!info.IsValid()) |
346 | ✗ | return -EBADF; | |
347 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | fd_table_.CloseFd(fd); |
348 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | } |
349 | |||
350 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (info.fd_in_cache_mgr >= 0) |
351 | ✗ | return cache_mgr_->Close(info.fd_in_cache_mgr); | |
352 | |||
353 | 2 | return 0; | |
354 | 2 | } | |
355 | |||
356 | 2 | int64_t StreamingCacheManager::Pread( | |
357 | int fd, void *buf, uint64_t size, uint64_t offset) | ||
358 | { | ||
359 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | FdInfo info; |
360 | { | ||
361 | 2 | MutexLockGuard lock_guard(lock_fd_table_); | |
362 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | info = fd_table_.GetHandle(fd); |
363 | 2 | } | |
364 | |||
365 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
|
2 | if (!info.IsValid()) |
366 | ✗ | return -EBADF; | |
367 | |||
368 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (info.fd_in_cache_mgr >= 0) |
369 | ✗ | return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset); | |
370 | |||
371 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | int64_t nbytes_streamed = Stream(info, buf, size, offset); |
372 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (nbytes_streamed < 0) |
373 | ✗ | return nbytes_streamed; | |
374 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (static_cast<uint64_t>(nbytes_streamed) < offset) |
375 | ✗ | return 0; | |
376 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | if (static_cast<uint64_t>(nbytes_streamed) > (offset + size)) |
377 | 2 | return size; | |
378 | ✗ | return nbytes_streamed - offset; | |
379 | 2 | } | |
380 | |||
381 | ✗ | int StreamingCacheManager::Readahead(int fd) { | |
382 | ✗ | FdInfo info; | |
383 | { | ||
384 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
385 | ✗ | info = fd_table_.GetHandle(fd); | |
386 | } | ||
387 | |||
388 | ✗ | if (!info.IsValid()) | |
389 | ✗ | return -EBADF; | |
390 | |||
391 | ✗ | if (info.fd_in_cache_mgr >= 0) | |
392 | ✗ | return cache_mgr_->Readahead(info.fd_in_cache_mgr); | |
393 | |||
394 | ✗ | return 0; | |
395 | } | ||
396 | |||
397 | ✗ | int StreamingCacheManager::OpenFromTxn(void *txn) { | |
398 | ✗ | int fd = cache_mgr_->OpenFromTxn(txn); | |
399 | ✗ | if (fd < 0) | |
400 | ✗ | return fd; | |
401 | |||
402 | ✗ | MutexLockGuard lock_guard(lock_fd_table_); | |
403 | ✗ | return fd_table_.OpenFd(FdInfo(fd)); | |
404 | } | ||
405 | |||
406 | ✗ | void *StreamingCacheManager::DoSaveState() { | |
407 | ✗ | SavedState *state = new SavedState(); | |
408 | ✗ | state->fd_table = fd_table_.Clone(); | |
409 | ✗ | state->state_backing_cachemgr = cache_mgr_->SaveState(-1); | |
410 | ✗ | return state; | |
411 | } | ||
412 | |||
413 | ✗ | int StreamingCacheManager::DoRestoreState(void *data) { | |
414 | // When DoRestoreState is called, we have fd 0 assigned to the root file | ||
415 | // catalog | ||
416 | ✗ | FdInfo handle_root = fd_table_.GetHandle(0); | |
417 | |||
418 | ✗ | SavedState *state = reinterpret_cast<SavedState *>(data); | |
419 | |||
420 | int new_backing_root_fd = | ||
421 | ✗ | cache_mgr_->RestoreState(-1, state->state_backing_cachemgr); | |
422 | ✗ | fd_table_.AssignFrom(*state->fd_table); | |
423 | |||
424 | ✗ | int new_root_fd = -1; | |
425 | ✗ | if (handle_root.IsValid()) { | |
426 | ✗ | if (new_backing_root_fd >= 0) | |
427 | ✗ | handle_root.fd_in_cache_mgr = new_backing_root_fd; | |
428 | ✗ | new_root_fd = fd_table_.OpenFd(handle_root); | |
429 | // There must be a free file descriptor because the root file catalog gets | ||
430 | // closed before a reload | ||
431 | ✗ | assert(new_root_fd >= 0); | |
432 | } | ||
433 | ✗ | return new_root_fd; | |
434 | } | ||
435 | |||
436 | ✗ | bool StreamingCacheManager::DoFreeState(void *data) { | |
437 | ✗ | SavedState *state = reinterpret_cast<SavedState *>(data); | |
438 | ✗ | cache_mgr_->FreeState(-1, state->state_backing_cachemgr); | |
439 | ✗ | delete state->fd_table; | |
440 | ✗ | delete state; | |
441 | ✗ | return true; | |
442 | } | ||
443 | |||
444 | ✗ | CacheManager *StreamingCacheManager::MoveOutBackingCacheMgr(int *root_fd) { | |
445 | ✗ | *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr; | |
446 | ✗ | return cache_mgr_.Release(); | |
447 | } | ||
448 |