GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_extern.h
Date: 2026-04-19 02:41:37
Exec Total Coverage
Lines: 125 142 88.0%
Branches: 19 38 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #ifndef CVMFS_CACHE_EXTERN_H_
5 #define CVMFS_CACHE_EXTERN_H_
6
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
9 #endif
10
11 #include <pthread.h>
12 #include <stdint.h>
13 #include <unistd.h>
14
15 #include <cassert>
16 #include <string>
17 #include <vector>
18
19 #include "cache.h"
20 #include "cache_transport.h"
21 #include "crypto/hash.h"
22 #include "fd_table.h"
23 #include "duplex_testing.h"
24 #include "quota.h"
25 #include "util/atomic.h"
26 #include "util/concurrency.h"
27
28
29 class ExternalCacheManager : public CacheManager {
30 FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
31 friend class ExternalQuotaManager;
32
33 public:
34 static const unsigned kPbProtocolVersion = 1;
35 /**
36 * Used for race-free startup of an external cache plugin.
37 */
38 class PluginHandle {
39 friend class ExternalCacheManager;
40
41 public:
42 PluginHandle() : fd_connection_(-1) { }
43 bool IsValid() const { return fd_connection_ >= 0; }
44 int fd_connection() const { return fd_connection_; }
45 std::string error_msg() const { return error_msg_; }
46
47 private:
48 /**
49 * The connected file descriptor to pass to Create()
50 */
51 int fd_connection_;
52
53 std::string error_msg_;
54 };
55
56 static PluginHandle *CreatePlugin(const std::string &locator,
57 const std::vector<std::string> &cmd_line);
58
59 static ExternalCacheManager *Create(int fd_connection,
60 unsigned max_open_fds,
61 const std::string &ident);
62 virtual ~ExternalCacheManager();
63
64 30 virtual CacheManagerIds id() { return kExternalCacheManager; }
65 virtual std::string Describe();
66 virtual bool AcquireQuotaManager(QuotaManager *quota_mgr);
67
68 virtual int Open(const LabeledObject &object);
69 virtual int64_t GetSize(int fd);
70 virtual int Close(int fd);
71 virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
72 virtual int Dup(int fd);
73 virtual int Readahead(int fd);
74
75 #ifdef __APPLE__
76 virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
77 #else
78 2072 virtual uint32_t SizeOfTxn() {
79 2072 return sizeof(Transaction) + max_object_size_;
80 }
81 #endif
82 virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
83 virtual void CtrlTxn(const Label &label, const int flags, void *txn);
84 virtual int64_t Write(const void *buf, uint64_t size, void *txn);
85 virtual int Reset(void *txn);
86 virtual int AbortTxn(void *txn);
87 virtual int OpenFromTxn(void *txn);
88 virtual int CommitTxn(void *txn);
89
90 virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn);
91 virtual bool StoreBreadcrumb(const manifest::Manifest &manifest);
92
93 virtual void Spawn();
94
95 18 int64_t session_id() const { return session_id_; }
96 196 uint32_t max_object_size() const { return max_object_size_; }
97 10 uint64_t capabilities() const { return capabilities_; }
98 5 pid_t pid_plugin() const { return pid_plugin_; }
99
100 protected:
101 virtual void *DoSaveState();
102 virtual int DoRestoreState(void *data);
103 virtual bool DoFreeState(void *data);
104
105 private:
106 /**
107 * The null hash (hashed output is all null bytes) serves as a marker for an
108 * invalid handle
109 */
110 static const shash::Any kInvalidHandle;
111 /**
112 * Objects cannot be larger than 512 kB. Keeps transaction memory consumption
113 * under control.
114 */
115 static const unsigned kMaxSupportedObjectSize = 512 * 1024;
116 /**
117 * Statistically, at least half of our objects should not be further chunked.
118 */
119 static const unsigned kMinSupportedObjectSize = 4 * 1024;
120
121 struct Transaction {
122 2062 explicit Transaction(const shash::Any &id)
123 2062 : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
124 2062 , buf_pos(0)
125 2062 , size(0)
126 2062 , expected_size(kSizeUnknown)
127 2062 , label()
128 2062 , open_fds(0)
129 2062 , flushed(false)
130 2062 , committed(false)
131 2062 , label_modified(false)
132 2062 , transaction_id(0)
133 2062 , id(id) { }
134
135 /**
136 * Allocated size is max_object_size_, allocated by the caller at the end
137 * of the transaction (Linux only).
138 */
139 unsigned char *buffer;
140 unsigned buf_pos;
141 uint64_t size;
142 uint64_t expected_size;
143 Label label;
144 int open_fds;
145 bool flushed;
146 bool committed;
147 bool label_modified;
148 uint64_t transaction_id;
149 shash::Any id;
150 }; // class Transaction
151
152 struct ReadOnlyHandle {
153 3025 ReadOnlyHandle() : id(kInvalidHandle) { }
154 1652 explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
155 4564 bool operator==(const ReadOnlyHandle &other) const {
156 4564 return this->id == other.id;
157 }
158 6425 bool operator!=(const ReadOnlyHandle &other) const {
159 6425 return this->id != other.id;
160 }
161 shash::Any id;
162 }; // class ReadOnlyHandle
163
164 class RpcJob {
165 public:
166 5324 explicit RpcJob(cvmfs::MsgRefcountReq *msg)
167 5324 : req_id_(msg->req_id())
168 5324 , part_nr_(0)
169 5324 , msg_req_(msg)
170
1/2
✓ Branch 2 taken 5324 times.
✗ Branch 3 not taken.
5324 , frame_send_(msg) { }
171 104 explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
172 104 : req_id_(msg->req_id())
173 104 , part_nr_(0)
174 104 , msg_req_(msg)
175
1/2
✓ Branch 2 taken 104 times.
✗ Branch 3 not taken.
104 , frame_send_(msg) { }
176 11751 explicit RpcJob(cvmfs::MsgReadReq *msg)
177 11751 : req_id_(msg->req_id())
178 11751 , part_nr_(0)
179 11751 , msg_req_(msg)
180
1/2
✓ Branch 2 taken 11751 times.
✗ Branch 3 not taken.
11751 , frame_send_(msg) { }
181 5454 explicit RpcJob(cvmfs::MsgStoreReq *msg)
182 5454 : req_id_(msg->req_id())
183 5454 , part_nr_(msg->part_nr())
184 5454 , msg_req_(msg)
185
1/2
✓ Branch 2 taken 5454 times.
✗ Branch 3 not taken.
5454 , frame_send_(msg) { }
186 5 explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
187 5 : req_id_(msg->req_id())
188 5 , part_nr_(0)
189 5 , msg_req_(msg)
190
1/2
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
5 , frame_send_(msg) { }
191 44 explicit RpcJob(cvmfs::MsgInfoReq *msg)
192 44 : req_id_(msg->req_id())
193 44 , part_nr_(0)
194 44 , msg_req_(msg)
195
1/2
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
44 , frame_send_(msg) { }
196 18 explicit RpcJob(cvmfs::MsgShrinkReq *msg)
197 18 : req_id_(msg->req_id())
198 18 , part_nr_(0)
199 18 , msg_req_(msg)
200
1/2
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
18 , frame_send_(msg) { }
201 58 explicit RpcJob(cvmfs::MsgListReq *msg)
202 58 : req_id_(msg->req_id())
203 58 , part_nr_(0)
204 58 , msg_req_(msg)
205
1/2
✓ Branch 2 taken 58 times.
✗ Branch 3 not taken.
58 , frame_send_(msg) { }
206 14 explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
207 14 : req_id_(msg->req_id())
208 14 , part_nr_(0)
209 14 , msg_req_(msg)
210
1/2
✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
14 , frame_send_(msg) { }
211 7 explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
212 7 : req_id_(msg->req_id())
213 7 , part_nr_(0)
214 7 , msg_req_(msg)
215
1/2
✓ Branch 2 taken 7 times.
✗ Branch 3 not taken.
7 , frame_send_(msg) { }
216
217 5454 void set_attachment_send(void *data, unsigned size) {
218 5454 frame_send_.set_attachment(data, size);
219 5454 }
220
221 11751 void set_attachment_recv(void *data, unsigned size) {
222 11751 frame_recv_.set_attachment(data, size);
223 11751 }
224
225 google::protobuf::MessageLite *msg_req() { return msg_req_; }
226 // Type checking has been already performed
227 5324 cvmfs::MsgRefcountReply *msg_refcount_reply() {
228 cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
229 5324 frame_recv_.GetMsgTyped());
230
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5324 times.
5324 assert(m->req_id() == req_id_);
231 5324 return m;
232 }
233 99 cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
234 cvmfs::MsgObjectInfoReply
235 *m = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
236 99 frame_recv_.GetMsgTyped());
237
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 104 times.
104 assert(m->req_id() == req_id_);
238 104 return m;
239 }
240 11751 cvmfs::MsgReadReply *msg_read_reply() {
241 cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
242 11751 frame_recv_.GetMsgTyped());
243
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 11751 times.
11751 assert(m->req_id() == req_id_);
244 11751 return m;
245 }
246 5459 cvmfs::MsgStoreReply *msg_store_reply() {
247 cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
248 5459 frame_recv_.GetMsgTyped());
249
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5459 times.
5459 assert(m->req_id() == req_id_);
250
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5459 times.
5459 assert(m->part_nr() == part_nr_);
251 5459 return m;
252 }
253 44 cvmfs::MsgInfoReply *msg_info_reply() {
254 cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
255 44 frame_recv_.GetMsgTyped());
256
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 44 times.
44 assert(m->req_id() == req_id_);
257 44 return m;
258 }
259 18 cvmfs::MsgShrinkReply *msg_shrink_reply() {
260 cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
261 18 frame_recv_.GetMsgTyped());
262
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 18 times.
18 assert(m->req_id() == req_id_);
263 18 return m;
264 }
265 58 cvmfs::MsgListReply *msg_list_reply() {
266 cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
267 58 frame_recv_.GetMsgTyped());
268
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 58 times.
58 assert(m->req_id() == req_id_);
269 58 return m;
270 }
271 21 cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() {
272 cvmfs::MsgBreadcrumbReply
273 *m = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(
274 21 frame_recv_.GetMsgTyped());
275
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 21 times.
21 assert(m->req_id() == req_id_);
276 21 return m;
277 }
278
279 22779 CacheTransport::Frame *frame_send() { return &frame_send_; }
280 71561 CacheTransport::Frame *frame_recv() { return &frame_recv_; }
281 10175 uint64_t req_id() const { return req_id_; }
282 10140 uint64_t part_nr() const { return part_nr_; }
283
284 private:
285 uint64_t req_id_;
286 uint64_t part_nr_;
287 google::protobuf::MessageLite *msg_req_;
288 CacheTransport::Frame frame_send_;
289 CacheTransport::Frame frame_recv_;
290 }; // class RpcJob
291
292 struct RpcInFlight {
293 10140 RpcInFlight() : rpc_job(NULL), signal(NULL) { }
294 10140 RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
295
296 RpcJob *rpc_job;
297 Signal *signal;
298 };
299
300 static void *MainRead(void *data);
301 static int ConnectLocator(const std::string &locator, bool print_error);
302 static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
303
304 explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
305 19387 int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
306 void CallRemotely(RpcJob *rpc_job);
307 int ChangeRefcount(const shash::Any &id, int change_by);
308 int DoOpen(const shash::Any &id);
309 shash::Any GetHandle(int fd);
310 int Flush(bool do_commit, Transaction *transaction);
311
312 pid_t pid_plugin_;
313 FdTable<ReadOnlyHandle> fd_table_;
314 CacheTransport transport_;
315 int64_t session_id_;
316 uint32_t max_object_size_;
317 bool spawned_;
318 bool terminated_;
319 pthread_rwlock_t rwlock_fd_table_;
320 atomic_int64 next_request_id_;
321
322 /**
323 * Serialize concurrent write access to the session fd
324 */
325 pthread_mutex_t lock_send_fd_;
326 std::vector<RpcInFlight> inflight_rpcs_;
327 pthread_mutex_t lock_inflight_rpcs_;
328 pthread_t thread_read_;
329 uint64_t capabilities_;
330 }; // class ExternalCacheManager
331
332
333 class ExternalQuotaManager : public QuotaManager {
334 public:
335 static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr);
336 virtual bool HasCapability(Capabilities capability);
337
338 virtual void Insert(const shash::Any &hash, const uint64_t size,
339 const std::string &description) { }
340
341 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
342 const std::string &description) { }
343
344 virtual bool Pin(const shash::Any &hash, const uint64_t size,
345 const std::string &description, const bool is_catalog) {
346 return is_catalog;
347 }
348
349 virtual void Unpin(const shash::Any &hash) { }
350 virtual void Touch(const shash::Any &hash) { }
351 virtual void Remove(const shash::Any &file) { }
352 virtual bool Cleanup(const uint64_t leave_size);
353
354 virtual void RegisterBackChannel(int back_channel[2],
355 const std::string &channel_id);
356 virtual void UnregisterBackChannel(int back_channel[2],
357 const std::string &channel_id);
358
359 virtual std::vector<std::string> List();
360 virtual std::vector<std::string> ListPinned();
361 virtual std::vector<std::string> ListCatalogs();
362 virtual std::vector<std::string> ListVolatile();
363 virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
364 virtual uint64_t GetCapacity();
365 virtual uint64_t GetSize();
366 virtual uint64_t GetSizePinned();
367 virtual bool SetLimit(uint64_t limit) { return false; } // NOLINT
368 virtual uint64_t GetCleanupRate(uint64_t period_s);
369
370 virtual void Spawn() { }
371 5 virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
372 virtual uint32_t GetProtocolRevision() { return 0; }
373
374 private:
375 struct QuotaInfo {
376 44 QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
377 uint64_t size;
378 uint64_t used;
379 uint64_t pinned;
380 uint64_t no_shrink;
381 };
382
383 114 explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr)
384 114 : cache_mgr_(cache_mgr) { }
385 int GetInfo(QuotaInfo *quota_info);
386 bool DoListing(cvmfs::EnumObjectType type,
387 std::vector<cvmfs::MsgListRecord> *result);
388
389 ExternalCacheManager *cache_mgr_;
390 };
391
392 #endif // CVMFS_CACHE_EXTERN_H_
393