GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_extern.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 125 142 88.0%
Branches: 19 38 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #ifndef CVMFS_CACHE_EXTERN_H_
5 #define CVMFS_CACHE_EXTERN_H_
6
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
9 #endif
10
11 #include <pthread.h>
12 #include <stdint.h>
13 #include <unistd.h>
14
15 #include <cassert>
16 #include <string>
17 #include <vector>
18
19 #include "cache.h"
20 #include "cache_transport.h"
21 #include "crypto/hash.h"
22 #include "fd_table.h"
23 #include "gtest/gtest_prod.h"
24 #include "quota.h"
25 #include "util/atomic.h"
26 #include "util/concurrency.h"
27 #include "util/single_copy.h"
28
29
30 class ExternalCacheManager : public CacheManager {
31 FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
32 friend class ExternalQuotaManager;
33
34 public:
35 static const unsigned kPbProtocolVersion = 1;
36 /**
37 * Used for race-free startup of an external cache plugin.
38 */
39 class PluginHandle {
40 friend class ExternalCacheManager;
41
42 public:
43 PluginHandle() : fd_connection_(-1) { }
44 bool IsValid() const { return fd_connection_ >= 0; }
45 int fd_connection() const { return fd_connection_; }
46 std::string error_msg() const { return error_msg_; }
47
48 private:
49 /**
50 * The connected file descriptor to pass to Create()
51 */
52 int fd_connection_;
53
54 std::string error_msg_;
55 };
56
57 static PluginHandle *CreatePlugin(const std::string &locator,
58 const std::vector<std::string> &cmd_line);
59
60 static ExternalCacheManager *Create(int fd_connection,
61 unsigned max_open_fds,
62 const std::string &ident);
63 virtual ~ExternalCacheManager();
64
65 90 virtual CacheManagerIds id() { return kExternalCacheManager; }
66 virtual std::string Describe();
67 virtual bool AcquireQuotaManager(QuotaManager *quota_mgr);
68
69 virtual int Open(const LabeledObject &object);
70 virtual int64_t GetSize(int fd);
71 virtual int Close(int fd);
72 virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
73 virtual int Dup(int fd);
74 virtual int Readahead(int fd);
75
76 #ifdef __APPLE__
77 virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
78 #else
79 2152 virtual uint32_t SizeOfTxn() {
80 2152 return sizeof(Transaction) + max_object_size_;
81 }
82 #endif
83 virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
84 virtual void CtrlTxn(const Label &label, const int flags, void *txn);
85 virtual int64_t Write(const void *buf, uint64_t size, void *txn);
86 virtual int Reset(void *txn);
87 virtual int AbortTxn(void *txn);
88 virtual int OpenFromTxn(void *txn);
89 virtual int CommitTxn(void *txn);
90
91 virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn);
92 virtual bool StoreBreadcrumb(const manifest::Manifest &manifest);
93
94 virtual void Spawn();
95
96 38 int64_t session_id() const { return session_id_; }
97 196 uint32_t max_object_size() const { return max_object_size_; }
98 10 uint64_t capabilities() const { return capabilities_; }
99 15 pid_t pid_plugin() const { return pid_plugin_; }
100
101 protected:
102 virtual void *DoSaveState();
103 virtual int DoRestoreState(void *data);
104 virtual bool DoFreeState(void *data);
105
106 private:
107 /**
108 * The null hash (hashed output is all null bytes) serves as a marker for an
109 * invalid handle
110 */
111 static const shash::Any kInvalidHandle;
112 /**
113 * Objects cannot be larger than 512 kB. Keeps transaction memory consumption
114 * under control.
115 */
116 static const unsigned kMaxSupportedObjectSize = 512 * 1024;
117 /**
118 * Statistically, at least half of our objects should not be further chunked.
119 */
120 static const unsigned kMinSupportedObjectSize = 4 * 1024;
121
122 struct Transaction {
123 2122 explicit Transaction(const shash::Any &id)
124 2122 : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
125 2122 , buf_pos(0)
126 2122 , size(0)
127 2122 , expected_size(kSizeUnknown)
128 2122 , label()
129 2122 , open_fds(0)
130 2122 , flushed(false)
131 2122 , committed(false)
132 2122 , label_modified(false)
133 2122 , transaction_id(0)
134 2122 , id(id) { }
135
136 /**
137 * Allocated size is max_object_size_, allocated by the caller at the end
138 * of the transaction (Linux only).
139 */
140 unsigned char *buffer;
141 unsigned buf_pos;
142 uint64_t size;
143 uint64_t expected_size;
144 Label label;
145 int open_fds;
146 bool flushed;
147 bool committed;
148 bool label_modified;
149 uint64_t transaction_id;
150 shash::Any id;
151 }; // class Transaction
152
153 struct ReadOnlyHandle {
154 8555 ReadOnlyHandle() : id(kInvalidHandle) { }
155 4472 explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
156 12724 bool operator==(const ReadOnlyHandle &other) const {
157 12724 return this->id == other.id;
158 }
159 17615 bool operator!=(const ReadOnlyHandle &other) const {
160 17615 return this->id != other.id;
161 }
162 shash::Any id;
163 }; // class ReadOnlyHandle
164
165 class RpcJob {
166 public:
167 10964 explicit RpcJob(cvmfs::MsgRefcountReq *msg)
168 10964 : req_id_(msg->req_id())
169 10964 , part_nr_(0)
170 10964 , msg_req_(msg)
171
1/2
✓ Branch 2 taken 10964 times.
✗ Branch 3 not taken.
10964 , frame_send_(msg) { }
172 264 explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
173 264 : req_id_(msg->req_id())
174 264 , part_nr_(0)
175 264 , msg_req_(msg)
176
1/2
✓ Branch 2 taken 264 times.
✗ Branch 3 not taken.
264 , frame_send_(msg) { }
177 33802 explicit RpcJob(cvmfs::MsgReadReq *msg)
178 33802 : req_id_(msg->req_id())
179 33832 , part_nr_(0)
180 33832 , msg_req_(msg)
181
1/2
✓ Branch 2 taken 33832 times.
✗ Branch 3 not taken.
33832 , frame_send_(msg) { }
182 11534 explicit RpcJob(cvmfs::MsgStoreReq *msg)
183 11534 : req_id_(msg->req_id())
184 11534 , part_nr_(msg->part_nr())
185 11534 , msg_req_(msg)
186
1/2
✓ Branch 2 taken 11534 times.
✗ Branch 3 not taken.
11534 , frame_send_(msg) { }
187 15 explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
188 15 : req_id_(msg->req_id())
189 15 , part_nr_(0)
190 15 , msg_req_(msg)
191
1/2
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
15 , frame_send_(msg) { }
192 84 explicit RpcJob(cvmfs::MsgInfoReq *msg)
193 84 : req_id_(msg->req_id())
194 84 , part_nr_(0)
195 84 , msg_req_(msg)
196
1/2
✓ Branch 2 taken 84 times.
✗ Branch 3 not taken.
84 , frame_send_(msg) { }
197 38 explicit RpcJob(cvmfs::MsgShrinkReq *msg)
198 38 : req_id_(msg->req_id())
199 38 , part_nr_(0)
200 38 , msg_req_(msg)
201
1/2
✓ Branch 2 taken 38 times.
✗ Branch 3 not taken.
38 , frame_send_(msg) { }
202 158 explicit RpcJob(cvmfs::MsgListReq *msg)
203 158 : req_id_(msg->req_id())
204 158 , part_nr_(0)
205 158 , msg_req_(msg)
206
1/2
✓ Branch 2 taken 158 times.
✗ Branch 3 not taken.
158 , frame_send_(msg) { }
207 34 explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
208 34 : req_id_(msg->req_id())
209 34 , part_nr_(0)
210 34 , msg_req_(msg)
211
1/2
✓ Branch 2 taken 34 times.
✗ Branch 3 not taken.
34 , frame_send_(msg) { }
212 17 explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
213 17 : req_id_(msg->req_id())
214 17 , part_nr_(0)
215 17 , msg_req_(msg)
216
1/2
✓ Branch 2 taken 17 times.
✗ Branch 3 not taken.
17 , frame_send_(msg) { }
217
218 11534 void set_attachment_send(void *data, unsigned size) {
219 11534 frame_send_.set_attachment(data, size);
220 11534 }
221
222 33832 void set_attachment_recv(void *data, unsigned size) {
223 33832 frame_recv_.set_attachment(data, size);
224 33832 }
225
226 google::protobuf::MessageLite *msg_req() { return msg_req_; }
227 // Type checking has been already performed
228 10964 cvmfs::MsgRefcountReply *msg_refcount_reply() {
229 cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
230 10964 frame_recv_.GetMsgTyped());
231
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10949 times.
10949 assert(m->req_id() == req_id_);
232 10949 return m;
233 }
234 264 cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
235 cvmfs::MsgObjectInfoReply
236 *m = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
237 264 frame_recv_.GetMsgTyped());
238
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 264 times.
264 assert(m->req_id() == req_id_);
239 264 return m;
240 }
241 33862 cvmfs::MsgReadReply *msg_read_reply() {
242 cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
243 33862 frame_recv_.GetMsgTyped());
244
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 33787 times.
33787 assert(m->req_id() == req_id_);
245 33787 return m;
246 }
247 11549 cvmfs::MsgStoreReply *msg_store_reply() {
248 cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
249 11549 frame_recv_.GetMsgTyped());
250
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 11549 times.
11549 assert(m->req_id() == req_id_);
251
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 11549 times.
11549 assert(m->part_nr() == part_nr_);
252 11549 return m;
253 }
254 84 cvmfs::MsgInfoReply *msg_info_reply() {
255 cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
256 84 frame_recv_.GetMsgTyped());
257
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 84 times.
84 assert(m->req_id() == req_id_);
258 84 return m;
259 }
260 38 cvmfs::MsgShrinkReply *msg_shrink_reply() {
261 cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
262 38 frame_recv_.GetMsgTyped());
263
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 38 times.
38 assert(m->req_id() == req_id_);
264 38 return m;
265 }
266 158 cvmfs::MsgListReply *msg_list_reply() {
267 cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
268 158 frame_recv_.GetMsgTyped());
269
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 158 times.
158 assert(m->req_id() == req_id_);
270 158 return m;
271 }
272 51 cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() {
273 cvmfs::MsgBreadcrumbReply
274 *m = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(
275 51 frame_recv_.GetMsgTyped());
276
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 51 times.
51 assert(m->req_id() == req_id_);
277 51 return m;
278 }
279
280 56970 CacheTransport::Frame *frame_send() { return &frame_send_; }
281 177621 CacheTransport::Frame *frame_recv() { return &frame_recv_; }
282 30480 uint64_t req_id() const { return req_id_; }
283 30420 uint64_t part_nr() const { return part_nr_; }
284
285 private:
286 uint64_t req_id_;
287 uint64_t part_nr_;
288 google::protobuf::MessageLite *msg_req_;
289 CacheTransport::Frame frame_send_;
290 CacheTransport::Frame frame_recv_;
291 }; // class RpcJob
292
293 struct RpcInFlight {
294 30420 RpcInFlight() : rpc_job(NULL), signal(NULL) { }
295 30420 RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
296
297 RpcJob *rpc_job;
298 Signal *signal;
299 };
300
301 static void *MainRead(void *data);
302 static int ConnectLocator(const std::string &locator, bool print_error);
303 static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
304
305 explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
306 47498 int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
307 void CallRemotely(RpcJob *rpc_job);
308 int ChangeRefcount(const shash::Any &id, int change_by);
309 int DoOpen(const shash::Any &id);
310 shash::Any GetHandle(int fd);
311 int Flush(bool do_commit, Transaction *transaction);
312
313 pid_t pid_plugin_;
314 FdTable<ReadOnlyHandle> fd_table_;
315 CacheTransport transport_;
316 int64_t session_id_;
317 uint32_t max_object_size_;
318 bool spawned_;
319 bool terminated_;
320 pthread_rwlock_t rwlock_fd_table_;
321 atomic_int64 next_request_id_;
322
323 /**
324 * Serialize concurrent write access to the session fd
325 */
326 pthread_mutex_t lock_send_fd_;
327 std::vector<RpcInFlight> inflight_rpcs_;
328 pthread_mutex_t lock_inflight_rpcs_;
329 pthread_t thread_read_;
330 uint64_t capabilities_;
331 }; // class ExternalCacheManager
332
333
334 class ExternalQuotaManager : public QuotaManager {
335 public:
336 static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr);
337 virtual bool HasCapability(Capabilities capability);
338
339 virtual void Insert(const shash::Any &hash, const uint64_t size,
340 const std::string &description) { }
341
342 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
343 const std::string &description) { }
344
345 virtual bool Pin(const shash::Any &hash, const uint64_t size,
346 const std::string &description, const bool is_catalog) {
347 return is_catalog;
348 }
349
350 virtual void Unpin(const shash::Any &hash) { }
351 virtual void Touch(const shash::Any &hash) { }
352 virtual void Remove(const shash::Any &file) { }
353 virtual bool Cleanup(const uint64_t leave_size);
354
355 virtual void RegisterBackChannel(int back_channel[2],
356 const std::string &channel_id);
357 virtual void UnregisterBackChannel(int back_channel[2],
358 const std::string &channel_id);
359
360 virtual std::vector<std::string> List();
361 virtual std::vector<std::string> ListPinned();
362 virtual std::vector<std::string> ListCatalogs();
363 virtual std::vector<std::string> ListVolatile();
364 virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
365 virtual uint64_t GetCapacity();
366 virtual uint64_t GetSize();
367 virtual uint64_t GetSizePinned();
368 virtual bool SetLimit(uint64_t limit) { return false; } // NOLINT
369 virtual uint64_t GetCleanupRate(uint64_t period_s);
370
371 virtual void Spawn() { }
372 15 virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
373 virtual uint32_t GetProtocolRevision() { return 0; }
374
375 private:
376 struct QuotaInfo {
377 84 QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
378 uint64_t size;
379 uint64_t used;
380 uint64_t pinned;
381 uint64_t no_shrink;
382 };
383
384 294 explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr)
385 294 : cache_mgr_(cache_mgr) { }
386 int GetInfo(QuotaInfo *quota_info);
387 bool DoListing(cvmfs::EnumObjectType type,
388 std::vector<cvmfs::MsgListRecord> *result);
389
390 ExternalCacheManager *cache_mgr_;
391 };
392
393 #endif // CVMFS_CACHE_EXTERN_H_
394