GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_extern.h
Date: 2025-06-29 02:35:41
Exec Total Coverage
Lines: 125 142 88.0%
Branches: 19 38 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #ifndef CVMFS_CACHE_EXTERN_H_
5 #define CVMFS_CACHE_EXTERN_H_
6
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
9 #endif
10
11 #include <pthread.h>
12 #include <stdint.h>
13 #include <unistd.h>
14
15 #include <cassert>
16 #include <string>
17 #include <vector>
18
19 #include "cache.h"
20 #include "cache_transport.h"
21 #include "crypto/hash.h"
22 #include "fd_table.h"
23 #include "gtest/gtest_prod.h"
24 #include "quota.h"
25 #include "util/atomic.h"
26 #include "util/concurrency.h"
27 #include "util/single_copy.h"
28
29
30 class ExternalCacheManager : public CacheManager {
31 FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
32 friend class ExternalQuotaManager;
33
34 public:
35 static const unsigned kPbProtocolVersion = 1;
36 /**
37 * Used for race-free startup of an external cache plugin.
38 */
39 class PluginHandle {
40 friend class ExternalCacheManager;
41
42 public:
43 PluginHandle() : fd_connection_(-1) { }
44 bool IsValid() const { return fd_connection_ >= 0; }
45 int fd_connection() const { return fd_connection_; }
46 std::string error_msg() const { return error_msg_; }
47
48 private:
49 /**
50 * The connected file descriptor to pass to Create()
51 */
52 int fd_connection_;
53
54 std::string error_msg_;
55 };
56
57 static PluginHandle *CreatePlugin(const std::string &locator,
58 const std::vector<std::string> &cmd_line);
59
60 static ExternalCacheManager *Create(int fd_connection,
61 unsigned max_open_fds,
62 const std::string &ident);
63 virtual ~ExternalCacheManager();
64
65 84 virtual CacheManagerIds id() { return kExternalCacheManager; }
66 virtual std::string Describe();
67 virtual bool AcquireQuotaManager(QuotaManager *quota_mgr);
68
69 virtual int Open(const LabeledObject &object);
70 virtual int64_t GetSize(int fd);
71 virtual int Close(int fd);
72 virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
73 virtual int Dup(int fd);
74 virtual int Readahead(int fd);
75
76 #ifdef __APPLE__
77 virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
78 #else
79 2144 virtual uint32_t SizeOfTxn() {
80 2144 return sizeof(Transaction) + max_object_size_;
81 }
82 #endif
83 virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
84 virtual void CtrlTxn(const Label &label, const int flags, void *txn);
85 virtual int64_t Write(const void *buf, uint64_t size, void *txn);
86 virtual int Reset(void *txn);
87 virtual int AbortTxn(void *txn);
88 virtual int OpenFromTxn(void *txn);
89 virtual int CommitTxn(void *txn);
90
91 virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn);
92 virtual bool StoreBreadcrumb(const manifest::Manifest &manifest);
93
94 virtual void Spawn();
95
96 36 int64_t session_id() const { return session_id_; }
97 188 uint32_t max_object_size() const { return max_object_size_; }
98 10 uint64_t capabilities() const { return capabilities_; }
99 14 pid_t pid_plugin() const { return pid_plugin_; }
100
101 protected:
102 virtual void *DoSaveState();
103 virtual int DoRestoreState(void *data);
104 virtual bool DoFreeState(void *data);
105
106 private:
107 /**
108 * The null hash (hashed output is all null bytes) serves as a marker for an
109 * invalid handle
110 */
111 static const shash::Any kInvalidHandle;
112 /**
113 * Objects cannot be larger than 512 kB. Keeps transaction memory consumption
114 * under control.
115 */
116 static const unsigned kMaxSupportedObjectSize = 512 * 1024;
117 /**
118 * Statistically, at least half of our objects should not be further chunked.
119 */
120 static const unsigned kMinSupportedObjectSize = 4 * 1024;
121
122 struct Transaction {
123 2116 explicit Transaction(const shash::Any &id)
124 2116 : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
125 2116 , buf_pos(0)
126 2116 , size(0)
127 2116 , expected_size(kSizeUnknown)
128 2116 , label()
129 2116 , open_fds(0)
130 2116 , flushed(false)
131 2116 , committed(false)
132 2116 , label_modified(false)
133 2116 , transaction_id(0)
134 2116 , id(id) { }
135
136 /**
137 * Allocated size is max_object_size_, allocated by the caller at the end
138 * of the transaction (Linux only).
139 */
140 unsigned char *buffer;
141 unsigned buf_pos;
142 uint64_t size;
143 uint64_t expected_size;
144 Label label;
145 int open_fds;
146 bool flushed;
147 bool committed;
148 bool label_modified;
149 uint64_t transaction_id;
150 shash::Any id;
151 }; // class Transaction
152
153 struct ReadOnlyHandle {
154 8002 ReadOnlyHandle() : id(kInvalidHandle) { }
155 4190 explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
156 11908 bool operator==(const ReadOnlyHandle &other) const {
157 11908 return this->id == other.id;
158 }
159 16474 bool operator!=(const ReadOnlyHandle &other) const {
160 16474 return this->id != other.id;
161 }
162 shash::Any id;
163 }; // class ReadOnlyHandle
164
165 class RpcJob {
166 public:
167 10400 explicit RpcJob(cvmfs::MsgRefcountReq *msg)
168 10400 : req_id_(msg->req_id())
169 10400 , part_nr_(0)
170 10400 , msg_req_(msg)
171
1/2
✓ Branch 2 taken 10400 times.
✗ Branch 3 not taken.
10400 , frame_send_(msg) { }
172 234 explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
173 234 : req_id_(msg->req_id())
174 234 , part_nr_(0)
175 234 , msg_req_(msg)
176
1/2
✓ Branch 2 taken 248 times.
✗ Branch 3 not taken.
234 , frame_send_(msg) { }
177 31651 explicit RpcJob(cvmfs::MsgReadReq *msg)
178 31651 : req_id_(msg->req_id())
179 31651 , part_nr_(0)
180 31651 , msg_req_(msg)
181
1/2
✓ Branch 2 taken 31651 times.
✗ Branch 3 not taken.
31651 , frame_send_(msg) { }
182 10926 explicit RpcJob(cvmfs::MsgStoreReq *msg)
183 10926 : req_id_(msg->req_id())
184 10926 , part_nr_(msg->part_nr())
185 10926 , msg_req_(msg)
186
1/2
✓ Branch 2 taken 10926 times.
✗ Branch 3 not taken.
10926 , frame_send_(msg) { }
187 14 explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
188 14 : req_id_(msg->req_id())
189 14 , part_nr_(0)
190 14 , msg_req_(msg)
191
1/2
✓ Branch 2 taken 14 times.
✗ Branch 3 not taken.
14 , frame_send_(msg) { }
192 80 explicit RpcJob(cvmfs::MsgInfoReq *msg)
193 80 : req_id_(msg->req_id())
194 80 , part_nr_(0)
195 80 , msg_req_(msg)
196
1/2
✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
80 , frame_send_(msg) { }
197 36 explicit RpcJob(cvmfs::MsgShrinkReq *msg)
198 36 : req_id_(msg->req_id())
199 36 , part_nr_(0)
200 36 , msg_req_(msg)
201
1/2
✓ Branch 2 taken 36 times.
✗ Branch 3 not taken.
36 , frame_send_(msg) { }
202 148 explicit RpcJob(cvmfs::MsgListReq *msg)
203 148 : req_id_(msg->req_id())
204 148 , part_nr_(0)
205 148 , msg_req_(msg)
206
1/2
✓ Branch 2 taken 148 times.
✗ Branch 3 not taken.
148 , frame_send_(msg) { }
207 32 explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
208 32 : req_id_(msg->req_id())
209 32 , part_nr_(0)
210 32 , msg_req_(msg)
211
1/2
✓ Branch 2 taken 32 times.
✗ Branch 3 not taken.
32 , frame_send_(msg) { }
212 16 explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
213 16 : req_id_(msg->req_id())
214 16 , part_nr_(0)
215 16 , msg_req_(msg)
216
1/2
✓ Branch 2 taken 16 times.
✗ Branch 3 not taken.
16 , frame_send_(msg) { }
217
218 10926 void set_attachment_send(void *data, unsigned size) {
219 10926 frame_send_.set_attachment(data, size);
220 10926 }
221
222 31651 void set_attachment_recv(void *data, unsigned size) {
223 31651 frame_recv_.set_attachment(data, size);
224 31651 }
225
226 google::protobuf::MessageLite *msg_req() { return msg_req_; }
227 // Type checking has been already performed
228 10372 cvmfs::MsgRefcountReply *msg_refcount_reply() {
229 cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
230 10372 frame_recv_.GetMsgTyped());
231
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10386 times.
10386 assert(m->req_id() == req_id_);
232 10386 return m;
233 }
234 248 cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
235 cvmfs::MsgObjectInfoReply
236 *m = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
237 248 frame_recv_.GetMsgTyped());
238
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 248 times.
248 assert(m->req_id() == req_id_);
239 248 return m;
240 }
241 31651 cvmfs::MsgReadReply *msg_read_reply() {
242 cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
243 31651 frame_recv_.GetMsgTyped());
244
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 31651 times.
31651 assert(m->req_id() == req_id_);
245 31651 return m;
246 }
247 10940 cvmfs::MsgStoreReply *msg_store_reply() {
248 cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
249 10940 frame_recv_.GetMsgTyped());
250
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10940 times.
10940 assert(m->req_id() == req_id_);
251
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10940 times.
10940 assert(m->part_nr() == part_nr_);
252 10940 return m;
253 }
254 80 cvmfs::MsgInfoReply *msg_info_reply() {
255 cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
256 80 frame_recv_.GetMsgTyped());
257
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 80 times.
80 assert(m->req_id() == req_id_);
258 80 return m;
259 }
260 36 cvmfs::MsgShrinkReply *msg_shrink_reply() {
261 cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
262 36 frame_recv_.GetMsgTyped());
263
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 36 times.
36 assert(m->req_id() == req_id_);
264 36 return m;
265 }
266 148 cvmfs::MsgListReply *msg_list_reply() {
267 cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
268 148 frame_recv_.GetMsgTyped());
269
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 148 times.
148 assert(m->req_id() == req_id_);
270 148 return m;
271 }
272 48 cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() {
273 cvmfs::MsgBreadcrumbReply
274 *m = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(
275 48 frame_recv_.GetMsgTyped());
276
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 48 times.
48 assert(m->req_id() == req_id_);
277 48 return m;
278 }
279
280 53551 CacheTransport::Frame *frame_send() { return &frame_send_; }
281 167191 CacheTransport::Frame *frame_recv() { return &frame_recv_; }
282 28588 uint64_t req_id() const { return req_id_; }
283 28392 uint64_t part_nr() const { return part_nr_; }
284
285 private:
286 uint64_t req_id_;
287 uint64_t part_nr_;
288 google::protobuf::MessageLite *msg_req_;
289 CacheTransport::Frame frame_send_;
290 CacheTransport::Frame frame_recv_;
291 }; // class RpcJob
292
293 struct RpcInFlight {
294 28392 RpcInFlight() : rpc_job(NULL), signal(NULL) { }
295 28392 RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
296
297 RpcJob *rpc_job;
298 Signal *signal;
299 };
300
301 static void *MainRead(void *data);
302 static int ConnectLocator(const std::string &locator, bool print_error);
303 static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
304
305 explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
306 44727 int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
307 void CallRemotely(RpcJob *rpc_job);
308 int ChangeRefcount(const shash::Any &id, int change_by);
309 int DoOpen(const shash::Any &id);
310 shash::Any GetHandle(int fd);
311 int Flush(bool do_commit, Transaction *transaction);
312
313 pid_t pid_plugin_;
314 FdTable<ReadOnlyHandle> fd_table_;
315 CacheTransport transport_;
316 int64_t session_id_;
317 uint32_t max_object_size_;
318 bool spawned_;
319 bool terminated_;
320 pthread_rwlock_t rwlock_fd_table_;
321 atomic_int64 next_request_id_;
322
323 /**
324 * Serialize concurrent write access to the session fd
325 */
326 pthread_mutex_t lock_send_fd_;
327 std::vector<RpcInFlight> inflight_rpcs_;
328 pthread_mutex_t lock_inflight_rpcs_;
329 pthread_t thread_read_;
330 uint64_t capabilities_;
331 }; // class ExternalCacheManager
332
333
334 class ExternalQuotaManager : public QuotaManager {
335 public:
336 static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr);
337 virtual bool HasCapability(Capabilities capability);
338
339 virtual void Insert(const shash::Any &hash, const uint64_t size,
340 const std::string &description) { }
341
342 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
343 const std::string &description) { }
344
345 virtual bool Pin(const shash::Any &hash, const uint64_t size,
346 const std::string &description, const bool is_catalog) {
347 return is_catalog;
348 }
349
350 virtual void Unpin(const shash::Any &hash) { }
351 virtual void Touch(const shash::Any &hash) { }
352 virtual void Remove(const shash::Any &file) { }
353 virtual bool Cleanup(const uint64_t leave_size);
354
355 virtual void RegisterBackChannel(int back_channel[2],
356 const std::string &channel_id);
357 virtual void UnregisterBackChannel(int back_channel[2],
358 const std::string &channel_id);
359
360 virtual std::vector<std::string> List();
361 virtual std::vector<std::string> ListPinned();
362 virtual std::vector<std::string> ListCatalogs();
363 virtual std::vector<std::string> ListVolatile();
364 virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
365 virtual uint64_t GetCapacity();
366 virtual uint64_t GetSize();
367 virtual uint64_t GetSizePinned();
368 virtual bool SetLimit(uint64_t limit) { return false; } // NOLINT
369 virtual uint64_t GetCleanupRate(uint64_t period_s);
370
371 virtual void Spawn() { }
372 14 virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
373 virtual uint32_t GetProtocolRevision() { return 0; }
374
375 private:
376 struct QuotaInfo {
377 80 QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
378 uint64_t size;
379 uint64_t used;
380 uint64_t pinned;
381 uint64_t no_shrink;
382 };
383
384 276 explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr)
385 276 : cache_mgr_(cache_mgr) { }
386 int GetInfo(QuotaInfo *quota_info);
387 bool DoListing(cvmfs::EnumObjectType type,
388 std::vector<cvmfs::MsgListRecord> *result);
389
390 ExternalCacheManager *cache_mgr_;
391 };
392
393 #endif // CVMFS_CACHE_EXTERN_H_
394