Directory: | cvmfs/ |
---|---|
File: | cvmfs/cache_stream.h |
Date: | 2025-03-09 02:34:28 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 10 | 32 | 31.2% |
Branches: | 5 | 8 | 62.5% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_CACHE_STREAM_H_ | ||
6 | #define CVMFS_CACHE_STREAM_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | |||
10 | #include <string> | ||
11 | |||
12 | #include "cache.h" | ||
13 | #include "crypto/hash.h" | ||
14 | #include "fd_table.h" | ||
15 | #include "ring_buffer.h" | ||
16 | #include "smallhash.h" | ||
17 | #include "util/pointer.h" | ||
18 | |||
19 | namespace download { | ||
20 | class DownloadManager; | ||
21 | } | ||
22 | namespace perf { | ||
23 | class Counter; | ||
24 | class Statistics; | ||
25 | } | ||
26 | |||
27 | /** | ||
28 | * Cache manager that streams regular files using a download manager and stores | ||
29 | * file catalogs in an underlying cache manager. | ||
30 | */ | ||
31 | class StreamingCacheManager : public CacheManager { | ||
32 | public: | ||
33 | static const size_t kDefaultBufferSize; | ||
34 | |||
35 | struct Counters { | ||
36 | perf::Counter *sz_transferred_bytes; | ||
37 | perf::Counter *sz_transfer_ms; | ||
38 | perf::Counter *n_downloads; | ||
39 | perf::Counter *n_buffer_hits; | ||
40 | perf::Counter *n_buffer_evicts; | ||
41 | perf::Counter *n_buffer_objects; | ||
42 | perf::Counter *n_buffer_obstacles; | ||
43 | |||
44 | explicit Counters(perf::Statistics *statistics); | ||
45 | }; | ||
46 | |||
47 | StreamingCacheManager(unsigned max_open_fds, | ||
48 | CacheManager *cache_mgr, | ||
49 | download::DownloadManager *regular_download_mgr, | ||
50 | download::DownloadManager *external_download_mgr, | ||
51 | size_t buffer_size, | ||
52 | perf::Statistics *statistics); | ||
53 | virtual ~StreamingCacheManager(); | ||
54 | |||
55 | // In the files system / mountpoint initialization, we create the cache | ||
56 | // manager before we know about the download manager. Hence we allow to | ||
57 | // patch in the download manager at a later point. | ||
58 | ✗ | void SetRegularDownloadManager(download::DownloadManager *download_mgr) { | |
59 | ✗ | regular_download_mgr_ = download_mgr; | |
60 | } | ||
61 | ✗ | void SetExternalDownloadManager(download::DownloadManager *download_mgr) { | |
62 | ✗ | external_download_mgr_ = download_mgr; | |
63 | } | ||
64 | |||
65 | ✗ | virtual CacheManagerIds id() { return kStreamingCacheManager; } | |
66 | virtual std::string Describe(); | ||
67 | |||
68 | virtual bool AcquireQuotaManager(QuotaManager *quota_mgr); | ||
69 | |||
70 | virtual int Open(const LabeledObject &object); | ||
71 | virtual int64_t GetSize(int fd); | ||
72 | virtual int Close(int fd); | ||
73 | virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset); | ||
74 | virtual int Dup(int fd); | ||
75 | virtual int Readahead(int fd); | ||
76 | |||
77 | // Only pinned objects and catalogs are written to the cache. Transactions | ||
78 | // are passed through to the backing cache manager. | ||
79 | ✗ | virtual uint32_t SizeOfTxn() { return cache_mgr_->SizeOfTxn(); } | |
80 | ✗ | virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn) { | |
81 | ✗ | return cache_mgr_->StartTxn(id, size, txn); | |
82 | } | ||
83 | ✗ | virtual void CtrlTxn(const Label &label, const int flags, void *txn) { | |
84 | ✗ | cache_mgr_->CtrlTxn(label, flags, txn); | |
85 | } | ||
86 | ✗ | virtual int64_t Write(const void *buf, uint64_t size, void *txn) | |
87 | { | ||
88 | ✗ | return cache_mgr_->Write(buf, size, txn); | |
89 | } | ||
90 | ✗ | virtual int Reset(void *txn) { return cache_mgr_->Reset(txn); } | |
91 | virtual int OpenFromTxn(void *txn); | ||
92 | ✗ | virtual int AbortTxn(void *txn) { return cache_mgr_->AbortTxn(txn); } | |
93 | ✗ | virtual int CommitTxn(void *txn) { return cache_mgr_->CommitTxn(txn); } | |
94 | |||
95 | ✗ | virtual void Spawn() { cache_mgr_->Spawn(); } | |
96 | |||
97 | ✗ | virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn) { | |
98 | ✗ | return cache_mgr_->LoadBreadcrumb(fqrn); | |
99 | } | ||
100 | ✗ | virtual bool StoreBreadcrumb(const manifest::Manifest &manifest) { | |
101 | ✗ | return cache_mgr_->StoreBreadcrumb(manifest); | |
102 | } | ||
103 | |||
104 | // Used in cvmfs' RestoreState to switch back from the streaming to the | ||
105 | // regular cache manager. At this point, the streaming cache manager has | ||
106 | // opened the root file catalog. We need to return the file descriptor in | ||
107 | // the wrapped cache manager, too. | ||
108 | CacheManager *MoveOutBackingCacheMgr(int *root_fd); | ||
109 | // Used in cvmfs' RestoreState to create a virtual file descriptor for the | ||
110 | // root catalog fd, that has been already opened in the backing cache manager | ||
111 | int PlantFd(int fd_in_cache_mgr); | ||
112 | |||
113 | 8 | const Counters &counters() const { return *counters_; } | |
114 | |||
115 | protected: | ||
116 | virtual void *DoSaveState(); | ||
117 | virtual int DoRestoreState(void *data); | ||
118 | virtual bool DoFreeState(void *data); | ||
119 | |||
120 | private: | ||
121 | struct FdInfo { | ||
122 | int fd_in_cache_mgr; | ||
123 | shash::Any object_id; | ||
124 | CacheManager::Label label; | ||
125 | |||
126 | 8 | FdInfo() : fd_in_cache_mgr(-1) {} | |
127 | ✗ | explicit FdInfo(int fd) : fd_in_cache_mgr(fd) {} | |
128 | 2 | explicit FdInfo(const CacheManager::LabeledObject &object) | |
129 | 2 | : fd_in_cache_mgr(-1), object_id(object.id), label(object.label) {} | |
130 | |||
131 | 12 | bool operator ==(const FdInfo &other) const { | |
132 |
3/4✓ Branch 0 taken 12 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 10 times.
|
24 | return this->fd_in_cache_mgr == other.fd_in_cache_mgr && |
133 | 24 | this->object_id == other.object_id; | |
134 | } | ||
135 | 8 | bool operator !=(const FdInfo &other) const { | |
136 | 8 | return !(*this == other); | |
137 | } | ||
138 | |||
139 |
2/4✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 6 times.
✗ Branch 4 not taken.
|
6 | bool IsValid() const { return fd_in_cache_mgr >= 0 || !object_id.IsNull(); } |
140 | }; | ||
141 | |||
142 | struct SavedState { | ||
143 | ✗ | SavedState() : version(0), fd_table(NULL), state_backing_cachemgr(NULL) { } | |
144 | unsigned int version; | ||
145 | FdTable<FdInfo> *fd_table; | ||
146 | void *state_backing_cachemgr; | ||
147 | }; | ||
148 | |||
149 | /// Depending on info.flags, selects either the regular or the external | ||
150 | /// download manager | ||
151 | download::DownloadManager *SelectDownloadManager(const FdInfo &info); | ||
152 | |||
153 | /// Streams an object using the download manager. The complete object is read | ||
154 | /// and its size is returned (-errno on error). | ||
155 | /// The given section of the object is copied into the provided buffer, | ||
156 | /// which may be NULL if only the size of the object is relevant. | ||
157 | int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset); | ||
158 | |||
159 | UniquePtr<CacheManager> cache_mgr_; | ||
160 | download::DownloadManager *regular_download_mgr_; | ||
161 | download::DownloadManager *external_download_mgr_; | ||
162 | |||
163 | pthread_mutex_t *lock_fd_table_; | ||
164 | FdTable<FdInfo> fd_table_; | ||
165 | |||
166 | /// A small in-memory cache to avoid frequent re-downloads if multiple blocks | ||
167 | /// from the same chunk are read | ||
168 | UniquePtr<RingBuffer> buffer_; | ||
169 | SmallHashDynamic<shash::Any, RingBuffer::ObjectHandle_t> buffered_objects_; | ||
170 | pthread_mutex_t *lock_buffer_; | ||
171 | |||
172 | UniquePtr<Counters> counters_; | ||
173 | }; // class StreamingCacheManager | ||
174 | |||
175 | #endif // CVMFS_CACHE_STREAM_H_ | ||
176 |