| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/cache_extern.h |
| Date: | 2025-10-19 02:35:28 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 125 | 142 | 88.0% |
| Branches: | 19 | 38 | 50.0% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | #ifndef CVMFS_CACHE_EXTERN_H_ | ||
| 5 | #define CVMFS_CACHE_EXTERN_H_ | ||
| 6 | |||
| 7 | #ifndef __STDC_FORMAT_MACROS | ||
| 8 | #define __STDC_FORMAT_MACROS | ||
| 9 | #endif | ||
| 10 | |||
| 11 | #include <pthread.h> | ||
| 12 | #include <stdint.h> | ||
| 13 | #include <unistd.h> | ||
| 14 | |||
| 15 | #include <cassert> | ||
| 16 | #include <string> | ||
| 17 | #include <vector> | ||
| 18 | |||
| 19 | #include "cache.h" | ||
| 20 | #include "cache_transport.h" | ||
| 21 | #include "crypto/hash.h" | ||
| 22 | #include "fd_table.h" | ||
| 23 | #include "duplex_testing.h" | ||
| 24 | #include "quota.h" | ||
| 25 | #include "util/atomic.h" | ||
| 26 | #include "util/concurrency.h" | ||
| 27 | #include "util/single_copy.h" | ||
| 28 | |||
| 29 | |||
| 30 | class ExternalCacheManager : public CacheManager { | ||
| 31 | FRIEND_TEST(T_ExternalCacheManager, TransactionAbort); | ||
| 32 | friend class ExternalQuotaManager; | ||
| 33 | |||
| 34 | public: | ||
| 35 | static const unsigned kPbProtocolVersion = 1; | ||
| 36 | /** | ||
| 37 | * Used for race-free startup of an external cache plugin. | ||
| 38 | */ | ||
| 39 | class PluginHandle { | ||
| 40 | friend class ExternalCacheManager; | ||
| 41 | |||
| 42 | public: | ||
| 43 | ✗ | PluginHandle() : fd_connection_(-1) { } | |
| 44 | ✗ | bool IsValid() const { return fd_connection_ >= 0; } | |
| 45 | ✗ | int fd_connection() const { return fd_connection_; } | |
| 46 | ✗ | std::string error_msg() const { return error_msg_; } | |
| 47 | |||
| 48 | private: | ||
| 49 | /** | ||
| 50 | * The connected file descriptor to pass to Create() | ||
| 51 | */ | ||
| 52 | int fd_connection_; | ||
| 53 | |||
| 54 | std::string error_msg_; | ||
| 55 | }; | ||
| 56 | |||
| 57 | static PluginHandle *CreatePlugin(const std::string &locator, | ||
| 58 | const std::vector<std::string> &cmd_line); | ||
| 59 | |||
| 60 | static ExternalCacheManager *Create(int fd_connection, | ||
| 61 | unsigned max_open_fds, | ||
| 62 | const std::string &ident); | ||
| 63 | virtual ~ExternalCacheManager(); | ||
| 64 | |||
| 65 | 126 | virtual CacheManagerIds id() { return kExternalCacheManager; } | |
| 66 | virtual std::string Describe(); | ||
| 67 | virtual bool AcquireQuotaManager(QuotaManager *quota_mgr); | ||
| 68 | |||
| 69 | virtual int Open(const LabeledObject &object); | ||
| 70 | virtual int64_t GetSize(int fd); | ||
| 71 | virtual int Close(int fd); | ||
| 72 | virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset); | ||
| 73 | virtual int Dup(int fd); | ||
| 74 | virtual int Readahead(int fd); | ||
| 75 | |||
| 76 | #ifdef __APPLE__ | ||
| 77 | virtual uint32_t SizeOfTxn() { return sizeof(Transaction); } | ||
| 78 | #else | ||
| 79 | 2200 | virtual uint32_t SizeOfTxn() { | |
| 80 | 2200 | return sizeof(Transaction) + max_object_size_; | |
| 81 | } | ||
| 82 | #endif | ||
| 83 | virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn); | ||
| 84 | virtual void CtrlTxn(const Label &label, const int flags, void *txn); | ||
| 85 | virtual int64_t Write(const void *buf, uint64_t size, void *txn); | ||
| 86 | virtual int Reset(void *txn); | ||
| 87 | virtual int AbortTxn(void *txn); | ||
| 88 | virtual int OpenFromTxn(void *txn); | ||
| 89 | virtual int CommitTxn(void *txn); | ||
| 90 | |||
| 91 | virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn); | ||
| 92 | virtual bool StoreBreadcrumb(const manifest::Manifest &manifest); | ||
| 93 | |||
| 94 | virtual void Spawn(); | ||
| 95 | |||
| 96 | 50 | int64_t session_id() const { return session_id_; } | |
| 97 | 205 | uint32_t max_object_size() const { return max_object_size_; } | |
| 98 | 10 | uint64_t capabilities() const { return capabilities_; } | |
| 99 | 21 | pid_t pid_plugin() const { return pid_plugin_; } | |
| 100 | |||
| 101 | protected: | ||
| 102 | virtual void *DoSaveState(); | ||
| 103 | virtual int DoRestoreState(void *data); | ||
| 104 | virtual bool DoFreeState(void *data); | ||
| 105 | |||
| 106 | private: | ||
| 107 | /** | ||
| 108 | * The null hash (hashed output is all null bytes) serves as a marker for an | ||
| 109 | * invalid handle | ||
| 110 | */ | ||
| 111 | static const shash::Any kInvalidHandle; | ||
| 112 | /** | ||
| 113 | * Objects cannot be larger than 512 kB. Keeps transaction memory consumption | ||
| 114 | * under control. | ||
| 115 | */ | ||
| 116 | static const unsigned kMaxSupportedObjectSize = 512 * 1024; | ||
| 117 | /** | ||
| 118 | * Statistically, at least half of our objects should not be further chunked. | ||
| 119 | */ | ||
| 120 | static const unsigned kMinSupportedObjectSize = 4 * 1024; | ||
| 121 | |||
| 122 | struct Transaction { | ||
| 123 | 2158 | explicit Transaction(const shash::Any &id) | |
| 124 | 2158 | : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction)) | |
| 125 | 2158 | , buf_pos(0) | |
| 126 | 2158 | , size(0) | |
| 127 | 2158 | , expected_size(kSizeUnknown) | |
| 128 | 2158 | , label() | |
| 129 | 2158 | , open_fds(0) | |
| 130 | 2158 | , flushed(false) | |
| 131 | 2158 | , committed(false) | |
| 132 | 2158 | , label_modified(false) | |
| 133 | 2158 | , transaction_id(0) | |
| 134 | 2158 | , id(id) { } | |
| 135 | |||
| 136 | /** | ||
| 137 | * Allocated size is max_object_size_, allocated by the caller at the end | ||
| 138 | * of the transaction (Linux only). | ||
| 139 | */ | ||
| 140 | unsigned char *buffer; | ||
| 141 | unsigned buf_pos; | ||
| 142 | uint64_t size; | ||
| 143 | uint64_t expected_size; | ||
| 144 | Label label; | ||
| 145 | int open_fds; | ||
| 146 | bool flushed; | ||
| 147 | bool committed; | ||
| 148 | bool label_modified; | ||
| 149 | uint64_t transaction_id; | ||
| 150 | shash::Any id; | ||
| 151 | }; // class Transaction | ||
| 152 | |||
| 153 | struct ReadOnlyHandle { | ||
| 154 | 11873 | ReadOnlyHandle() : id(kInvalidHandle) { } | |
| 155 | 6164 | explicit ReadOnlyHandle(const shash::Any &h) : id(h) { } | |
| 156 | 17620 | bool operator==(const ReadOnlyHandle &other) const { | |
| 157 | 17620 | return this->id == other.id; | |
| 158 | } | ||
| 159 | 24359 | bool operator!=(const ReadOnlyHandle &other) const { | |
| 160 | 24359 | return this->id != other.id; | |
| 161 | } | ||
| 162 | shash::Any id; | ||
| 163 | }; // class ReadOnlyHandle | ||
| 164 | |||
| 165 | class RpcJob { | ||
| 166 | public: | ||
| 167 | 14348 | explicit RpcJob(cvmfs::MsgRefcountReq *msg) | |
| 168 | 14348 | : req_id_(msg->req_id()) | |
| 169 | 14348 | , part_nr_(0) | |
| 170 | 14348 | , msg_req_(msg) | |
| 171 |
1/2✓ Branch 2 taken 14348 times.
✗ Branch 3 not taken.
|
14348 | , frame_send_(msg) { } |
| 172 | 360 | explicit RpcJob(cvmfs::MsgObjectInfoReq *msg) | |
| 173 | 360 | : req_id_(msg->req_id()) | |
| 174 | 360 | , part_nr_(0) | |
| 175 | 360 | , msg_req_(msg) | |
| 176 |
1/2✓ Branch 2 taken 360 times.
✗ Branch 3 not taken.
|
360 | , frame_send_(msg) { } |
| 177 | 47142 | explicit RpcJob(cvmfs::MsgReadReq *msg) | |
| 178 | 47142 | : req_id_(msg->req_id()) | |
| 179 | 47142 | , part_nr_(0) | |
| 180 | 47142 | , msg_req_(msg) | |
| 181 |
1/2✓ Branch 2 taken 47142 times.
✗ Branch 3 not taken.
|
47142 | , frame_send_(msg) { } |
| 182 | 15182 | explicit RpcJob(cvmfs::MsgStoreReq *msg) | |
| 183 | 15182 | : req_id_(msg->req_id()) | |
| 184 | 15182 | , part_nr_(msg->part_nr()) | |
| 185 | 15182 | , msg_req_(msg) | |
| 186 |
1/2✓ Branch 2 taken 15182 times.
✗ Branch 3 not taken.
|
15182 | , frame_send_(msg) { } |
| 187 | 21 | explicit RpcJob(cvmfs::MsgStoreAbortReq *msg) | |
| 188 | 21 | : req_id_(msg->req_id()) | |
| 189 | 21 | , part_nr_(0) | |
| 190 | 21 | , msg_req_(msg) | |
| 191 |
1/2✓ Branch 2 taken 21 times.
✗ Branch 3 not taken.
|
21 | , frame_send_(msg) { } |
| 192 | 108 | explicit RpcJob(cvmfs::MsgInfoReq *msg) | |
| 193 | 108 | : req_id_(msg->req_id()) | |
| 194 | 108 | , part_nr_(0) | |
| 195 | 108 | , msg_req_(msg) | |
| 196 |
1/2✓ Branch 2 taken 108 times.
✗ Branch 3 not taken.
|
108 | , frame_send_(msg) { } |
| 197 | 50 | explicit RpcJob(cvmfs::MsgShrinkReq *msg) | |
| 198 | 50 | : req_id_(msg->req_id()) | |
| 199 | 50 | , part_nr_(0) | |
| 200 | 50 | , msg_req_(msg) | |
| 201 |
1/2✓ Branch 2 taken 50 times.
✗ Branch 3 not taken.
|
50 | , frame_send_(msg) { } |
| 202 | 218 | explicit RpcJob(cvmfs::MsgListReq *msg) | |
| 203 | 218 | : req_id_(msg->req_id()) | |
| 204 | 218 | , part_nr_(0) | |
| 205 | 218 | , msg_req_(msg) | |
| 206 |
1/2✓ Branch 2 taken 218 times.
✗ Branch 3 not taken.
|
218 | , frame_send_(msg) { } |
| 207 | 46 | explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg) | |
| 208 | 46 | : req_id_(msg->req_id()) | |
| 209 | 46 | , part_nr_(0) | |
| 210 | 46 | , msg_req_(msg) | |
| 211 |
1/2✓ Branch 2 taken 46 times.
✗ Branch 3 not taken.
|
46 | , frame_send_(msg) { } |
| 212 | 23 | explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg) | |
| 213 | 23 | : req_id_(msg->req_id()) | |
| 214 | 23 | , part_nr_(0) | |
| 215 | 23 | , msg_req_(msg) | |
| 216 |
1/2✓ Branch 2 taken 23 times.
✗ Branch 3 not taken.
|
23 | , frame_send_(msg) { } |
| 217 | |||
| 218 | 15182 | void set_attachment_send(void *data, unsigned size) { | |
| 219 | 15182 | frame_send_.set_attachment(data, size); | |
| 220 | 15182 | } | |
| 221 | |||
| 222 | 47142 | void set_attachment_recv(void *data, unsigned size) { | |
| 223 | 47142 | frame_recv_.set_attachment(data, size); | |
| 224 | 47142 | } | |
| 225 | |||
| 226 | google::protobuf::MessageLite *msg_req() { return msg_req_; } | ||
| 227 | // Type checking has been already performed | ||
| 228 | 14348 | cvmfs::MsgRefcountReply *msg_refcount_reply() { | |
| 229 | cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>( | ||
| 230 | 14348 | frame_recv_.GetMsgTyped()); | |
| 231 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 14348 times.
|
14348 | assert(m->req_id() == req_id_); |
| 232 | 14348 | return m; | |
| 233 | } | ||
| 234 | 360 | cvmfs::MsgObjectInfoReply *msg_object_info_reply() { | |
| 235 | cvmfs::MsgObjectInfoReply | ||
| 236 | *m = reinterpret_cast<cvmfs::MsgObjectInfoReply *>( | ||
| 237 | 360 | frame_recv_.GetMsgTyped()); | |
| 238 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 360 times.
|
360 | assert(m->req_id() == req_id_); |
| 239 | 360 | return m; | |
| 240 | } | ||
| 241 | 47142 | cvmfs::MsgReadReply *msg_read_reply() { | |
| 242 | cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>( | ||
| 243 | 47142 | frame_recv_.GetMsgTyped()); | |
| 244 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 47142 times.
|
47142 | assert(m->req_id() == req_id_); |
| 245 | 47142 | return m; | |
| 246 | } | ||
| 247 | 15203 | cvmfs::MsgStoreReply *msg_store_reply() { | |
| 248 | cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>( | ||
| 249 | 15203 | frame_recv_.GetMsgTyped()); | |
| 250 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 15203 times.
|
15203 | assert(m->req_id() == req_id_); |
| 251 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 15203 times.
|
15203 | assert(m->part_nr() == part_nr_); |
| 252 | 15203 | return m; | |
| 253 | } | ||
| 254 | 108 | cvmfs::MsgInfoReply *msg_info_reply() { | |
| 255 | cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>( | ||
| 256 | 108 | frame_recv_.GetMsgTyped()); | |
| 257 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 108 times.
|
108 | assert(m->req_id() == req_id_); |
| 258 | 108 | return m; | |
| 259 | } | ||
| 260 | 50 | cvmfs::MsgShrinkReply *msg_shrink_reply() { | |
| 261 | cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>( | ||
| 262 | 50 | frame_recv_.GetMsgTyped()); | |
| 263 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 50 times.
|
50 | assert(m->req_id() == req_id_); |
| 264 | 50 | return m; | |
| 265 | } | ||
| 266 | 218 | cvmfs::MsgListReply *msg_list_reply() { | |
| 267 | cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>( | ||
| 268 | 218 | frame_recv_.GetMsgTyped()); | |
| 269 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 218 times.
|
218 | assert(m->req_id() == req_id_); |
| 270 | 218 | return m; | |
| 271 | } | ||
| 272 | 69 | cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() { | |
| 273 | cvmfs::MsgBreadcrumbReply | ||
| 274 | *m = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>( | ||
| 275 | 69 | frame_recv_.GetMsgTyped()); | |
| 276 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 69 times.
|
69 | assert(m->req_id() == req_id_); |
| 277 | 69 | return m; | |
| 278 | } | ||
| 279 | |||
| 280 | 77498 | CacheTransport::Frame *frame_send() { return &frame_send_; } | |
| 281 | 241636 | CacheTransport::Frame *frame_recv() { return &frame_recv_; } | |
| 282 | 42819 | uint64_t req_id() const { return req_id_; } | |
| 283 | 42588 | uint64_t part_nr() const { return part_nr_; } | |
| 284 | |||
| 285 | private: | ||
| 286 | uint64_t req_id_; | ||
| 287 | uint64_t part_nr_; | ||
| 288 | google::protobuf::MessageLite *msg_req_; | ||
| 289 | CacheTransport::Frame frame_send_; | ||
| 290 | CacheTransport::Frame frame_recv_; | ||
| 291 | }; // class RpcJob | ||
| 292 | |||
| 293 | struct RpcInFlight { | ||
| 294 | 42588 | RpcInFlight() : rpc_job(NULL), signal(NULL) { } | |
| 295 | 42588 | RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { } | |
| 296 | |||
| 297 | RpcJob *rpc_job; | ||
| 298 | Signal *signal; | ||
| 299 | }; | ||
| 300 | |||
| 301 | static void *MainRead(void *data); | ||
| 302 | static int ConnectLocator(const std::string &locator, bool print_error); | ||
| 303 | static bool SpawnPlugin(const std::vector<std::string> &cmd_line); | ||
| 304 | |||
| 305 | explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds); | ||
| 306 | 64474 | int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); } | |
| 307 | void CallRemotely(RpcJob *rpc_job); | ||
| 308 | int ChangeRefcount(const shash::Any &id, int change_by); | ||
| 309 | int DoOpen(const shash::Any &id); | ||
| 310 | shash::Any GetHandle(int fd); | ||
| 311 | int Flush(bool do_commit, Transaction *transaction); | ||
| 312 | |||
| 313 | pid_t pid_plugin_; | ||
| 314 | FdTable<ReadOnlyHandle> fd_table_; | ||
| 315 | CacheTransport transport_; | ||
| 316 | int64_t session_id_; | ||
| 317 | uint32_t max_object_size_; | ||
| 318 | bool spawned_; | ||
| 319 | bool terminated_; | ||
| 320 | pthread_rwlock_t rwlock_fd_table_; | ||
| 321 | atomic_int64 next_request_id_; | ||
| 322 | |||
| 323 | /** | ||
| 324 | * Serialize concurrent write access to the session fd | ||
| 325 | */ | ||
| 326 | pthread_mutex_t lock_send_fd_; | ||
| 327 | std::vector<RpcInFlight> inflight_rpcs_; | ||
| 328 | pthread_mutex_t lock_inflight_rpcs_; | ||
| 329 | pthread_t thread_read_; | ||
| 330 | uint64_t capabilities_; | ||
| 331 | }; // class ExternalCacheManager | ||
| 332 | |||
| 333 | |||
| 334 | class ExternalQuotaManager : public QuotaManager { | ||
| 335 | public: | ||
| 336 | static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr); | ||
| 337 | virtual bool HasCapability(Capabilities capability); | ||
| 338 | |||
| 339 | ✗ | virtual void Insert(const shash::Any &hash, const uint64_t size, | |
| 340 | ✗ | const std::string &description) { } | |
| 341 | |||
| 342 | ✗ | virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, | |
| 343 | ✗ | const std::string &description) { } | |
| 344 | |||
| 345 | ✗ | virtual bool Pin(const shash::Any &hash, const uint64_t size, | |
| 346 | const std::string &description, const bool is_catalog) { | ||
| 347 | ✗ | return is_catalog; | |
| 348 | } | ||
| 349 | |||
| 350 | ✗ | virtual void Unpin(const shash::Any &hash) { } | |
| 351 | ✗ | virtual void Touch(const shash::Any &hash) { } | |
| 352 | ✗ | virtual void Remove(const shash::Any &file) { } | |
| 353 | virtual bool Cleanup(const uint64_t leave_size); | ||
| 354 | |||
| 355 | virtual void RegisterBackChannel(int back_channel[2], | ||
| 356 | const std::string &channel_id); | ||
| 357 | virtual void UnregisterBackChannel(int back_channel[2], | ||
| 358 | const std::string &channel_id); | ||
| 359 | |||
| 360 | virtual std::vector<std::string> List(); | ||
| 361 | virtual std::vector<std::string> ListPinned(); | ||
| 362 | virtual std::vector<std::string> ListCatalogs(); | ||
| 363 | virtual std::vector<std::string> ListVolatile(); | ||
| 364 | ✗ | virtual uint64_t GetMaxFileSize() { return uint64_t(-1); } | |
| 365 | virtual uint64_t GetCapacity(); | ||
| 366 | virtual uint64_t GetSize(); | ||
| 367 | virtual uint64_t GetSizePinned(); | ||
| 368 | ✗ | virtual bool SetLimit(uint64_t limit) { return false; } // NOLINT | |
| 369 | virtual uint64_t GetCleanupRate(uint64_t period_s); | ||
| 370 | |||
| 371 | ✗ | virtual void Spawn() { } | |
| 372 | 21 | virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); } | |
| 373 | ✗ | virtual uint32_t GetProtocolRevision() { return 0; } | |
| 374 | |||
| 375 | private: | ||
| 376 | struct QuotaInfo { | ||
| 377 | 108 | QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { } | |
| 378 | uint64_t size; | ||
| 379 | uint64_t used; | ||
| 380 | uint64_t pinned; | ||
| 381 | uint64_t no_shrink; | ||
| 382 | }; | ||
| 383 | |||
| 384 | 402 | explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr) | |
| 385 | 402 | : cache_mgr_(cache_mgr) { } | |
| 386 | int GetInfo(QuotaInfo *quota_info); | ||
| 387 | bool DoListing(cvmfs::EnumObjectType type, | ||
| 388 | std::vector<cvmfs::MsgListRecord> *result); | ||
| 389 | |||
| 390 | ExternalCacheManager *cache_mgr_; | ||
| 391 | }; | ||
| 392 | |||
| 393 | #endif // CVMFS_CACHE_EXTERN_H_ | ||
| 394 |