GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_extern.h
Date: 2025-10-19 02:35:28
Exec Total Coverage
Lines: 125 142 88.0%
Branches: 19 38 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #ifndef CVMFS_CACHE_EXTERN_H_
5 #define CVMFS_CACHE_EXTERN_H_
6
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
9 #endif
10
11 #include <pthread.h>
12 #include <stdint.h>
13 #include <unistd.h>
14
15 #include <cassert>
16 #include <string>
17 #include <vector>
18
19 #include "cache.h"
20 #include "cache_transport.h"
21 #include "crypto/hash.h"
22 #include "fd_table.h"
23 #include "duplex_testing.h"
24 #include "quota.h"
25 #include "util/atomic.h"
26 #include "util/concurrency.h"
27 #include "util/single_copy.h"
28
29
30 class ExternalCacheManager : public CacheManager {
31 FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
32 friend class ExternalQuotaManager;
33
34 public:
35 static const unsigned kPbProtocolVersion = 1;
36 /**
37 * Used for race-free startup of an external cache plugin.
38 */
39 class PluginHandle {
40 friend class ExternalCacheManager;
41
42 public:
43 PluginHandle() : fd_connection_(-1) { }
44 bool IsValid() const { return fd_connection_ >= 0; }
45 int fd_connection() const { return fd_connection_; }
46 std::string error_msg() const { return error_msg_; }
47
48 private:
49 /**
50 * The connected file descriptor to pass to Create()
51 */
52 int fd_connection_;
53
54 std::string error_msg_;
55 };
56
57 static PluginHandle *CreatePlugin(const std::string &locator,
58 const std::vector<std::string> &cmd_line);
59
60 static ExternalCacheManager *Create(int fd_connection,
61 unsigned max_open_fds,
62 const std::string &ident);
63 virtual ~ExternalCacheManager();
64
65 126 virtual CacheManagerIds id() { return kExternalCacheManager; }
66 virtual std::string Describe();
67 virtual bool AcquireQuotaManager(QuotaManager *quota_mgr);
68
69 virtual int Open(const LabeledObject &object);
70 virtual int64_t GetSize(int fd);
71 virtual int Close(int fd);
72 virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
73 virtual int Dup(int fd);
74 virtual int Readahead(int fd);
75
76 #ifdef __APPLE__
77 virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
78 #else
79 2200 virtual uint32_t SizeOfTxn() {
80 2200 return sizeof(Transaction) + max_object_size_;
81 }
82 #endif
83 virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
84 virtual void CtrlTxn(const Label &label, const int flags, void *txn);
85 virtual int64_t Write(const void *buf, uint64_t size, void *txn);
86 virtual int Reset(void *txn);
87 virtual int AbortTxn(void *txn);
88 virtual int OpenFromTxn(void *txn);
89 virtual int CommitTxn(void *txn);
90
91 virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn);
92 virtual bool StoreBreadcrumb(const manifest::Manifest &manifest);
93
94 virtual void Spawn();
95
96 50 int64_t session_id() const { return session_id_; }
97 205 uint32_t max_object_size() const { return max_object_size_; }
98 10 uint64_t capabilities() const { return capabilities_; }
99 21 pid_t pid_plugin() const { return pid_plugin_; }
100
101 protected:
102 virtual void *DoSaveState();
103 virtual int DoRestoreState(void *data);
104 virtual bool DoFreeState(void *data);
105
106 private:
107 /**
108 * The null hash (hashed output is all null bytes) serves as a marker for an
109 * invalid handle
110 */
111 static const shash::Any kInvalidHandle;
112 /**
113 * Objects cannot be larger than 512 kB. Keeps transaction memory consumption
114 * under control.
115 */
116 static const unsigned kMaxSupportedObjectSize = 512 * 1024;
117 /**
118 * Statistically, at least half of our objects should not be further chunked.
119 */
120 static const unsigned kMinSupportedObjectSize = 4 * 1024;
121
122 struct Transaction {
123 2158 explicit Transaction(const shash::Any &id)
124 2158 : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
125 2158 , buf_pos(0)
126 2158 , size(0)
127 2158 , expected_size(kSizeUnknown)
128 2158 , label()
129 2158 , open_fds(0)
130 2158 , flushed(false)
131 2158 , committed(false)
132 2158 , label_modified(false)
133 2158 , transaction_id(0)
134 2158 , id(id) { }
135
136 /**
137 * Buffer of max_object_size_ bytes, allocated by the caller directly behind
138 * the Transaction struct (Linux only).
139 */
140 unsigned char *buffer;
141 unsigned buf_pos;
142 uint64_t size;
143 uint64_t expected_size;
144 Label label;
145 int open_fds;
146 bool flushed;
147 bool committed;
148 bool label_modified;
149 uint64_t transaction_id;
150 shash::Any id;
151 }; // struct Transaction
152
153 struct ReadOnlyHandle {
154 11873 ReadOnlyHandle() : id(kInvalidHandle) { }
155 6164 explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
156 17620 bool operator==(const ReadOnlyHandle &other) const {
157 17620 return this->id == other.id;
158 }
159 24359 bool operator!=(const ReadOnlyHandle &other) const {
160 24359 return this->id != other.id;
161 }
162 shash::Any id;
163 }; // struct ReadOnlyHandle
164
165 class RpcJob {
166 public:
167 14348 explicit RpcJob(cvmfs::MsgRefcountReq *msg)
168 14348 : req_id_(msg->req_id())
169 14348 , part_nr_(0)
170 14348 , msg_req_(msg)
171
1/2
✓ Branch 2 taken 14348 times.
✗ Branch 3 not taken.
14348 , frame_send_(msg) { }
172 360 explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
173 360 : req_id_(msg->req_id())
174 360 , part_nr_(0)
175 360 , msg_req_(msg)
176
1/2
✓ Branch 2 taken 360 times.
✗ Branch 3 not taken.
360 , frame_send_(msg) { }
177 47142 explicit RpcJob(cvmfs::MsgReadReq *msg)
178 47142 : req_id_(msg->req_id())
179 47142 , part_nr_(0)
180 47142 , msg_req_(msg)
181
1/2
✓ Branch 2 taken 47142 times.
✗ Branch 3 not taken.
47142 , frame_send_(msg) { }
182 15182 explicit RpcJob(cvmfs::MsgStoreReq *msg)
183 15182 : req_id_(msg->req_id())
184 15182 , part_nr_(msg->part_nr())
185 15182 , msg_req_(msg)
186
1/2
✓ Branch 2 taken 15182 times.
✗ Branch 3 not taken.
15182 , frame_send_(msg) { }
187 21 explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
188 21 : req_id_(msg->req_id())
189 21 , part_nr_(0)
190 21 , msg_req_(msg)
191
1/2
✓ Branch 2 taken 21 times.
✗ Branch 3 not taken.
21 , frame_send_(msg) { }
192 108 explicit RpcJob(cvmfs::MsgInfoReq *msg)
193 108 : req_id_(msg->req_id())
194 108 , part_nr_(0)
195 108 , msg_req_(msg)
196
1/2
✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
108 , frame_send_(msg) { }
197 50 explicit RpcJob(cvmfs::MsgShrinkReq *msg)
198 50 : req_id_(msg->req_id())
199 50 , part_nr_(0)
200 50 , msg_req_(msg)
201
1/2
✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
50 , frame_send_(msg) { }
202 218 explicit RpcJob(cvmfs::MsgListReq *msg)
203 218 : req_id_(msg->req_id())
204 218 , part_nr_(0)
205 218 , msg_req_(msg)
206
1/2
✓ Branch 2 taken 218 times.
✗ Branch 3 not taken.
218 , frame_send_(msg) { }
207 46 explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
208 46 : req_id_(msg->req_id())
209 46 , part_nr_(0)
210 46 , msg_req_(msg)
211
1/2
✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
46 , frame_send_(msg) { }
212 23 explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
213 23 : req_id_(msg->req_id())
214 23 , part_nr_(0)
215 23 , msg_req_(msg)
216
1/2
✓ Branch 2 taken 23 times.
✗ Branch 3 not taken.
23 , frame_send_(msg) { }
217
218 15182 void set_attachment_send(void *data, unsigned size) {
219 15182 frame_send_.set_attachment(data, size);
220 15182 }
221
222 47142 void set_attachment_recv(void *data, unsigned size) {
223 47142 frame_recv_.set_attachment(data, size);
224 47142 }
225
226 google::protobuf::MessageLite *msg_req() { return msg_req_; }
227 // Type checking has already been performed
228 14348 cvmfs::MsgRefcountReply *msg_refcount_reply() {
229 cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
230 14348 frame_recv_.GetMsgTyped());
231
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 14348 times.
14348 assert(m->req_id() == req_id_);
232 14348 return m;
233 }
234 360 cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
235 cvmfs::MsgObjectInfoReply
236 *m = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
237 360 frame_recv_.GetMsgTyped());
238
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 360 times.
360 assert(m->req_id() == req_id_);
239 360 return m;
240 }
241 47142 cvmfs::MsgReadReply *msg_read_reply() {
242 cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
243 47142 frame_recv_.GetMsgTyped());
244
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 47142 times.
47142 assert(m->req_id() == req_id_);
245 47142 return m;
246 }
247 15203 cvmfs::MsgStoreReply *msg_store_reply() {
248 cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
249 15203 frame_recv_.GetMsgTyped());
250
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 15203 times.
15203 assert(m->req_id() == req_id_);
251
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 15203 times.
15203 assert(m->part_nr() == part_nr_);
252 15203 return m;
253 }
254 108 cvmfs::MsgInfoReply *msg_info_reply() {
255 cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
256 108 frame_recv_.GetMsgTyped());
257
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 108 times.
108 assert(m->req_id() == req_id_);
258 108 return m;
259 }
260 50 cvmfs::MsgShrinkReply *msg_shrink_reply() {
261 cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
262 50 frame_recv_.GetMsgTyped());
263
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 50 times.
50 assert(m->req_id() == req_id_);
264 50 return m;
265 }
266 218 cvmfs::MsgListReply *msg_list_reply() {
267 cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
268 218 frame_recv_.GetMsgTyped());
269
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 218 times.
218 assert(m->req_id() == req_id_);
270 218 return m;
271 }
272 69 cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() {
273 cvmfs::MsgBreadcrumbReply
274 *m = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(
275 69 frame_recv_.GetMsgTyped());
276
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 69 times.
69 assert(m->req_id() == req_id_);
277 69 return m;
278 }
279
280 77498 CacheTransport::Frame *frame_send() { return &frame_send_; }
281 241636 CacheTransport::Frame *frame_recv() { return &frame_recv_; }
282 42819 uint64_t req_id() const { return req_id_; }
283 42588 uint64_t part_nr() const { return part_nr_; }
284
285 private:
286 uint64_t req_id_;
287 uint64_t part_nr_;
288 google::protobuf::MessageLite *msg_req_;
289 CacheTransport::Frame frame_send_;
290 CacheTransport::Frame frame_recv_;
291 }; // class RpcJob
292
293 struct RpcInFlight {
294 42588 RpcInFlight() : rpc_job(NULL), signal(NULL) { }
295 42588 RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
296
297 RpcJob *rpc_job;
298 Signal *signal;
299 };
300
301 static void *MainRead(void *data);
302 static int ConnectLocator(const std::string &locator, bool print_error);
303 static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
304
305 explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
306 64474 int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
307 void CallRemotely(RpcJob *rpc_job);
308 int ChangeRefcount(const shash::Any &id, int change_by);
309 int DoOpen(const shash::Any &id);
310 shash::Any GetHandle(int fd);
311 int Flush(bool do_commit, Transaction *transaction);
312
313 pid_t pid_plugin_;
314 FdTable<ReadOnlyHandle> fd_table_;
315 CacheTransport transport_;
316 int64_t session_id_;
317 uint32_t max_object_size_;
318 bool spawned_;
319 bool terminated_;
320 pthread_rwlock_t rwlock_fd_table_;
321 atomic_int64 next_request_id_;
322
323 /**
324 * Serialize concurrent write access to the session fd
325 */
326 pthread_mutex_t lock_send_fd_;
327 std::vector<RpcInFlight> inflight_rpcs_;
328 pthread_mutex_t lock_inflight_rpcs_;
329 pthread_t thread_read_;
330 uint64_t capabilities_;
331 }; // class ExternalCacheManager
332
333
334 class ExternalQuotaManager : public QuotaManager {
335 public:
336 static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr);
337 virtual bool HasCapability(Capabilities capability);
338
339 virtual void Insert(const shash::Any &hash, const uint64_t size,
340 const std::string &description) { }
341
342 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
343 const std::string &description) { }
344
345 virtual bool Pin(const shash::Any &hash, const uint64_t size,
346 const std::string &description, const bool is_catalog) {
347 return is_catalog;
348 }
349
350 virtual void Unpin(const shash::Any &hash) { }
351 virtual void Touch(const shash::Any &hash) { }
352 virtual void Remove(const shash::Any &file) { }
353 virtual bool Cleanup(const uint64_t leave_size);
354
355 virtual void RegisterBackChannel(int back_channel[2],
356 const std::string &channel_id);
357 virtual void UnregisterBackChannel(int back_channel[2],
358 const std::string &channel_id);
359
360 virtual std::vector<std::string> List();
361 virtual std::vector<std::string> ListPinned();
362 virtual std::vector<std::string> ListCatalogs();
363 virtual std::vector<std::string> ListVolatile();
364 virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
365 virtual uint64_t GetCapacity();
366 virtual uint64_t GetSize();
367 virtual uint64_t GetSizePinned();
368 virtual bool SetLimit(uint64_t limit) { return false; } // NOLINT
369 virtual uint64_t GetCleanupRate(uint64_t period_s);
370
371 virtual void Spawn() { }
372 21 virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
373 virtual uint32_t GetProtocolRevision() { return 0; }
374
375 private:
376 struct QuotaInfo {
377 108 QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
378 uint64_t size;
379 uint64_t used;
380 uint64_t pinned;
381 uint64_t no_shrink;
382 };
383
384 402 explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr)
385 402 : cache_mgr_(cache_mgr) { }
386 int GetInfo(QuotaInfo *quota_info);
387 bool DoListing(cvmfs::EnumObjectType type,
388 std::vector<cvmfs::MsgListRecord> *result);
389
390 ExternalCacheManager *cache_mgr_;
391 };
392
393 #endif // CVMFS_CACHE_EXTERN_H_
394