Directory: | cvmfs/ |
---|---|
File: | cvmfs/cache_extern.h |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 125 | 142 | 88.0% |
Branches: | 19 | 38 | 50.0% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | #ifndef CVMFS_CACHE_EXTERN_H_ | ||
5 | #define CVMFS_CACHE_EXTERN_H_ | ||
6 | |||
7 | #ifndef __STDC_FORMAT_MACROS | ||
8 | #define __STDC_FORMAT_MACROS | ||
9 | #endif | ||
10 | |||
11 | #include <pthread.h> | ||
12 | #include <stdint.h> | ||
13 | #include <unistd.h> | ||
14 | |||
15 | #include <cassert> | ||
16 | #include <string> | ||
17 | #include <vector> | ||
18 | |||
19 | #include "cache.h" | ||
20 | #include "cache_transport.h" | ||
21 | #include "crypto/hash.h" | ||
22 | #include "fd_table.h" | ||
23 | #include "gtest/gtest_prod.h" | ||
24 | #include "quota.h" | ||
25 | #include "util/atomic.h" | ||
26 | #include "util/concurrency.h" | ||
27 | #include "util/single_copy.h" | ||
28 | |||
29 | |||
30 | class ExternalCacheManager : public CacheManager { | ||
31 | FRIEND_TEST(T_ExternalCacheManager, TransactionAbort); | ||
32 | friend class ExternalQuotaManager; | ||
33 | |||
34 | public: | ||
35 | static const unsigned kPbProtocolVersion = 1; | ||
36 | /** | ||
37 | * Used for race-free startup of an external cache plugin. | ||
38 | */ | ||
39 | class PluginHandle { | ||
40 | friend class ExternalCacheManager; | ||
41 | |||
42 | public: | ||
43 | ✗ | PluginHandle() : fd_connection_(-1) { } | |
44 | ✗ | bool IsValid() const { return fd_connection_ >= 0; } | |
45 | ✗ | int fd_connection() const { return fd_connection_; } | |
46 | ✗ | std::string error_msg() const { return error_msg_; } | |
47 | |||
48 | private: | ||
49 | /** | ||
50 | * The connected file descriptor to pass to Create() | ||
51 | */ | ||
52 | int fd_connection_; | ||
53 | |||
54 | std::string error_msg_; | ||
55 | }; | ||
56 | |||
57 | static PluginHandle *CreatePlugin(const std::string &locator, | ||
58 | const std::vector<std::string> &cmd_line); | ||
59 | |||
60 | static ExternalCacheManager *Create(int fd_connection, | ||
61 | unsigned max_open_fds, | ||
62 | const std::string &ident); | ||
63 | virtual ~ExternalCacheManager(); | ||
64 | |||
65 | 90 | virtual CacheManagerIds id() { return kExternalCacheManager; } | |
66 | virtual std::string Describe(); | ||
67 | virtual bool AcquireQuotaManager(QuotaManager *quota_mgr); | ||
68 | |||
69 | virtual int Open(const LabeledObject &object); | ||
70 | virtual int64_t GetSize(int fd); | ||
71 | virtual int Close(int fd); | ||
72 | virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset); | ||
73 | virtual int Dup(int fd); | ||
74 | virtual int Readahead(int fd); | ||
75 | |||
76 | #ifdef __APPLE__ | ||
77 | virtual uint32_t SizeOfTxn() { return sizeof(Transaction); } | ||
78 | #else | ||
79 | 2152 | virtual uint32_t SizeOfTxn() { | |
80 | 2152 | return sizeof(Transaction) + max_object_size_; | |
81 | } | ||
82 | #endif | ||
83 | virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn); | ||
84 | virtual void CtrlTxn(const Label &label, const int flags, void *txn); | ||
85 | virtual int64_t Write(const void *buf, uint64_t size, void *txn); | ||
86 | virtual int Reset(void *txn); | ||
87 | virtual int AbortTxn(void *txn); | ||
88 | virtual int OpenFromTxn(void *txn); | ||
89 | virtual int CommitTxn(void *txn); | ||
90 | |||
91 | virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn); | ||
92 | virtual bool StoreBreadcrumb(const manifest::Manifest &manifest); | ||
93 | |||
94 | virtual void Spawn(); | ||
95 | |||
96 | 38 | int64_t session_id() const { return session_id_; } | |
97 | 196 | uint32_t max_object_size() const { return max_object_size_; } | |
98 | 10 | uint64_t capabilities() const { return capabilities_; } | |
99 | 15 | pid_t pid_plugin() const { return pid_plugin_; } | |
100 | |||
101 | protected: | ||
102 | virtual void *DoSaveState(); | ||
103 | virtual int DoRestoreState(void *data); | ||
104 | virtual bool DoFreeState(void *data); | ||
105 | |||
106 | private: | ||
107 | /** | ||
108 | * The null hash (hashed output is all null bytes) serves as a marker for an | ||
109 | * invalid handle | ||
110 | */ | ||
111 | static const shash::Any kInvalidHandle; | ||
112 | /** | ||
113 | * Objects cannot be larger than 512 kB. Keeps transaction memory consumption | ||
114 | * under control. | ||
115 | */ | ||
116 | static const unsigned kMaxSupportedObjectSize = 512 * 1024; | ||
117 | /** | ||
118 | * Statistically, at least half of our objects should not be further chunked. | ||
119 | */ | ||
120 | static const unsigned kMinSupportedObjectSize = 4 * 1024; | ||
121 | |||
122 | struct Transaction { | ||
123 | 2122 | explicit Transaction(const shash::Any &id) | |
124 | 2122 | : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction)) | |
125 | 2122 | , buf_pos(0) | |
126 | 2122 | , size(0) | |
127 | 2122 | , expected_size(kSizeUnknown) | |
128 | 2122 | , label() | |
129 | 2122 | , open_fds(0) | |
130 | 2122 | , flushed(false) | |
131 | 2122 | , committed(false) | |
132 | 2122 | , label_modified(false) | |
133 | 2122 | , transaction_id(0) | |
134 | 2122 | , id(id) { } | |
135 | |||
136 | /** | ||
137 | * Allocated size is max_object_size_, allocated by the caller at the end | ||
138 | * of the transaction (Linux only). | ||
139 | */ | ||
140 | unsigned char *buffer; | ||
141 | unsigned buf_pos; | ||
142 | uint64_t size; | ||
143 | uint64_t expected_size; | ||
144 | Label label; | ||
145 | int open_fds; | ||
146 | bool flushed; | ||
147 | bool committed; | ||
148 | bool label_modified; | ||
149 | uint64_t transaction_id; | ||
150 | shash::Any id; | ||
151 | }; // class Transaction | ||
152 | |||
153 | struct ReadOnlyHandle { | ||
154 | 8555 | ReadOnlyHandle() : id(kInvalidHandle) { } | |
155 | 4472 | explicit ReadOnlyHandle(const shash::Any &h) : id(h) { } | |
156 | 12724 | bool operator==(const ReadOnlyHandle &other) const { | |
157 | 12724 | return this->id == other.id; | |
158 | } | ||
159 | 17615 | bool operator!=(const ReadOnlyHandle &other) const { | |
160 | 17615 | return this->id != other.id; | |
161 | } | ||
162 | shash::Any id; | ||
163 | }; // class ReadOnlyHandle | ||
164 | |||
165 | class RpcJob { | ||
166 | public: | ||
167 | 10964 | explicit RpcJob(cvmfs::MsgRefcountReq *msg) | |
168 | 10964 | : req_id_(msg->req_id()) | |
169 | 10964 | , part_nr_(0) | |
170 | 10964 | , msg_req_(msg) | |
171 |
1/2✓ Branch 2 taken 10964 times.
✗ Branch 3 not taken.
|
10964 | , frame_send_(msg) { } |
172 | 264 | explicit RpcJob(cvmfs::MsgObjectInfoReq *msg) | |
173 | 264 | : req_id_(msg->req_id()) | |
174 | 264 | , part_nr_(0) | |
175 | 264 | , msg_req_(msg) | |
176 |
1/2✓ Branch 2 taken 264 times.
✗ Branch 3 not taken.
|
264 | , frame_send_(msg) { } |
177 | 33802 | explicit RpcJob(cvmfs::MsgReadReq *msg) | |
178 | 33802 | : req_id_(msg->req_id()) | |
179 | 33832 | , part_nr_(0) | |
180 | 33832 | , msg_req_(msg) | |
181 |
1/2✓ Branch 2 taken 33832 times.
✗ Branch 3 not taken.
|
33832 | , frame_send_(msg) { } |
182 | 11534 | explicit RpcJob(cvmfs::MsgStoreReq *msg) | |
183 | 11534 | : req_id_(msg->req_id()) | |
184 | 11534 | , part_nr_(msg->part_nr()) | |
185 | 11534 | , msg_req_(msg) | |
186 |
1/2✓ Branch 2 taken 11534 times.
✗ Branch 3 not taken.
|
11534 | , frame_send_(msg) { } |
187 | 15 | explicit RpcJob(cvmfs::MsgStoreAbortReq *msg) | |
188 | 15 | : req_id_(msg->req_id()) | |
189 | 15 | , part_nr_(0) | |
190 | 15 | , msg_req_(msg) | |
191 |
1/2✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
|
15 | , frame_send_(msg) { } |
192 | 84 | explicit RpcJob(cvmfs::MsgInfoReq *msg) | |
193 | 84 | : req_id_(msg->req_id()) | |
194 | 84 | , part_nr_(0) | |
195 | 84 | , msg_req_(msg) | |
196 |
1/2✓ Branch 2 taken 84 times.
✗ Branch 3 not taken.
|
84 | , frame_send_(msg) { } |
197 | 38 | explicit RpcJob(cvmfs::MsgShrinkReq *msg) | |
198 | 38 | : req_id_(msg->req_id()) | |
199 | 38 | , part_nr_(0) | |
200 | 38 | , msg_req_(msg) | |
201 |
1/2✓ Branch 2 taken 38 times.
✗ Branch 3 not taken.
|
38 | , frame_send_(msg) { } |
202 | 158 | explicit RpcJob(cvmfs::MsgListReq *msg) | |
203 | 158 | : req_id_(msg->req_id()) | |
204 | 158 | , part_nr_(0) | |
205 | 158 | , msg_req_(msg) | |
206 |
1/2✓ Branch 2 taken 158 times.
✗ Branch 3 not taken.
|
158 | , frame_send_(msg) { } |
207 | 34 | explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg) | |
208 | 34 | : req_id_(msg->req_id()) | |
209 | 34 | , part_nr_(0) | |
210 | 34 | , msg_req_(msg) | |
211 |
1/2✓ Branch 2 taken 34 times.
✗ Branch 3 not taken.
|
34 | , frame_send_(msg) { } |
212 | 17 | explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg) | |
213 | 17 | : req_id_(msg->req_id()) | |
214 | 17 | , part_nr_(0) | |
215 | 17 | , msg_req_(msg) | |
216 |
1/2✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
|
17 | , frame_send_(msg) { } |
217 | |||
218 | 11534 | void set_attachment_send(void *data, unsigned size) { | |
219 | 11534 | frame_send_.set_attachment(data, size); | |
220 | 11534 | } | |
221 | |||
222 | 33832 | void set_attachment_recv(void *data, unsigned size) { | |
223 | 33832 | frame_recv_.set_attachment(data, size); | |
224 | 33832 | } | |
225 | |||
226 | google::protobuf::MessageLite *msg_req() { return msg_req_; } | ||
227 | // Type checking has been already performed | ||
228 | 10964 | cvmfs::MsgRefcountReply *msg_refcount_reply() { | |
229 | cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>( | ||
230 | 10964 | frame_recv_.GetMsgTyped()); | |
231 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 10949 times.
|
10949 | assert(m->req_id() == req_id_); |
232 | 10949 | return m; | |
233 | } | ||
234 | 264 | cvmfs::MsgObjectInfoReply *msg_object_info_reply() { | |
235 | cvmfs::MsgObjectInfoReply | ||
236 | *m = reinterpret_cast<cvmfs::MsgObjectInfoReply *>( | ||
237 | 264 | frame_recv_.GetMsgTyped()); | |
238 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 264 times.
|
264 | assert(m->req_id() == req_id_); |
239 | 264 | return m; | |
240 | } | ||
241 | 33862 | cvmfs::MsgReadReply *msg_read_reply() { | |
242 | cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>( | ||
243 | 33862 | frame_recv_.GetMsgTyped()); | |
244 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 33787 times.
|
33787 | assert(m->req_id() == req_id_); |
245 | 33787 | return m; | |
246 | } | ||
247 | 11549 | cvmfs::MsgStoreReply *msg_store_reply() { | |
248 | cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>( | ||
249 | 11549 | frame_recv_.GetMsgTyped()); | |
250 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 11549 times.
|
11549 | assert(m->req_id() == req_id_); |
251 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 11549 times.
|
11549 | assert(m->part_nr() == part_nr_); |
252 | 11549 | return m; | |
253 | } | ||
254 | 84 | cvmfs::MsgInfoReply *msg_info_reply() { | |
255 | cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>( | ||
256 | 84 | frame_recv_.GetMsgTyped()); | |
257 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 84 times.
|
84 | assert(m->req_id() == req_id_); |
258 | 84 | return m; | |
259 | } | ||
260 | 38 | cvmfs::MsgShrinkReply *msg_shrink_reply() { | |
261 | cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>( | ||
262 | 38 | frame_recv_.GetMsgTyped()); | |
263 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 38 times.
|
38 | assert(m->req_id() == req_id_); |
264 | 38 | return m; | |
265 | } | ||
266 | 158 | cvmfs::MsgListReply *msg_list_reply() { | |
267 | cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>( | ||
268 | 158 | frame_recv_.GetMsgTyped()); | |
269 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 158 times.
|
158 | assert(m->req_id() == req_id_); |
270 | 158 | return m; | |
271 | } | ||
272 | 51 | cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() { | |
273 | cvmfs::MsgBreadcrumbReply | ||
274 | *m = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>( | ||
275 | 51 | frame_recv_.GetMsgTyped()); | |
276 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 51 times.
|
51 | assert(m->req_id() == req_id_); |
277 | 51 | return m; | |
278 | } | ||
279 | |||
280 | 56970 | CacheTransport::Frame *frame_send() { return &frame_send_; } | |
281 | 177621 | CacheTransport::Frame *frame_recv() { return &frame_recv_; } | |
282 | 30480 | uint64_t req_id() const { return req_id_; } | |
283 | 30420 | uint64_t part_nr() const { return part_nr_; } | |
284 | |||
285 | private: | ||
286 | uint64_t req_id_; | ||
287 | uint64_t part_nr_; | ||
288 | google::protobuf::MessageLite *msg_req_; | ||
289 | CacheTransport::Frame frame_send_; | ||
290 | CacheTransport::Frame frame_recv_; | ||
291 | }; // class RpcJob | ||
292 | |||
293 | struct RpcInFlight { | ||
294 | 30420 | RpcInFlight() : rpc_job(NULL), signal(NULL) { } | |
295 | 30420 | RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { } | |
296 | |||
297 | RpcJob *rpc_job; | ||
298 | Signal *signal; | ||
299 | }; | ||
300 | |||
301 | static void *MainRead(void *data); | ||
302 | static int ConnectLocator(const std::string &locator, bool print_error); | ||
303 | static bool SpawnPlugin(const std::vector<std::string> &cmd_line); | ||
304 | |||
305 | explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds); | ||
306 | 47498 | int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); } | |
307 | void CallRemotely(RpcJob *rpc_job); | ||
308 | int ChangeRefcount(const shash::Any &id, int change_by); | ||
309 | int DoOpen(const shash::Any &id); | ||
310 | shash::Any GetHandle(int fd); | ||
311 | int Flush(bool do_commit, Transaction *transaction); | ||
312 | |||
313 | pid_t pid_plugin_; | ||
314 | FdTable<ReadOnlyHandle> fd_table_; | ||
315 | CacheTransport transport_; | ||
316 | int64_t session_id_; | ||
317 | uint32_t max_object_size_; | ||
318 | bool spawned_; | ||
319 | bool terminated_; | ||
320 | pthread_rwlock_t rwlock_fd_table_; | ||
321 | atomic_int64 next_request_id_; | ||
322 | |||
323 | /** | ||
324 | * Serialize concurrent write access to the session fd | ||
325 | */ | ||
326 | pthread_mutex_t lock_send_fd_; | ||
327 | std::vector<RpcInFlight> inflight_rpcs_; | ||
328 | pthread_mutex_t lock_inflight_rpcs_; | ||
329 | pthread_t thread_read_; | ||
330 | uint64_t capabilities_; | ||
331 | }; // class ExternalCacheManager | ||
332 | |||
333 | |||
334 | class ExternalQuotaManager : public QuotaManager { | ||
335 | public: | ||
336 | static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr); | ||
337 | virtual bool HasCapability(Capabilities capability); | ||
338 | |||
339 | ✗ | virtual void Insert(const shash::Any &hash, const uint64_t size, | |
340 | ✗ | const std::string &description) { } | |
341 | |||
342 | ✗ | virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, | |
343 | ✗ | const std::string &description) { } | |
344 | |||
345 | ✗ | virtual bool Pin(const shash::Any &hash, const uint64_t size, | |
346 | const std::string &description, const bool is_catalog) { | ||
347 | ✗ | return is_catalog; | |
348 | } | ||
349 | |||
350 | ✗ | virtual void Unpin(const shash::Any &hash) { } | |
351 | ✗ | virtual void Touch(const shash::Any &hash) { } | |
352 | ✗ | virtual void Remove(const shash::Any &file) { } | |
353 | virtual bool Cleanup(const uint64_t leave_size); | ||
354 | |||
355 | virtual void RegisterBackChannel(int back_channel[2], | ||
356 | const std::string &channel_id); | ||
357 | virtual void UnregisterBackChannel(int back_channel[2], | ||
358 | const std::string &channel_id); | ||
359 | |||
360 | virtual std::vector<std::string> List(); | ||
361 | virtual std::vector<std::string> ListPinned(); | ||
362 | virtual std::vector<std::string> ListCatalogs(); | ||
363 | virtual std::vector<std::string> ListVolatile(); | ||
364 | ✗ | virtual uint64_t GetMaxFileSize() { return uint64_t(-1); } | |
365 | virtual uint64_t GetCapacity(); | ||
366 | virtual uint64_t GetSize(); | ||
367 | virtual uint64_t GetSizePinned(); | ||
368 | ✗ | virtual bool SetLimit(uint64_t limit) { return false; } // NOLINT | |
369 | virtual uint64_t GetCleanupRate(uint64_t period_s); | ||
370 | |||
371 | ✗ | virtual void Spawn() { } | |
372 | 15 | virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); } | |
373 | ✗ | virtual uint32_t GetProtocolRevision() { return 0; } | |
374 | |||
375 | private: | ||
376 | struct QuotaInfo { | ||
377 | 84 | QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { } | |
378 | uint64_t size; | ||
379 | uint64_t used; | ||
380 | uint64_t pinned; | ||
381 | uint64_t no_shrink; | ||
382 | }; | ||
383 | |||
384 | 294 | explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr) | |
385 | 294 | : cache_mgr_(cache_mgr) { } | |
386 | int GetInfo(QuotaInfo *quota_info); | ||
387 | bool DoListing(cvmfs::EnumObjectType type, | ||
388 | std::vector<cvmfs::MsgListRecord> *result); | ||
389 | |||
390 | ExternalCacheManager *cache_mgr_; | ||
391 | }; | ||
392 | |||
393 | #endif // CVMFS_CACHE_EXTERN_H_ | ||
394 |