| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/cache_stream.cc |
| Date: | 2025-11-02 02:35:35 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 157 | 255 | 61.6% |
| Branches: | 106 | 296 | 35.8% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | |||
| 6 | #include "cache_stream.h" | ||
| 7 | |||
| 8 | #include <algorithm> | ||
| 9 | #include <cstdlib> | ||
| 10 | #include <cstring> | ||
| 11 | #include <string> | ||
| 12 | |||
| 13 | #include "clientctx.h" | ||
| 14 | #include "network/download.h" | ||
| 15 | #include "network/sink.h" | ||
| 16 | #include "quota.h" | ||
| 17 | #include "statistics.h" | ||
| 18 | #include "util/mutex.h" | ||
| 19 | #include "util/platform.h" | ||
| 20 | #include "util/smalloc.h" | ||
| 21 | |||
| 22 | |||
| 23 | namespace { | ||
| 24 | |||
| 25 | class StreamingSink : public cvmfs::Sink { | ||
| 26 | public: | ||
| 27 | 120 | StreamingSink(void *buf, uint64_t size, uint64_t offset, | |
| 28 | unsigned char *object) | ||
| 29 | 120 | : Sink(false /* is_owner */) | |
| 30 | 120 | , pos_(0) | |
| 31 | 120 | , window_buf_(buf) | |
| 32 | 120 | , window_size_(size) | |
| 33 | 120 | , window_offset_(offset) | |
| 34 | 120 | , object_(object) { } | |
| 35 | |||
| 36 | 240 | virtual ~StreamingSink() { } | |
| 37 | |||
| 38 | 120 | virtual int64_t Write(const void *buf, uint64_t sz) { | |
| 39 |
2/2✓ Branch 0 taken 40 times.
✓ Branch 1 taken 80 times.
|
120 | if (object_) { |
| 40 | 40 | memcpy(object_ + pos_, buf, sz); | |
| 41 | } | ||
| 42 | |||
| 43 | 120 | const uint64_t old_pos = pos_; | |
| 44 | 120 | pos_ += sz; | |
| 45 | |||
| 46 |
2/2✓ Branch 0 taken 80 times.
✓ Branch 1 taken 40 times.
|
120 | if (!window_buf_) |
| 47 | 80 | return static_cast<int64_t>(sz); | |
| 48 | |||
| 49 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
|
40 | if (pos_ < window_offset_) |
| 50 | ✗ | return static_cast<int64_t>(sz); | |
| 51 | |||
| 52 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
|
40 | if (old_pos >= (window_offset_ + window_size_)) |
| 53 | ✗ | return static_cast<int64_t>(sz); | |
| 54 | |||
| 55 | 40 | const uint64_t copy_offset = std::max(old_pos, window_offset_); | |
| 56 | 40 | const uint64_t inbuf_offset = copy_offset - old_pos; | |
| 57 | 40 | const uint64_t outbuf_offset = copy_offset - window_offset_; | |
| 58 | 80 | const uint64_t copy_size = std::min(sz - inbuf_offset, | |
| 59 | 40 | window_size_ - outbuf_offset); | |
| 60 | |||
| 61 | 40 | memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset, | |
| 62 | reinterpret_cast<const unsigned char *>(buf) + inbuf_offset, | ||
| 63 | copy_size); | ||
| 64 | |||
| 65 | 40 | return static_cast<int64_t>(sz); | |
| 66 | } | ||
| 67 | |||
| 68 | ✗ | virtual int Reset() { | |
| 69 | ✗ | pos_ = 0; | |
| 70 | ✗ | return 0; | |
| 71 | } | ||
| 72 | |||
| 73 | ✗ | virtual int Purge() { return Reset(); } | |
| 74 | 120 | virtual bool IsValid() { | |
| 75 |
3/4✓ Branch 0 taken 80 times.
✓ Branch 1 taken 40 times.
✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
|
120 | return (window_buf_ != NULL) || (window_size_ == 0); |
| 76 | } | ||
| 77 | 120 | virtual int Flush() { return 0; } | |
| 78 | ✗ | virtual bool Reserve(size_t /* size */) { return true; } | |
| 79 | 480 | virtual bool RequiresReserve() { return false; } | |
| 80 | ✗ | virtual std::string Describe() { | |
| 81 | ✗ | std::string result = "Streaming sink that is "; | |
| 82 | ✗ | result += IsValid() ? "valid" : "invalid"; | |
| 83 | ✗ | return result; | |
| 84 | } | ||
| 85 | |||
| 86 | 240 | int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); } | |
| 87 | |||
| 88 | private: | ||
| 89 | uint64_t pos_; | ||
| 90 | void *window_buf_; | ||
| 91 | uint64_t window_size_; | ||
| 92 | uint64_t window_offset_; | ||
| 93 | unsigned char *object_; | ||
| 94 | }; // class StreamingSink | ||
| 95 | |||
| 96 | 200 | static inline uint32_t hasher_any(const shash::Any &key) { | |
| 97 | 200 | return *const_cast<uint32_t *>(reinterpret_cast<const uint32_t *>(key.digest) | |
| 98 | 200 | + 1); | |
| 99 | } | ||
| 100 | |||
| 101 | } // anonymous namespace | ||
| 102 | |||
| 103 | |||
| 104 | const size_t StreamingCacheManager::kDefaultBufferSize = 64 * 1024 * 1024; | ||
| 105 | |||
| 106 | |||
| 107 | 80 | StreamingCacheManager::Counters::Counters(perf::Statistics *statistics) { | |
| 108 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 80 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 80 times.
✗ Branch 10 not taken.
|
80 | sz_transferred_bytes = statistics->Register( |
| 109 | "streaming_cache_mgr.sz_transferred_bytes", | ||
| 110 | "Number of bytes downloaded by the streaming cache manager"); | ||
| 111 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 80 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 80 times.
✗ Branch 10 not taken.
|
80 | sz_transfer_ms = statistics->Register( |
| 112 | "streaming_cache_mgr.sz_transfer_ms", | ||
| 113 | "Time spent downloading data by the streaming cache manager"); | ||
| 114 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 80 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 80 times.
✗ Branch 10 not taken.
|
80 | n_downloads = statistics->Register("streaming_cache_mgr.n_downloads", |
| 115 | "Number of objects requested remotely"); | ||
| 116 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 80 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 80 times.
✗ Branch 10 not taken.
|
80 | n_buffer_hits = statistics->Register( |
| 117 | "streaming_cache_mgr.n_buffer_hits", | ||
| 118 | "Number of requests served from the buffer"); | ||
| 119 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 80 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 80 times.
✗ Branch 10 not taken.
|
80 | n_buffer_evicts = statistics->Register( |
| 120 | "streaming_cache_mgr.n_buffer_evicts", | ||
| 121 | "Number of objects evicted from the buffer"); | ||
| 122 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 80 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 80 times.
✗ Branch 10 not taken.
|
80 | n_buffer_objects = statistics->Register( |
| 123 | "streaming_cache_mgr.n_buffer_objects", | ||
| 124 | "Number of objects in the buffer"); | ||
| 125 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 80 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 80 times.
✗ Branch 10 not taken.
|
80 | n_buffer_obstacles = statistics->Register( |
| 126 | "streaming_cache_mgr.n_buffer_obstacles", | ||
| 127 | "Number of objects that could not be stored in the buffer " | ||
| 128 | "(e.g., too large)"); | ||
| 129 | 80 | } | |
| 130 | |||
| 131 | |||
| 132 | 120 | download::DownloadManager *StreamingCacheManager::SelectDownloadManager( | |
| 133 | const FdInfo &info) { | ||
| 134 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 120 times.
|
120 | if (info.label.IsExternal()) |
| 135 | ✗ | return external_download_mgr_; | |
| 136 | 120 | return regular_download_mgr_; | |
| 137 | } | ||
| 138 | |||
| 139 | |||
| 140 | 160 | int64_t StreamingCacheManager::Stream(const FdInfo &info, | |
| 141 | void *buf, | ||
| 142 | uint64_t size, | ||
| 143 | uint64_t offset) { | ||
| 144 | // Note: objects stored in the ring buffer are prepended by their hash | ||
| 145 | |||
| 146 | { | ||
| 147 | 160 | const MutexLockGuard _(lock_buffer_); | |
| 148 | RingBuffer::ObjectHandle_t handle; | ||
| 149 |
3/4✓ Branch 1 taken 160 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 40 times.
✓ Branch 4 taken 120 times.
|
160 | if (buffered_objects_.Lookup(info.object_id, &handle)) { |
| 150 | 40 | perf::Inc(counters_->n_buffer_hits); | |
| 151 |
1/2✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
|
40 | buffer_->CopySlice(handle, size, offset + sizeof(shash::Any), buf); |
| 152 |
1/2✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
|
40 | return buffer_->GetObjectSize(handle) - sizeof(shash::Any); |
| 153 | } | ||
| 154 |
2/2✓ Branch 1 taken 120 times.
✓ Branch 2 taken 40 times.
|
160 | } |
| 155 | |||
| 156 | 120 | unsigned char *object = NULL; | |
| 157 | 120 | size_t nbytes_in_buffer = 0; | |
| 158 | 240 | if ((info.label.size != CacheManager::kSizeUnknown) | |
| 159 |
5/6✓ Branch 0 taken 40 times.
✓ Branch 1 taken 80 times.
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 40 times.
✓ Branch 5 taken 80 times.
|
160 | && (info.label.size + sizeof(shash::Any) |
| 160 | 40 | <= buffer_->GetMaxObjectSize())) { | |
| 161 | 40 | nbytes_in_buffer = sizeof(shash::Any) + info.label.size; | |
| 162 | 40 | object = reinterpret_cast<unsigned char *>(smalloc(nbytes_in_buffer)); | |
| 163 | } else { | ||
| 164 | 80 | perf::Inc(counters_->n_buffer_obstacles); | |
| 165 | } | ||
| 166 | |||
| 167 | StreamingSink sink(buf, size, offset, | ||
| 168 |
2/2✓ Branch 0 taken 40 times.
✓ Branch 1 taken 80 times.
|
120 | object ? (object + sizeof(shash::Any)) : NULL); |
| 169 | 120 | std::string url; | |
| 170 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 120 times.
|
120 | if (info.label.IsExternal()) { |
| 171 | ✗ | url = info.label.path; | |
| 172 | } else { | ||
| 173 |
2/4✓ Branch 1 taken 120 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 120 times.
✗ Branch 5 not taken.
|
120 | url = "/data/" + info.object_id.MakePath(); |
| 174 | } | ||
| 175 | 120 | const bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault; | |
| 176 | |||
| 177 | download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */, | ||
| 178 |
1/2✓ Branch 1 taken 120 times.
✗ Branch 2 not taken.
|
120 | &info.object_id, &sink); |
| 179 | 120 | download_job.SetExtraInfo(&info.label.path); | |
| 180 | 120 | download_job.SetRangeOffset(info.label.range_offset); | |
| 181 | 120 | download_job.SetRangeSize(static_cast<int64_t>(info.label.size)); | |
| 182 |
1/2✓ Branch 1 taken 120 times.
✗ Branch 2 not taken.
|
120 | ClientCtx *ctx = ClientCtx::GetInstance(); |
| 183 |
2/4✓ Branch 1 taken 120 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 120 times.
|
120 | if (ctx->IsSet()) { |
| 184 | ✗ | ctx->Get(download_job.GetUidPtr(), | |
| 185 | download_job.GetGidPtr(), | ||
| 186 | download_job.GetPidPtr(), | ||
| 187 | download_job.GetInterruptCuePtr()); | ||
| 188 | } | ||
| 189 | |||
| 190 | { | ||
| 191 | 120 | const uint64_t timestamp = platform_monotonic_time_ns(); | |
| 192 |
1/2✓ Branch 2 taken 120 times.
✗ Branch 3 not taken.
|
120 | SelectDownloadManager(info)->Fetch(&download_job); |
| 193 | 120 | perf::Xadd(counters_->sz_transfer_ms, | |
| 194 | 120 | (platform_monotonic_time_ns() - timestamp) / (1000 * 1000)); | |
| 195 | } | ||
| 196 | |||
| 197 | 120 | perf::Inc(counters_->n_downloads); | |
| 198 | 120 | perf::Xadd(counters_->sz_transferred_bytes, sink.GetNBytesStreamed()); | |
| 199 | |||
| 200 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 120 times.
|
120 | if (download_job.error_code() != download::kFailOk) { |
| 201 | ✗ | free(object); | |
| 202 | ✗ | return -EIO; | |
| 203 | } | ||
| 204 | |||
| 205 |
2/2✓ Branch 0 taken 40 times.
✓ Branch 1 taken 80 times.
|
120 | if (object) { |
| 206 | 40 | memcpy(object, &info.object_id, sizeof(shash::Any)); | |
| 207 | 40 | const MutexLockGuard _(lock_buffer_); | |
| 208 |
1/2✗ Branch 2 not taken.
✓ Branch 3 taken 40 times.
|
40 | while (!buffer_->HasSpaceFor(nbytes_in_buffer)) { |
| 209 | ✗ | const RingBuffer::ObjectHandle_t deleted_handle = buffer_->RemoveBack(); | |
| 210 | // As long as we don't add any new objects, the deleted_handle can still | ||
| 211 | // be accessed | ||
| 212 | ✗ | shash::Any deleted_hash; | |
| 213 | ✗ | buffer_->CopySlice(deleted_handle, sizeof(shash::Any), 0, &deleted_hash); | |
| 214 | ✗ | buffered_objects_.Erase(deleted_hash); | |
| 215 | ✗ | perf::Inc(counters_->n_buffer_evicts); | |
| 216 | ✗ | perf::Dec(counters_->n_buffer_objects); | |
| 217 | } | ||
| 218 |
1/2✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
|
40 | const RingBuffer::ObjectHandle_t handle = buffer_->PushFront( |
| 219 | 40 | object, nbytes_in_buffer); | |
| 220 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | buffered_objects_.Insert(info.object_id, handle); |
| 221 | 40 | perf::Inc(counters_->n_buffer_objects); | |
| 222 | 40 | } | |
| 223 | 120 | free(object); | |
| 224 | |||
| 225 | 120 | return sink.GetNBytesStreamed(); | |
| 226 | 120 | } | |
| 227 | |||
| 228 | |||
| 229 | 80 | StreamingCacheManager::StreamingCacheManager( | |
| 230 | unsigned max_open_fds, | ||
| 231 | CacheManager *cache_mgr, | ||
| 232 | download::DownloadManager *regular_download_mgr, | ||
| 233 | download::DownloadManager *external_download_mgr, | ||
| 234 | size_t buffer_size, | ||
| 235 | 80 | perf::Statistics *statistics) | |
| 236 | 80 | : cache_mgr_(cache_mgr) | |
| 237 | 80 | , regular_download_mgr_(regular_download_mgr) | |
| 238 | 80 | , external_download_mgr_(external_download_mgr) | |
| 239 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
|
80 | , fd_table_(max_open_fds, FdInfo()) |
| 240 |
6/12✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 80 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 80 times.
✗ Branch 9 not taken.
✓ Branch 11 taken 80 times.
✗ Branch 12 not taken.
✓ Branch 14 taken 80 times.
✗ Branch 15 not taken.
✓ Branch 17 taken 80 times.
✗ Branch 18 not taken.
|
160 | , counters_(new Counters(statistics)) { |
| 241 | 80 | lock_fd_table_ = reinterpret_cast<pthread_mutex_t *>( | |
| 242 | 80 | smalloc(sizeof(pthread_mutex_t))); | |
| 243 | 80 | int retval = pthread_mutex_init(lock_fd_table_, NULL); | |
| 244 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | assert(retval == 0); |
| 245 | |||
| 246 |
1/2✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
|
80 | delete quota_mgr_; |
| 247 | 80 | quota_mgr_ = cache_mgr_->quota_mgr(); | |
| 248 | |||
| 249 |
3/6✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 80 times.
✗ Branch 8 not taken.
|
80 | buffer_ = new RingBuffer(buffer_size); |
| 250 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
|
80 | buffered_objects_.Init(16, shash::Any(), hasher_any); |
| 251 | 80 | lock_buffer_ = reinterpret_cast<pthread_mutex_t *>( | |
| 252 | 80 | smalloc(sizeof(pthread_mutex_t))); | |
| 253 | 80 | retval = pthread_mutex_init(lock_buffer_, NULL); | |
| 254 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | assert(retval == 0); |
| 255 | 80 | } | |
| 256 | |||
| 257 | 320 | StreamingCacheManager::~StreamingCacheManager() { | |
| 258 | 160 | pthread_mutex_destroy(lock_buffer_); | |
| 259 | 160 | free(lock_buffer_); | |
| 260 | 160 | pthread_mutex_destroy(lock_fd_table_); | |
| 261 | 160 | free(lock_fd_table_); | |
| 262 | 160 | quota_mgr_ = NULL; // gets deleted by cache_mgr_ | |
| 263 | 320 | } | |
| 264 | |||
| 265 | ✗ | std::string StreamingCacheManager::Describe() { | |
| 266 | ✗ | return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe(); | |
| 267 | } | ||
| 268 | |||
| 269 | ✗ | bool StreamingCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) { | |
| 270 | ✗ | const bool result = cache_mgr_->AcquireQuotaManager(quota_mgr); | |
| 271 | ✗ | if (result) | |
| 272 | ✗ | quota_mgr_ = cache_mgr_->quota_mgr(); | |
| 273 | ✗ | return result; | |
| 274 | } | ||
| 275 | |||
| 276 | 80 | int StreamingCacheManager::Open(const LabeledObject &object) { | |
| 277 |
1/2✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
|
80 | const int fd_in_cache_mgr = cache_mgr_->Open(object); |
| 278 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | if (fd_in_cache_mgr >= 0) { |
| 279 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 280 | ✗ | return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr)); | |
| 281 | } | ||
| 282 | |||
| 283 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | if (fd_in_cache_mgr != -ENOENT) |
| 284 | ✗ | return fd_in_cache_mgr; | |
| 285 | |||
| 286 |
1/2✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
|
160 | if (object.label.IsCatalog() || object.label.IsPinned() |
| 287 |
3/6✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 80 times.
|
160 | || object.label.IsCertificate()) { |
| 288 | ✗ | return -ENOENT; | |
| 289 | } | ||
| 290 | |||
| 291 | 80 | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 292 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
|
80 | return fd_table_.OpenFd(FdInfo(object)); |
| 293 | 80 | } | |
| 294 | |||
| 295 | ✗ | int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) { | |
| 296 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 297 | ✗ | return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr)); | |
| 298 | } | ||
| 299 | |||
| 300 | 80 | int64_t StreamingCacheManager::GetSize(int fd) { | |
| 301 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | FdInfo info; |
| 302 | { | ||
| 303 | 80 | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 304 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | info = fd_table_.GetHandle(fd); |
| 305 | 80 | } | |
| 306 | |||
| 307 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 80 times.
|
80 | if (!info.IsValid()) |
| 308 | ✗ | return -EBADF; | |
| 309 | |||
| 310 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | if (info.fd_in_cache_mgr >= 0) |
| 311 | ✗ | return cache_mgr_->GetSize(info.fd_in_cache_mgr); | |
| 312 | |||
| 313 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | return Stream(info, NULL, 0, 0); |
| 314 | 80 | } | |
| 315 | |||
| 316 | ✗ | int StreamingCacheManager::Dup(int fd) { | |
| 317 | ✗ | FdInfo info; | |
| 318 | |||
| 319 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 320 | ✗ | info = fd_table_.GetHandle(fd); | |
| 321 | |||
| 322 | ✗ | if (!info.IsValid()) | |
| 323 | ✗ | return -EBADF; | |
| 324 | |||
| 325 | ✗ | if (info.fd_in_cache_mgr >= 0) { | |
| 326 | ✗ | const int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr); | |
| 327 | ✗ | if (dup_fd < 0) | |
| 328 | ✗ | return dup_fd; | |
| 329 | ✗ | return fd_table_.OpenFd(FdInfo(dup_fd)); | |
| 330 | } | ||
| 331 | |||
| 332 | ✗ | return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label))); | |
| 333 | } | ||
| 334 | |||
| 335 | 80 | int StreamingCacheManager::Close(int fd) { | |
| 336 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | FdInfo info; |
| 337 | { | ||
| 338 | 80 | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 339 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | info = fd_table_.GetHandle(fd); |
| 340 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 80 times.
|
80 | if (!info.IsValid()) |
| 341 | ✗ | return -EBADF; | |
| 342 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | fd_table_.CloseFd(fd); |
| 343 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | } |
| 344 | |||
| 345 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | if (info.fd_in_cache_mgr >= 0) |
| 346 | ✗ | return cache_mgr_->Close(info.fd_in_cache_mgr); | |
| 347 | |||
| 348 | 80 | return 0; | |
| 349 | 80 | } | |
| 350 | |||
| 351 | 80 | int64_t StreamingCacheManager::Pread(int fd, void *buf, uint64_t size, | |
| 352 | uint64_t offset) { | ||
| 353 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | FdInfo info; |
| 354 | { | ||
| 355 | 80 | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 356 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | info = fd_table_.GetHandle(fd); |
| 357 | 80 | } | |
| 358 | |||
| 359 |
2/4✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 80 times.
|
80 | if (!info.IsValid()) |
| 360 | ✗ | return -EBADF; | |
| 361 | |||
| 362 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | if (info.fd_in_cache_mgr >= 0) |
| 363 | ✗ | return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset); | |
| 364 | |||
| 365 |
1/2✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
|
80 | const int64_t nbytes_streamed = Stream(info, buf, size, offset); |
| 366 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | if (nbytes_streamed < 0) |
| 367 | ✗ | return nbytes_streamed; | |
| 368 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 80 times.
|
80 | if (static_cast<uint64_t>(nbytes_streamed) < offset) |
| 369 | ✗ | return 0; | |
| 370 |
1/2✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
|
80 | if (static_cast<uint64_t>(nbytes_streamed) > (offset + size)) |
| 371 | 80 | return size; | |
| 372 | ✗ | return nbytes_streamed - offset; | |
| 373 | 80 | } | |
| 374 | |||
| 375 | ✗ | int StreamingCacheManager::Readahead(int fd) { | |
| 376 | ✗ | FdInfo info; | |
| 377 | { | ||
| 378 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 379 | ✗ | info = fd_table_.GetHandle(fd); | |
| 380 | } | ||
| 381 | |||
| 382 | ✗ | if (!info.IsValid()) | |
| 383 | ✗ | return -EBADF; | |
| 384 | |||
| 385 | ✗ | if (info.fd_in_cache_mgr >= 0) | |
| 386 | ✗ | return cache_mgr_->Readahead(info.fd_in_cache_mgr); | |
| 387 | |||
| 388 | ✗ | return 0; | |
| 389 | } | ||
| 390 | |||
| 391 | ✗ | int StreamingCacheManager::OpenFromTxn(void *txn) { | |
| 392 | ✗ | const int fd = cache_mgr_->OpenFromTxn(txn); | |
| 393 | ✗ | if (fd < 0) | |
| 394 | ✗ | return fd; | |
| 395 | |||
| 396 | ✗ | const MutexLockGuard lock_guard(lock_fd_table_); | |
| 397 | ✗ | return fd_table_.OpenFd(FdInfo(fd)); | |
| 398 | } | ||
| 399 | |||
| 400 | ✗ | void *StreamingCacheManager::DoSaveState() { | |
| 401 | ✗ | SavedState *state = new SavedState(); | |
| 402 | ✗ | state->fd_table = fd_table_.Clone(); | |
| 403 | ✗ | state->state_backing_cachemgr = cache_mgr_->SaveState(-1); | |
| 404 | ✗ | return state; | |
| 405 | } | ||
| 406 | |||
| 407 | ✗ | int StreamingCacheManager::DoRestoreState(void *data) { | |
| 408 | // When DoRestoreState is called, we have fd 0 assigned to the root file | ||
| 409 | // catalog | ||
| 410 | ✗ | FdInfo handle_root = fd_table_.GetHandle(0); | |
| 411 | |||
| 412 | ✗ | SavedState *state = reinterpret_cast<SavedState *>(data); | |
| 413 | |||
| 414 | ✗ | const int new_backing_root_fd = cache_mgr_->RestoreState( | |
| 415 | -1, state->state_backing_cachemgr); | ||
| 416 | ✗ | fd_table_.AssignFrom(*state->fd_table); | |
| 417 | |||
| 418 | ✗ | int new_root_fd = -1; | |
| 419 | ✗ | if (handle_root.IsValid()) { | |
| 420 | ✗ | if (new_backing_root_fd >= 0) | |
| 421 | ✗ | handle_root.fd_in_cache_mgr = new_backing_root_fd; | |
| 422 | ✗ | new_root_fd = fd_table_.OpenFd(handle_root); | |
| 423 | // There must be a free file descriptor because the root file catalog gets | ||
| 424 | // closed before a reload | ||
| 425 | ✗ | assert(new_root_fd >= 0); | |
| 426 | } | ||
| 427 | ✗ | return new_root_fd; | |
| 428 | } | ||
| 429 | |||
| 430 | ✗ | bool StreamingCacheManager::DoFreeState(void *data) { | |
| 431 | ✗ | SavedState *state = reinterpret_cast<SavedState *>(data); | |
| 432 | ✗ | cache_mgr_->FreeState(-1, state->state_backing_cachemgr); | |
| 433 | ✗ | delete state->fd_table; | |
| 434 | ✗ | delete state; | |
| 435 | ✗ | return true; | |
| 436 | } | ||
| 437 | |||
| 438 | ✗ | CacheManager *StreamingCacheManager::MoveOutBackingCacheMgr(int *root_fd) { | |
| 439 | ✗ | *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr; | |
| 440 | ✗ | return cache_mgr_.Release(); | |
| 441 | } | ||
| 442 |