GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_extern.h
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 98 114 86.0%
Branches: 19 38 50.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #ifndef CVMFS_CACHE_EXTERN_H_
5 #define CVMFS_CACHE_EXTERN_H_
6
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
9 #endif
10
11 #include <pthread.h>
12 #include <stdint.h>
13 #include <unistd.h>
14
15 #include <cassert>
16 #include <string>
17 #include <vector>
18
19 #include "cache.h"
20 #include "cache_transport.h"
21 #include "crypto/hash.h"
22 #include "fd_table.h"
23 #include "gtest/gtest_prod.h"
24 #include "quota.h"
25 #include "util/atomic.h"
26 #include "util/concurrency.h"
27 #include "util/single_copy.h"
28
29
30 class ExternalCacheManager : public CacheManager {
31 FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
32 friend class ExternalQuotaManager;
33
34 public:
35 static const unsigned kPbProtocolVersion = 1;
36 /**
37 * Used for race-free startup of an external cache plugin.
38 */
39 class PluginHandle {
40 friend class ExternalCacheManager;
41 public:
42 PluginHandle() : fd_connection_(-1) { }
43 bool IsValid() const { return fd_connection_ >= 0; }
44 int fd_connection() const { return fd_connection_; }
45 std::string error_msg() const { return error_msg_; }
46
47 private:
48 /**
49 * The connected file descriptor to pass to Create()
50 */
51 int fd_connection_;
52
53 std::string error_msg_;
54 };
55
56 static PluginHandle *CreatePlugin(
57 const std::string &locator,
58 const std::vector<std::string> &cmd_line);
59
60 static ExternalCacheManager *Create(int fd_connection,
61 unsigned max_open_fds,
62 const std::string &ident);
63 virtual ~ExternalCacheManager();
64
65 6 virtual CacheManagerIds id() { return kExternalCacheManager; }
66 virtual std::string Describe();
67 virtual bool AcquireQuotaManager(QuotaManager *quota_mgr);
68
69 virtual int Open(const LabeledObject &object);
70 virtual int64_t GetSize(int fd);
71 virtual int Close(int fd);
72 virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
73 virtual int Dup(int fd);
74 virtual int Readahead(int fd);
75
76 #ifdef __APPLE__
77 virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
78 #else
79 2040 virtual uint32_t SizeOfTxn() {
80 2040 return sizeof(Transaction) + max_object_size_;
81 }
82 #endif
83 virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
84 virtual void CtrlTxn(const Label &label,
85 const int flags,
86 void *txn);
87 virtual int64_t Write(const void *buf, uint64_t size, void *txn);
88 virtual int Reset(void *txn);
89 virtual int AbortTxn(void *txn);
90 virtual int OpenFromTxn(void *txn);
91 virtual int CommitTxn(void *txn);
92
93 virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn);
94 virtual bool StoreBreadcrumb(const manifest::Manifest &manifest);
95
96 virtual void Spawn();
97
98 10 int64_t session_id() const { return session_id_; }
99 193 uint32_t max_object_size() const { return max_object_size_; }
100 10 uint64_t capabilities() const { return capabilities_; }
101 1 pid_t pid_plugin() const { return pid_plugin_; }
102
103 protected:
104 virtual void *DoSaveState();
105 virtual int DoRestoreState(void *data);
106 virtual bool DoFreeState(void *data);
107
108 private:
109 /**
110 * The null hash (hashed output is all null bytes) serves as a marker for an
111 * invalid handle
112 */
113 static const shash::Any kInvalidHandle;
114 /**
115 * Objects cannot be larger than 512 kB. Keeps transaction memory consumption
116 * under control.
117 */
118 static const unsigned kMaxSupportedObjectSize = 512 * 1024;
119 /**
120 * Statistically, at least half of our objects should not be further chunked.
121 */
122 static const unsigned kMinSupportedObjectSize = 4 * 1024;
123
124 struct Transaction {
125 2038 explicit Transaction(const shash::Any &id)
126 2038 : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
127 2038 , buf_pos(0)
128 2038 , size(0)
129 2038 , expected_size(kSizeUnknown)
130 2038 , label()
131 2038 , open_fds(0)
132 2038 , flushed(false)
133 2038 , committed(false)
134 2038 , label_modified(false)
135 2038 , transaction_id(0)
136 2038 , id(id)
137 2038 { }
138
139 /**
140 * Points to the memory right behind this struct, which the caller allocates
141 * together with the transaction: max_object_size_ bytes (none on __APPLE__).
142 */
143 unsigned char *buffer;
144 unsigned buf_pos;
145 uint64_t size;
146 uint64_t expected_size;
147 Label label;
148 int open_fds;
149 bool flushed;
150 bool committed;
151 bool label_modified;
152 uint64_t transaction_id;
153 shash::Any id;
154 }; // class Transaction
155
156 struct ReadOnlyHandle {
157 813 ReadOnlyHandle() : id(kInvalidHandle) { }
158 524 explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
159 1300 bool operator ==(const ReadOnlyHandle &other) const {
160 1300 return this->id == other.id;
161 }
162 1942 bool operator !=(const ReadOnlyHandle &other) const {
163 1942 return this->id != other.id;
164 }
165 shash::Any id;
166 }; // class ReadOnlyHandle
167
168 class RpcJob {
169 public:
170 3068 explicit RpcJob(cvmfs::MsgRefcountReq *msg)
171
1/2
✓ Branch 3 taken 3068 times.
✗ Branch 4 not taken.
3068 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
172 38 explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
173
1/2
✓ Branch 3 taken 37 times.
✗ Branch 4 not taken.
38 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
174 2907 explicit RpcJob(cvmfs::MsgReadReq *msg)
175
1/2
✓ Branch 3 taken 2908 times.
✗ Branch 4 not taken.
2907 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
176 3022 explicit RpcJob(cvmfs::MsgStoreReq *msg)
177 3022 : req_id_(msg->req_id()), part_nr_(msg->part_nr()), msg_req_(msg),
178
1/2
✓ Branch 2 taken 3022 times.
✗ Branch 3 not taken.
3022 frame_send_(msg) { }
179 1 explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
180 1 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg),
181
1/2
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
1 frame_send_(msg) { }
182 28 explicit RpcJob(cvmfs::MsgInfoReq *msg)
183
1/2
✓ Branch 3 taken 28 times.
✗ Branch 4 not taken.
28 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
184 10 explicit RpcJob(cvmfs::MsgShrinkReq *msg)
185
1/2
✓ Branch 3 taken 10 times.
✗ Branch 4 not taken.
10 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
186 18 explicit RpcJob(cvmfs::MsgListReq *msg)
187
1/2
✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
18 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
188 6 explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
189
1/2
✓ Branch 3 taken 6 times.
✗ Branch 4 not taken.
6 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
190 3 explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
191
1/2
✓ Branch 3 taken 3 times.
✗ Branch 4 not taken.
3 : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
192
193 3022 void set_attachment_send(void *data, unsigned size) {
194 3022 frame_send_.set_attachment(data, size);
195 3022 }
196
197 2908 void set_attachment_recv(void *data, unsigned size) {
198 2908 frame_recv_.set_attachment(data, size);
199 2908 }
200
201 google::protobuf::MessageLite *msg_req() { return msg_req_; }
202 // Type checking has already been performed
203 3068 cvmfs::MsgRefcountReply *msg_refcount_reply() {
204 cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
205 3068 frame_recv_.GetMsgTyped());
206
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3064 times.
3064 assert(m->req_id() == req_id_);
207 3064 return m;
208 }
209 40 cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
210 cvmfs::MsgObjectInfoReply *m =
211 reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
212 40 frame_recv_.GetMsgTyped());
213
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 39 times.
39 assert(m->req_id() == req_id_);
214 39 return m;
215 }
216 2908 cvmfs::MsgReadReply *msg_read_reply() {
217 cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
218 2908 frame_recv_.GetMsgTyped());
219
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2908 times.
2908 assert(m->req_id() == req_id_);
220 2908 return m;
221 }
222 3023 cvmfs::MsgStoreReply *msg_store_reply() {
223 cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
224 3023 frame_recv_.GetMsgTyped());
225
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3023 times.
3023 assert(m->req_id() == req_id_);
226
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3023 times.
3023 assert(m->part_nr() == part_nr_);
227 3023 return m;
228 }
229 28 cvmfs::MsgInfoReply *msg_info_reply() {
230 cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
231 28 frame_recv_.GetMsgTyped());
232
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 28 times.
28 assert(m->req_id() == req_id_);
233 28 return m;
234 }
235 10 cvmfs::MsgShrinkReply *msg_shrink_reply() {
236 cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
237 10 frame_recv_.GetMsgTyped());
238
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 10 times.
10 assert(m->req_id() == req_id_);
239 10 return m;
240 }
241 18 cvmfs::MsgListReply *msg_list_reply() {
242 cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
243 18 frame_recv_.GetMsgTyped());
244
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 18 times.
18 assert(m->req_id() == req_id_);
245 18 return m;
246 }
247 9 cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() {
248 cvmfs::MsgBreadcrumbReply *m =
249 reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(
250 9 frame_recv_.GetMsgTyped());
251
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
9 assert(m->req_id() == req_id_);
252 9 return m;
253 }
254
255 9104 CacheTransport::Frame *frame_send() { return &frame_send_; }
256 29066 CacheTransport::Frame *frame_recv() { return &frame_recv_; }
257 2040 uint64_t req_id() const { return req_id_; }
258 2028 uint64_t part_nr() const { return part_nr_; }
259
260 private:
261 uint64_t req_id_;
262 uint64_t part_nr_;
263 google::protobuf::MessageLite *msg_req_;
264 CacheTransport::Frame frame_send_;
265 CacheTransport::Frame frame_recv_;
266 }; // class RpcJob
267
268 struct RpcInFlight {
269 2028 RpcInFlight() : rpc_job(NULL), signal(NULL) { }
270 2028 RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
271
272 RpcJob *rpc_job;
273 Signal *signal;
274 };
275
276 static void *MainRead(void *data);
277 static int ConnectLocator(const std::string &locator, bool print_error);
278 static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
279
280 explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
281 8117 int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
282 void CallRemotely(RpcJob *rpc_job);
283 int ChangeRefcount(const shash::Any &id, int change_by);
284 int DoOpen(const shash::Any &id);
285 shash::Any GetHandle(int fd);
286 int Flush(bool do_commit, Transaction *transaction);
287
288 pid_t pid_plugin_;
289 FdTable<ReadOnlyHandle> fd_table_;
290 CacheTransport transport_;
291 int64_t session_id_;
292 uint32_t max_object_size_;
293 bool spawned_;
294 bool terminated_;
295 pthread_rwlock_t rwlock_fd_table_;
296 atomic_int64 next_request_id_;
297
298 /**
299 * Serialize concurrent write access to the session fd
300 */
301 pthread_mutex_t lock_send_fd_;
302 std::vector<RpcInFlight> inflight_rpcs_;
303 pthread_mutex_t lock_inflight_rpcs_;
304 pthread_t thread_read_;
305 uint64_t capabilities_;
306 }; // class ExternalCacheManager
307
308
309 class ExternalQuotaManager : public QuotaManager {
310 public:
311 static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr);
312 virtual bool HasCapability(Capabilities capability);
313
314 virtual void Insert(const shash::Any &hash, const uint64_t size,
315 const std::string &description)
316 { }
317
318 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
319 const std::string &description)
320 { }
321
322 virtual bool Pin(const shash::Any &hash, const uint64_t size,
323 const std::string &description, const bool is_catalog)
324 { return is_catalog; }
325
326 virtual void Unpin(const shash::Any &hash) { }
327 virtual void Touch(const shash::Any &hash) { }
328 virtual void Remove(const shash::Any &file) { }
329 virtual bool Cleanup(const uint64_t leave_size);
330
331 virtual void RegisterBackChannel(int back_channel[2],
332 const std::string &channel_id);
333 virtual void UnregisterBackChannel(int back_channel[2],
334 const std::string &channel_id);
335
336 virtual std::vector<std::string> List();
337 virtual std::vector<std::string> ListPinned();
338 virtual std::vector<std::string> ListCatalogs();
339 virtual std::vector<std::string> ListVolatile();
340 virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
341 virtual uint64_t GetCapacity();
342 virtual uint64_t GetSize();
343 virtual uint64_t GetSizePinned();
344 virtual uint64_t GetCleanupRate(uint64_t period_s);
345
346 virtual void Spawn() { }
347 1 virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
348 virtual uint32_t GetProtocolRevision() { return 0; }
349
350 private:
351 struct QuotaInfo {
352 28 QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
353 uint64_t size;
354 uint64_t used;
355 uint64_t pinned;
356 uint64_t no_shrink;
357 };
358
359 42 explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr)
360 42 : cache_mgr_(cache_mgr) { }
361 int GetInfo(QuotaInfo *quota_info);
362 bool DoListing(cvmfs::EnumObjectType type,
363 std::vector<cvmfs::MsgListRecord> *result);
364
365 ExternalCacheManager *cache_mgr_;
366 };
367
368 #endif // CVMFS_CACHE_EXTERN_H_
369