| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/cache_stream.h |
| Date: | 2025-11-02 02:35:35 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 9 | 31 | 29.0% |
| Branches: | 5 | 8 | 62.5% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_CACHE_STREAM_H_ | ||
| 6 | #define CVMFS_CACHE_STREAM_H_ | ||
| 7 | |||
| 8 | #include <pthread.h> | ||
| 9 | |||
| 10 | #include <string> | ||
| 11 | |||
| 12 | #include "cache.h" | ||
| 13 | #include "crypto/hash.h" | ||
| 14 | #include "fd_table.h" | ||
| 15 | #include "ring_buffer.h" | ||
| 16 | #include "smallhash.h" | ||
| 17 | #include "util/pointer.h" | ||
| 18 | |||
| 19 | namespace download { | ||
| 20 | class DownloadManager; | ||
| 21 | } | ||
| 22 | namespace perf { | ||
| 23 | class Counter; | ||
| 24 | class Statistics; | ||
| 25 | } // namespace perf | ||
| 26 | |||
| 27 | /** | ||
| 28 | * Cache manager that streams regular files using a download manager and stores | ||
| 29 | * file catalogs in an underlying cache manager. | ||
| 30 | */ | ||
| 31 | class StreamingCacheManager : public CacheManager { | ||
| 32 | public: | ||
| 33 | static const size_t kDefaultBufferSize; | ||
| 34 | |||
| 35 | struct Counters { | ||
| 36 | perf::Counter *sz_transferred_bytes; | ||
| 37 | perf::Counter *sz_transfer_ms; | ||
| 38 | perf::Counter *n_downloads; | ||
| 39 | perf::Counter *n_buffer_hits; | ||
| 40 | perf::Counter *n_buffer_evicts; | ||
| 41 | perf::Counter *n_buffer_objects; | ||
| 42 | perf::Counter *n_buffer_obstacles; | ||
| 43 | |||
| 44 | explicit Counters(perf::Statistics *statistics); | ||
| 45 | }; | ||
| 46 | |||
| 47 | StreamingCacheManager(unsigned max_open_fds, | ||
| 48 | CacheManager *cache_mgr, | ||
| 49 | download::DownloadManager *regular_download_mgr, | ||
| 50 | download::DownloadManager *external_download_mgr, | ||
| 51 | size_t buffer_size, | ||
| 52 | perf::Statistics *statistics); | ||
| 53 | virtual ~StreamingCacheManager(); | ||
| 54 | |||
| 55 | // During file system / mountpoint initialization, we create the cache | ||
| 56 | // manager before we know about the download manager. Hence we allow | ||
| 57 | // patching in the download manager at a later point. | ||
| 58 | ✗ | void SetRegularDownloadManager(download::DownloadManager *download_mgr) { | |
| 59 | ✗ | regular_download_mgr_ = download_mgr; | |
| 60 | } | ||
| 61 | ✗ | void SetExternalDownloadManager(download::DownloadManager *download_mgr) { | |
| 62 | ✗ | external_download_mgr_ = download_mgr; | |
| 63 | } | ||
| 64 | |||
| 65 | ✗ | virtual CacheManagerIds id() { return kStreamingCacheManager; } | |
| 66 | virtual std::string Describe(); | ||
| 67 | |||
| 68 | virtual bool AcquireQuotaManager(QuotaManager *quota_mgr); | ||
| 69 | |||
| 70 | virtual int Open(const LabeledObject &object); | ||
| 71 | virtual int64_t GetSize(int fd); | ||
| 72 | virtual int Close(int fd); | ||
| 73 | virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset); | ||
| 74 | virtual int Dup(int fd); | ||
| 75 | virtual int Readahead(int fd); | ||
| 76 | |||
| 77 | // Only pinned objects and catalogs are written to the cache. Transactions | ||
| 78 | // are passed through to the backing cache manager. | ||
| 79 | ✗ | virtual uint32_t SizeOfTxn() { return cache_mgr_->SizeOfTxn(); } | |
| 80 | ✗ | virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn) { | |
| 81 | ✗ | return cache_mgr_->StartTxn(id, size, txn); | |
| 82 | } | ||
| 83 | ✗ | virtual void CtrlTxn(const Label &label, const int flags, void *txn) { | |
| 84 | ✗ | cache_mgr_->CtrlTxn(label, flags, txn); | |
| 85 | } | ||
| 86 | ✗ | virtual int64_t Write(const void *buf, uint64_t size, void *txn) { | |
| 87 | ✗ | return cache_mgr_->Write(buf, size, txn); | |
| 88 | } | ||
| 89 | ✗ | virtual int Reset(void *txn) { return cache_mgr_->Reset(txn); } | |
| 90 | virtual int OpenFromTxn(void *txn); | ||
| 91 | ✗ | virtual int AbortTxn(void *txn) { return cache_mgr_->AbortTxn(txn); } | |
| 92 | ✗ | virtual int CommitTxn(void *txn) { return cache_mgr_->CommitTxn(txn); } | |
| 93 | |||
| 94 | ✗ | virtual void Spawn() { cache_mgr_->Spawn(); } | |
| 95 | |||
| 96 | ✗ | virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn) { | |
| 97 | ✗ | return cache_mgr_->LoadBreadcrumb(fqrn); | |
| 98 | } | ||
| 99 | ✗ | virtual bool StoreBreadcrumb(const manifest::Manifest &manifest) { | |
| 100 | ✗ | return cache_mgr_->StoreBreadcrumb(manifest); | |
| 101 | } | ||
| 102 | |||
| 103 | // Used in cvmfs' RestoreState to switch back from the streaming to the | ||
| 104 | // regular cache manager. At this point, the streaming cache manager has | ||
| 105 | // opened the root file catalog. We need to return the file descriptor in | ||
| 106 | // the wrapped cache manager, too. | ||
| 107 | CacheManager *MoveOutBackingCacheMgr(int *root_fd); | ||
| 108 | // Used in cvmfs' RestoreState to create a virtual file descriptor for the | ||
| 109 | // root catalog fd that has already been opened in the backing cache manager. | ||
| 110 | int PlantFd(int fd_in_cache_mgr); | ||
| 111 | |||
| 112 | 320 | const Counters &counters() const { return *counters_; } | |
| 113 | |||
| 114 | protected: | ||
| 115 | virtual void *DoSaveState(); | ||
| 116 | virtual int DoRestoreState(void *data); | ||
| 117 | virtual bool DoFreeState(void *data); | ||
| 118 | |||
| 119 | private: | ||
| 120 | struct FdInfo { | ||
| 121 | int fd_in_cache_mgr; | ||
| 122 | shash::Any object_id; | ||
| 123 | CacheManager::Label label; | ||
| 124 | |||
| 125 | 320 | FdInfo() : fd_in_cache_mgr(-1) { } | |
| 126 | ✗ | explicit FdInfo(int fd) : fd_in_cache_mgr(fd) { } | |
| 127 | 80 | explicit FdInfo(const CacheManager::LabeledObject &object) | |
| 128 | 80 | : fd_in_cache_mgr(-1), object_id(object.id), label(object.label) { } | |
| 129 | |||
| 130 | 480 | bool operator==(const FdInfo &other) const { | |
| 131 | 480 | return this->fd_in_cache_mgr == other.fd_in_cache_mgr | |
| 132 | 3/4: ✓ Branch 0 taken 480 times. ✗ Branch 1 not taken. ✓ Branch 3 taken 80 times. ✓ Branch 4 taken 400 times. | 480 | && this->object_id == other.object_id; |
| 133 | } | ||
| 134 | 320 | bool operator!=(const FdInfo &other) const { return !(*this == other); } | |
| 135 | |||
| 136 | 2/4: ✓ Branch 0 taken 240 times. ✗ Branch 1 not taken. ✓ Branch 3 taken 240 times. ✗ Branch 4 not taken. | 240 | bool IsValid() const { return fd_in_cache_mgr >= 0 || !object_id.IsNull(); } |
| 137 | }; | ||
| 138 | |||
| 139 | struct SavedState { | ||
| 140 | ✗ | SavedState() : version(0), fd_table(NULL), state_backing_cachemgr(NULL) { } | |
| 141 | unsigned int version; | ||
| 142 | FdTable<FdInfo> *fd_table; | ||
| 143 | void *state_backing_cachemgr; | ||
| 144 | }; | ||
| 145 | |||
| 146 | /// Depending on the flags in info.label, selects either the regular or the | ||
| 147 | /// external download manager. | ||
| 148 | download::DownloadManager *SelectDownloadManager(const FdInfo &info); | ||
| 149 | |||
| 150 | /// Streams an object using the download manager. The complete object is read | ||
| 151 | /// and its size is returned (-errno on error). | ||
| 152 | /// The given section of the object is copied into the provided buffer, | ||
| 153 | /// which may be NULL if only the size of the object is relevant. | ||
| 154 | int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset); | ||
| 155 | |||
| 156 | UniquePtr<CacheManager> cache_mgr_; | ||
| 157 | download::DownloadManager *regular_download_mgr_; | ||
| 158 | download::DownloadManager *external_download_mgr_; | ||
| 159 | |||
| 160 | pthread_mutex_t *lock_fd_table_; | ||
| 161 | FdTable<FdInfo> fd_table_; | ||
| 162 | |||
| 163 | /// A small in-memory cache to avoid frequent re-downloads if multiple blocks | ||
| 164 | /// from the same chunk are read | ||
| 165 | UniquePtr<RingBuffer> buffer_; | ||
| 166 | SmallHashDynamic<shash::Any, RingBuffer::ObjectHandle_t> buffered_objects_; | ||
| 167 | pthread_mutex_t *lock_buffer_; | ||
| 168 | |||
| 169 | UniquePtr<Counters> counters_; | ||
| 170 | }; // class StreamingCacheManager | ||
| 171 | |||
| 172 | #endif // CVMFS_CACHE_STREAM_H_ | ||
| 173 |