GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_extern.h
Date: 2026-05-10 02:36:07
Exec Total Coverage
Lines: 125 142 88.0%
Branches: 19 38 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #ifndef CVMFS_CACHE_EXTERN_H_
5 #define CVMFS_CACHE_EXTERN_H_
6
7 #include <pthread.h>
8 #include <stdint.h>
9 #include <unistd.h>
10
11 #include <cassert>
12 #include <string>
13 #include <vector>
14
15 #include "cache.h"
16 #include "cache_transport.h"
17 #include "crypto/hash.h"
18 #include "fd_table.h"
19 #include "duplex_testing.h"
20 #include "quota.h"
21 #include "util/atomic.h"
22 #include "util/concurrency.h"
23
24
25 class ExternalCacheManager : public CacheManager {
26 FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
27 friend class ExternalQuotaManager;
28
29 public:
30 static const unsigned kPbProtocolVersion = 1;
31 /**
32 * Used for race-free startup of an external cache plugin.
33 */
34 class PluginHandle {
35 friend class ExternalCacheManager;
36
37 public:
38 PluginHandle() : fd_connection_(-1) { }
39 bool IsValid() const { return fd_connection_ >= 0; }
40 int fd_connection() const { return fd_connection_; }
41 std::string error_msg() const { return error_msg_; }
42
43 private:
44 /**
45 * The connected file descriptor to pass to Create()
46 */
47 int fd_connection_;
48
49 std::string error_msg_;
50 };
51
52 static PluginHandle *CreatePlugin(const std::string &locator,
53 const std::vector<std::string> &cmd_line);
54
55 static ExternalCacheManager *Create(int fd_connection,
56 unsigned max_open_fds,
57 const std::string &ident);
58 virtual ~ExternalCacheManager();
59
60 288 virtual CacheManagerIds id() { return kExternalCacheManager; }
61 virtual std::string Describe();
62 virtual bool AcquireQuotaManager(QuotaManager *quota_mgr);
63
64 virtual int Open(const LabeledObject &object);
65 virtual int64_t GetSize(int fd);
66 virtual int Close(int fd);
67 virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
68 virtual int Dup(int fd);
69 virtual int Readahead(int fd);
70
71 #ifdef __APPLE__
72 virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
73 #else
74 2416 virtual uint32_t SizeOfTxn() {
75 2416 return sizeof(Transaction) + max_object_size_;
76 }
77 #endif
78 virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
79 virtual void CtrlTxn(const Label &label, const int flags, void *txn);
80 virtual int64_t Write(const void *buf, uint64_t size, void *txn);
81 virtual int Reset(void *txn);
82 virtual int AbortTxn(void *txn);
83 virtual int OpenFromTxn(void *txn);
84 virtual int CommitTxn(void *txn);
85
86 virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn);
87 virtual bool StoreBreadcrumb(const manifest::Manifest &manifest);
88
89 virtual void Spawn();
90
91 104 int64_t session_id() const { return session_id_; }
92 196 uint32_t max_object_size() const { return max_object_size_; }
93 10 uint64_t capabilities() const { return capabilities_; }
94 48 pid_t pid_plugin() const { return pid_plugin_; }
95
96 protected:
97 virtual void *DoSaveState();
98 virtual int DoRestoreState(void *data);
99 virtual bool DoFreeState(void *data);
100
101 private:
102 /**
103 * The null hash (hashed output is all null bytes) serves as a marker for an
104 * invalid handle
105 */
106 static const shash::Any kInvalidHandle;
107 /**
108 * Objects cannot be larger than 512 kB. Keeps transaction memory consumption
109 * under control.
110 */
111 static const unsigned kMaxSupportedObjectSize = 512 * 1024;
112 /**
113 * Statistically, at least half of our objects should not be further chunked.
114 */
115 static const unsigned kMinSupportedObjectSize = 4 * 1024;
116
117 struct Transaction {
118 2320 explicit Transaction(const shash::Any &id)
119 2320 : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
120 2320 , buf_pos(0)
121 2320 , size(0)
122 2320 , expected_size(kSizeUnknown)
123 2320 , label()
124 2320 , open_fds(0)
125 2320 , flushed(false)
126 2320 , committed(false)
127 2320 , label_modified(false)
128 2320 , transaction_id(0)
129 2320 , id(id) { }
130
131 /**
132 * Allocated size is max_object_size_, allocated by the caller at the end
133 * of the transaction (Linux only).
134 */
135 unsigned char *buffer;
136 unsigned buf_pos;
137 uint64_t size;
138 uint64_t expected_size;
139 Label label;
140 int open_fds;
141 bool flushed;
142 bool committed;
143 bool label_modified;
144 uint64_t transaction_id;
145 shash::Any id;
146 }; // class Transaction
147
148 struct ReadOnlyHandle {
149 26804 ReadOnlyHandle() : id(kInvalidHandle) { }
150 13778 explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
151 39652 bool operator==(const ReadOnlyHandle &other) const {
152 39652 return this->id == other.id;
153 }
154 54446 bool operator!=(const ReadOnlyHandle &other) const {
155 54446 return this->id != other.id;
156 }
157 shash::Any id;
158 }; // class ReadOnlyHandle
159
160 class RpcJob {
161 public:
162 29576 explicit RpcJob(cvmfs::MsgRefcountReq *msg)
163 29576 : req_id_(msg->req_id())
164 29576 , part_nr_(0)
165 29576 , msg_req_(msg)
166
1/2
✓ Branch 2 taken 29576 times.
✗ Branch 3 not taken.
29576 , frame_send_(msg) { }
167 792 explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
168 792 : req_id_(msg->req_id())
169 792 , part_nr_(0)
170 792 , msg_req_(msg)
171
1/2
✓ Branch 2 taken 792 times.
✗ Branch 3 not taken.
792 , frame_send_(msg) { }
172 106832 explicit RpcJob(cvmfs::MsgReadReq *msg)
173 106832 : req_id_(msg->req_id())
174 106832 , part_nr_(0)
175 106832 , msg_req_(msg)
176
1/2
✓ Branch 2 taken 106832 times.
✗ Branch 3 not taken.
106832 , frame_send_(msg) { }
177 31598 explicit RpcJob(cvmfs::MsgStoreReq *msg)
178 31598 : req_id_(msg->req_id())
179 31598 , part_nr_(msg->part_nr())
180 31598 , msg_req_(msg)
181
1/2
✓ Branch 2 taken 31598 times.
✗ Branch 3 not taken.
31598 , frame_send_(msg) { }
182 48 explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
183 48 : req_id_(msg->req_id())
184 48 , part_nr_(0)
185 48 , msg_req_(msg)
186
1/2
✓ Branch 2 taken 48 times.
✗ Branch 3 not taken.
48 , frame_send_(msg) { }
187 216 explicit RpcJob(cvmfs::MsgInfoReq *msg)
188 216 : req_id_(msg->req_id())
189 216 , part_nr_(0)
190 216 , msg_req_(msg)
191
1/2
✓ Branch 2 taken 216 times.
✗ Branch 3 not taken.
216 , frame_send_(msg) { }
192 104 explicit RpcJob(cvmfs::MsgShrinkReq *msg)
193 104 : req_id_(msg->req_id())
194 104 , part_nr_(0)
195 104 , msg_req_(msg)
196
1/2
✓ Branch 2 taken 104 times.
✗ Branch 3 not taken.
104 , frame_send_(msg) { }
197 488 explicit RpcJob(cvmfs::MsgListReq *msg)
198 488 : req_id_(msg->req_id())
199 488 , part_nr_(0)
200 488 , msg_req_(msg)
201
1/2
✓ Branch 2 taken 488 times.
✗ Branch 3 not taken.
488 , frame_send_(msg) { }
202 100 explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
203 100 : req_id_(msg->req_id())
204 100 , part_nr_(0)
205 100 , msg_req_(msg)
206
1/2
✓ Branch 2 taken 100 times.
✗ Branch 3 not taken.
100 , frame_send_(msg) { }
207 50 explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
208 50 : req_id_(msg->req_id())
209 50 , part_nr_(0)
210 50 , msg_req_(msg)
211
1/2
✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
50 , frame_send_(msg) { }
212
213 31598 void set_attachment_send(void *data, unsigned size) {
214 31598 frame_send_.set_attachment(data, size);
215 31598 }
216
217 106832 void set_attachment_recv(void *data, unsigned size) {
218 106832 frame_recv_.set_attachment(data, size);
219 106832 }
220
221 google::protobuf::MessageLite *msg_req() { return msg_req_; }
222 // Type checking has been already performed
223 29576 cvmfs::MsgRefcountReply *msg_refcount_reply() {
224 cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
225 29576 frame_recv_.GetMsgTyped());
226
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 29576 times.
29576 assert(m->req_id() == req_id_);
227 29576 return m;
228 }
229 792 cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
230 cvmfs::MsgObjectInfoReply
231 *m = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
232 792 frame_recv_.GetMsgTyped());
233
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 792 times.
792 assert(m->req_id() == req_id_);
234 792 return m;
235 }
236 106832 cvmfs::MsgReadReply *msg_read_reply() {
237 cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
238 106832 frame_recv_.GetMsgTyped());
239
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 106832 times.
106832 assert(m->req_id() == req_id_);
240 106832 return m;
241 }
242 31646 cvmfs::MsgStoreReply *msg_store_reply() {
243 cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
244 31646 frame_recv_.GetMsgTyped());
245
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 31646 times.
31646 assert(m->req_id() == req_id_);
246
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 31646 times.
31646 assert(m->part_nr() == part_nr_);
247 31646 return m;
248 }
249 216 cvmfs::MsgInfoReply *msg_info_reply() {
250 cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
251 216 frame_recv_.GetMsgTyped());
252
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 216 times.
216 assert(m->req_id() == req_id_);
253 216 return m;
254 }
255 104 cvmfs::MsgShrinkReply *msg_shrink_reply() {
256 cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
257 104 frame_recv_.GetMsgTyped());
258
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 104 times.
104 assert(m->req_id() == req_id_);
259 104 return m;
260 }
261 488 cvmfs::MsgListReply *msg_list_reply() {
262 cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
263 488 frame_recv_.GetMsgTyped());
264
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 488 times.
488 assert(m->req_id() == req_id_);
265 488 return m;
266 }
267 150 cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() {
268 cvmfs::MsgBreadcrumbReply
269 *m = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(
270 150 frame_recv_.GetMsgTyped());
271
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 150 times.
150 assert(m->req_id() == req_id_);
272 150 return m;
273 }
274
275 169804 CacheTransport::Frame *frame_send() { return &frame_send_; }
276 528476 CacheTransport::Frame *frame_recv() { return &frame_recv_; }
277 98400 uint64_t req_id() const { return req_id_; }
278 97344 uint64_t part_nr() const { return part_nr_; }
279
280 private:
281 uint64_t req_id_;
282 uint64_t part_nr_;
283 google::protobuf::MessageLite *msg_req_;
284 CacheTransport::Frame frame_send_;
285 CacheTransport::Frame frame_recv_;
286 }; // class RpcJob
287
288 struct RpcInFlight {
289 97344 RpcInFlight() : rpc_job(NULL), signal(NULL) { }
290 97344 RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
291
292 RpcJob *rpc_job;
293 Signal *signal;
294 };
295
296 static void *MainRead(void *data);
297 static int ConnectLocator(const std::string &locator, bool print_error);
298 static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
299
300 explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
301 140526 int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
302 void CallRemotely(RpcJob *rpc_job);
303 int ChangeRefcount(const shash::Any &id, int change_by);
304 int DoOpen(const shash::Any &id);
305 shash::Any GetHandle(int fd);
306 int Flush(bool do_commit, Transaction *transaction);
307
308 pid_t pid_plugin_;
309 FdTable<ReadOnlyHandle> fd_table_;
310 CacheTransport transport_;
311 int64_t session_id_;
312 uint32_t max_object_size_;
313 bool spawned_;
314 bool terminated_;
315 pthread_rwlock_t rwlock_fd_table_;
316 atomic_int64 next_request_id_;
317
318 /**
319 * Serialize concurrent write access to the session fd
320 */
321 pthread_mutex_t lock_send_fd_;
322 std::vector<RpcInFlight> inflight_rpcs_;
323 pthread_mutex_t lock_inflight_rpcs_;
324 pthread_t thread_read_;
325 uint64_t capabilities_;
326 }; // class ExternalCacheManager
327
328
329 class ExternalQuotaManager : public QuotaManager {
330 public:
331 static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr);
332 virtual bool HasCapability(Capabilities capability);
333
334 virtual void Insert(const shash::Any &hash, const uint64_t size,
335 const std::string &description) { }
336
337 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
338 const std::string &description) { }
339
340 virtual bool Pin(const shash::Any &hash, const uint64_t size,
341 const std::string &description, const bool is_catalog) {
342 return is_catalog;
343 }
344
345 virtual void Unpin(const shash::Any &hash) { }
346 virtual void Touch(const shash::Any &hash) { }
347 virtual void Remove(const shash::Any &file) { }
348 virtual bool Cleanup(const uint64_t leave_size);
349
350 virtual void RegisterBackChannel(int back_channel[2],
351 const std::string &channel_id);
352 virtual void UnregisterBackChannel(int back_channel[2],
353 const std::string &channel_id);
354
355 virtual std::vector<std::string> List();
356 virtual std::vector<std::string> ListPinned();
357 virtual std::vector<std::string> ListCatalogs();
358 virtual std::vector<std::string> ListVolatile();
359 virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
360 virtual uint64_t GetCapacity();
361 virtual uint64_t GetSize();
362 virtual uint64_t GetSizePinned();
363 virtual bool SetLimit(uint64_t limit) { return false; } // NOLINT
364 virtual uint64_t GetCleanupRate(uint64_t period_s);
365
366 virtual void Spawn() { }
367 48 virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
368 virtual uint32_t GetProtocolRevision() { return 0; }
369
370 private:
371 struct QuotaInfo {
372 216 QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
373 uint64_t size;
374 uint64_t used;
375 uint64_t pinned;
376 uint64_t no_shrink;
377 };
378
379 888 explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr)
380 888 : cache_mgr_(cache_mgr) { }
381 int GetInfo(QuotaInfo *quota_info);
382 bool DoListing(cvmfs::EnumObjectType type,
383 std::vector<cvmfs::MsgListRecord> *result);
384
385 ExternalCacheManager *cache_mgr_;
386 };
387
388 #endif // CVMFS_CACHE_EXTERN_H_
389