CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_extern.h
Go to the documentation of this file.
1 
4 #ifndef CVMFS_CACHE_EXTERN_H_
5 #define CVMFS_CACHE_EXTERN_H_
6 
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
9 #endif
10 
11 #include <pthread.h>
12 #include <stdint.h>
13 #include <unistd.h>
14 
15 #include <cassert>
16 #include <string>
17 #include <vector>
18 
19 #include "cache.h"
20 #include "cache_transport.h"
21 #include "crypto/hash.h"
22 #include "fd_table.h"
23 #include "gtest/gtest_prod.h"
24 #include "quota.h"
25 #include "util/atomic.h"
26 #include "util/concurrency.h"
27 #include "util/single_copy.h"
28 
29 
31  FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
32  friend class ExternalQuotaManager;
33 
34  public:
35  static const unsigned kPbProtocolVersion = 1;
// Small value object pairing a connection file descriptor with an error
// message.  CreatePlugin() is declared to return a pointer to one of these.
// ExternalCacheManager is a friend and can therefore access the private
// members directly.
// NOTE(review): this listing elides source lines 43 and 49-52 (presumably the
// constructor and the fd_connection_ member declaration) — confirm against
// the real header.
39  class PluginHandle {
40  friend class ExternalCacheManager;
41 
42  public:
// True when the stored descriptor is non-negative.
44  bool IsValid() const { return fd_connection_ >= 0; }
// Returns the stored connection descriptor unchanged (may be negative).
45  int fd_connection() const { return fd_connection_; }
// Returns a copy of the stored error message.
46  std::string error_msg() const { return error_msg_; }
47 
48  private:
53 
// Text returned by error_msg().
54  std::string error_msg_;
55  };
56 
// Static factory: takes a locator string and a command line and returns a
// PluginHandle pointer.  Declared only; the definition is out of line.
// NOTE(review): ownership of the returned pointer is not visible in this
// header — confirm who deletes it.
57  static PluginHandle *CreatePlugin(const std::string &locator,
58  const std::vector<std::string> &cmd_line);
59 
// Static factory for the cache manager itself, built from an already
// established connection descriptor, a limit on open file descriptors and an
// identification string.  Declared only.
// NOTE(review): failure behaviour (NULL return vs. abort) is not visible
// here — confirm in the implementation file.
60  static ExternalCacheManager *Create(int fd_connection,
61  unsigned max_open_fds,
62  const std::string &ident);
// Virtual destructor; defined out of line.
63  virtual ~ExternalCacheManager();
64 
66  virtual std::string Describe();
68 
// Descriptor-based read interface.  All six are virtual and declared only;
// their bodies live outside this header.
// NOTE(review): the int / int64_t return values presumably carry either a
// result or a negative error code — confirm the convention in the base class.
69  virtual int Open(const LabeledObject &object);
70  virtual int64_t GetSize(int fd);
71  virtual int Close(int fd);
72  virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
73  virtual int Dup(int fd);
74  virtual int Readahead(int fd);
75 
// Reports a size derived from the Transaction struct; the value differs by
// platform.
// NOTE(review): presumably this is the number of bytes a caller must reserve
// for the opaque `txn` pointer handed to StartTxn() and friends — confirm.
76 #ifdef __APPLE__
// Apple builds: only the struct itself is counted.
77  virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
78 #else
// Other platforms: the struct plus max_object_size_ additional bytes.  The
// Transaction constructor points `buffer` at the address directly after the
// struct, so the extra bytes presumably back that buffer — TODO confirm.
79  virtual uint32_t SizeOfTxn() {
80  return sizeof(Transaction) + max_object_size_;
81  }
82 #endif
// Write-transaction interface.  Every function receives the transaction as
// an untyped `void *txn`; all are virtual and declared only.
// NOTE(review): `txn` presumably points to caller-provided memory of
// SizeOfTxn() bytes holding a Transaction — confirm in the implementation.
83  virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
84  virtual void CtrlTxn(const Label &label, const int flags, void *txn);
85  virtual int64_t Write(const void *buf, uint64_t size, void *txn);
86  virtual int Reset(void *txn);
87  virtual int AbortTxn(void *txn);
88  virtual int OpenFromTxn(void *txn);
89  virtual int CommitTxn(void *txn);
90 
91  virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn);
92  virtual bool StoreBreadcrumb(const manifest::Manifest &manifest);
93 
94  virtual void Spawn();
95 
// Read-only accessors; each returns the value of the like-named member
// without modification.
96  int64_t session_id() const { return session_id_; }
97  uint32_t max_object_size() const { return max_object_size_; }
98  uint64_t capabilities() const { return capabilities_; }
99  pid_t pid_plugin() const { return pid_plugin_; }
100 
101  protected:
102  virtual void *DoSaveState();
103  virtual int DoRestoreState(void *data);
104  virtual bool DoFreeState(void *data);
105 
106  private:
// 512 * 1024 = 524288 (512 KiB).
// NOTE(review): presumably the upper bound accepted for max_object_size_ —
// the original doc comment is elided from this listing; confirm.
116  static const unsigned kMaxSupportedObjectSize = 512 * 1024;
// 4 * 1024 = 4096 (4 KiB).
// NOTE(review): presumably the matching lower bound — confirm.
120  static const unsigned kMinSupportedObjectSize = 4 * 1024;
121 
// Bookkeeping for one write transaction.
// NOTE(review): this listing elides source lines 127, 136-139, 148 and 150.
// The initializer list references label_modified and id, and the struct has
// an expected_size member, so the elided lines presumably initialize
// expected_size and declare label_modified and id — confirm against the real
// header.
122  struct Transaction {
// Initializes all counters to zero and all flags to false, and stores `id`.
// `buffer` is set to the first byte after this struct, i.e. the object must
// be constructed in memory that extends beyond sizeof(Transaction) before
// the buffer may be used (see SizeOfTxn()).
123  explicit Transaction(const shash::Any &id)
124  : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
125  , buf_pos(0)
126  , size(0)
128  , label()
129  , open_fds(0)
130  , flushed(false)
131  , committed(false)
132  , label_modified(false)
133  , transaction_id(0)
134  , id(id) { }
135 
// Points directly behind the struct (set in the constructor); not owned
// separately from the struct's own storage as far as this header shows.
140  unsigned char *buffer;
// Starts at 0.  NOTE(review): presumably the fill level of `buffer` — confirm.
141  unsigned buf_pos;
// Starts at 0.  NOTE(review): presumably bytes written so far — confirm.
142  uint64_t size;
// Not initialized on any line visible in this listing (see note at the top).
143  uint64_t expected_size;
// Default-constructed in the constructor.
144  Label label;
// Starts at 0.
145  int open_fds;
// Both flags start out false.
146  bool flushed;
147  bool committed;
// Starts at 0.
149  uint64_t transaction_id;
151  }; // class Transaction
152 
// Handle type wrapping a content hash; equality is defined purely by `id`.
// The cross-reference index of this page lists it as the element type of
// fd_table_ (FdTable< ReadOnlyHandle >).
// NOTE(review): source lines 154 and 162 are elided from this listing
// (presumably a default constructor and the `id` member) — confirm.
153  struct ReadOnlyHandle {
// Stores a copy of the given hash as the handle's id.
155  explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
// Two handles are equal exactly when their ids compare equal.
156  bool operator==(const ReadOnlyHandle &other) const {
157  return this->id == other.id;
158  }
// Delegates to the id type's own operator!=.
159  bool operator!=(const ReadOnlyHandle &other) const {
160  return this->id != other.id;
161  }
163  }; // class ReadOnlyHandle
164 
// Couples one request message with a send frame and a receive frame and
// remembers the request id (and part number) so that replies can be checked
// against the request.
// NOTE(review): this listing elides several source lines of the class:
//   - the continuation line of every reinterpret_cast in the reply accessors
//     (the index at the end of the page points to GetMsgTyped() on the
//     receive frame),
//   - source lines 280-281 (frame_send() / frame_recv() accessors),
//   - source lines 289-290 (frame_send_ / frame_recv_ members).
// The text below is therefore not compilable as shown; confirm details
// against the real header.
165  class RpcJob {
166  public:
// One constructor per supported request message type.  Each copies the
// request id out of the message, keeps the message pointer in msg_req_ and
// hands the message to frame_send_.  The part number is 0 for every request
// type except MsgStoreReq.
167  explicit RpcJob(cvmfs::MsgRefcountReq *msg)
168  : req_id_(msg->req_id())
169  , part_nr_(0)
170  , msg_req_(msg)
171  , frame_send_(msg) { }
172  explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
173  : req_id_(msg->req_id())
174  , part_nr_(0)
175  , msg_req_(msg)
176  , frame_send_(msg) { }
177  explicit RpcJob(cvmfs::MsgReadReq *msg)
178  : req_id_(msg->req_id())
179  , part_nr_(0)
180  , msg_req_(msg)
181  , frame_send_(msg) { }
// Store requests are the only ones that take the part number from the
// message instead of using 0.
182  explicit RpcJob(cvmfs::MsgStoreReq *msg)
183  : req_id_(msg->req_id())
184  , part_nr_(msg->part_nr())
185  , msg_req_(msg)
186  , frame_send_(msg) { }
187  explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
188  : req_id_(msg->req_id())
189  , part_nr_(0)
190  , msg_req_(msg)
191  , frame_send_(msg) { }
192  explicit RpcJob(cvmfs::MsgInfoReq *msg)
193  : req_id_(msg->req_id())
194  , part_nr_(0)
195  , msg_req_(msg)
196  , frame_send_(msg) { }
197  explicit RpcJob(cvmfs::MsgShrinkReq *msg)
198  : req_id_(msg->req_id())
199  , part_nr_(0)
200  , msg_req_(msg)
201  , frame_send_(msg) { }
202  explicit RpcJob(cvmfs::MsgListReq *msg)
203  : req_id_(msg->req_id())
204  , part_nr_(0)
205  , msg_req_(msg)
206  , frame_send_(msg) { }
207  explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
208  : req_id_(msg->req_id())
209  , part_nr_(0)
210  , msg_req_(msg)
211  , frame_send_(msg) { }
212  explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
213  : req_id_(msg->req_id())
214  , part_nr_(0)
215  , msg_req_(msg)
216  , frame_send_(msg) { }
217 
// Forwards a data pointer and its size to the outgoing frame's
// set_attachment().
218  void set_attachment_send(void *data, unsigned size) {
219  frame_send_.set_attachment(data, size);
220  }
221 
// Same as above, but for the incoming frame.
222  void set_attachment_recv(void *data, unsigned size) {
223  frame_recv_.set_attachment(data, size);
224  }
225 
// Returns the request message pointer given at construction.
226  google::protobuf::MessageLite *msg_req() { return msg_req_; }
// Typed reply accessors.  Each reinterpret_casts to the expected reply type
// without a runtime type check (the existing comment below states that type
// checking has already happened) and asserts that the reply's request id
// equals the stored one.  The cast argument is elided in this listing.
227  // Type checking has been already performed
228  cvmfs::MsgRefcountReply *msg_refcount_reply() {
229  cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
231  assert(m->req_id() == req_id_);
232  return m;
233  }
234  cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
235  cvmfs::MsgObjectInfoReply
236  *m = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
238  assert(m->req_id() == req_id_);
239  return m;
240  }
241  cvmfs::MsgReadReply *msg_read_reply() {
242  cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
244  assert(m->req_id() == req_id_);
245  return m;
246  }
// The store reply is the only one that is additionally checked against the
// stored part number.
247  cvmfs::MsgStoreReply *msg_store_reply() {
248  cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
250  assert(m->req_id() == req_id_);
251  assert(m->part_nr() == part_nr_);
252  return m;
253  }
254  cvmfs::MsgInfoReply *msg_info_reply() {
255  cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
257  assert(m->req_id() == req_id_);
258  return m;
259  }
260  cvmfs::MsgShrinkReply *msg_shrink_reply() {
261  cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
263  assert(m->req_id() == req_id_);
264  return m;
265  }
266  cvmfs::MsgListReply *msg_list_reply() {
267  cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
269  assert(m->req_id() == req_id_);
270  return m;
271  }
272  cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() {
273  cvmfs::MsgBreadcrumbReply
274  *m = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(
276  assert(m->req_id() == req_id_);
277  return m;
278  }
279 
// Plain getters for the identifiers captured in the constructor.
282  uint64_t req_id() const { return req_id_; }
283  uint64_t part_nr() const { return part_nr_; }
284 
285  private:
// Request id copied from the request message.
286  uint64_t req_id_;
// 0 for all request types except MsgStoreReq.
287  uint64_t part_nr_;
// Request message as passed to the constructor.
// NOTE(review): nothing visible here deletes it, so it is presumably owned
// by the caller — confirm.
288  google::protobuf::MessageLite *msg_req_;
291  }; // class RpcJob
292 
// Pairs an RpcJob pointer with a Signal pointer; instances are stored in the
// inflight_rpcs_ vector of the enclosing class.
// NOTE(review): the two member declarations (source lines 297-298) are elided
// from this listing; the constructors show them to be `rpc_job` and `signal`.
293  struct RpcInFlight {
// Default: both pointers NULL.
294  RpcInFlight() : rpc_job(NULL), signal(NULL) { }
// Stores both pointers as given; no ownership transfer is visible here.
295  RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
296 
299  };
300 
301  static void *MainRead(void *data);
302  static int ConnectLocator(const std::string &locator, bool print_error);
303  static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
304 
305  explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
// Adds 1 to next_request_id_ via atomic_xadd64() and returns that call's
// result.
// NOTE(review): presumably a fetch-and-add that yields the value before the
// increment — confirm in util/atomic.h.
306  int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
307  void CallRemotely(RpcJob *rpc_job);
308  int ChangeRefcount(const shash::Any &id, int change_by);
309  int DoOpen(const shash::Any &id);
310  shash::Any GetHandle(int fd);
311  int Flush(bool do_commit, Transaction *transaction);
312 
// Data members.  NOTE(review): this listing elides several member lines
// (source lines 314-315, 317, 319, 321, 323-325) together with all original
// doc comments; the notes below state only what the visible code shows.
// Value returned by pid_plugin().
313  pid_t pid_plugin_;
// Value returned by session_id().
316  int64_t session_id_;
318  bool spawned_;
// NOTE(review): by its name this guards fd_table_ — confirm.
320  pthread_rwlock_t rwlock_fd_table_;
322 
// NOTE(review): by its name this serializes sending on the connection
// descriptor — confirm.
326  pthread_mutex_t lock_send_fd_;
// Collection of (RpcJob *, Signal *) pairs, see RpcInFlight.
327  std::vector<RpcInFlight> inflight_rpcs_;
// NOTE(review): by its name this guards inflight_rpcs_ — confirm.
328  pthread_mutex_t lock_inflight_rpcs_;
// NOTE(review): presumably the thread that runs MainRead() — confirm.
329  pthread_t thread_read_;
// Value returned by capabilities().
330  uint64_t capabilities_;
331 }; // class ExternalCacheManager
332 
333 
335  public:
337  virtual bool HasCapability(Capabilities capability);
338 
// Intentionally empty: all three arguments are ignored and nothing happens.
339  virtual void Insert(const shash::Any &hash, const uint64_t size,
340  const std::string &description) { }
341 
// Intentionally empty, same as Insert().
342  virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
343  const std::string &description) { }
344 
// Performs no work: ignores hash, size and description and simply echoes
// `is_catalog` back, so the call reports true for catalogs and false for
// everything else.
345  virtual bool Pin(const shash::Any &hash, const uint64_t size,
346  const std::string &description, const bool is_catalog) {
347  return is_catalog;
348  }
349 
// All three are empty bodies: the hash argument is ignored.
350  virtual void Unpin(const shash::Any &hash) { }
351  virtual void Touch(const shash::Any &hash) { }
352  virtual void Remove(const shash::Any &file) { }
353  virtual bool Cleanup(const uint64_t leave_size);
354 
355  virtual void RegisterBackChannel(int back_channel[2],
356  const std::string &channel_id);
357  virtual void UnregisterBackChannel(int back_channel[2],
358  const std::string &channel_id);
359 
360  virtual std::vector<std::string> List();
361  virtual std::vector<std::string> ListPinned();
362  virtual std::vector<std::string> ListCatalogs();
363  virtual std::vector<std::string> ListVolatile();
// Returns uint64_t(-1), i.e. the largest value a uint64_t can hold.
364  virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
365  virtual uint64_t GetCapacity();
366  virtual uint64_t GetSize();
367  virtual uint64_t GetSizePinned();
// Always returns false; the requested limit is ignored.
368  virtual bool SetLimit(uint64_t limit) { return false; } // NOLINT
369  virtual uint64_t GetCleanupRate(uint64_t period_s);
370 
// Empty body: nothing is started here.
371  virtual void Spawn() { }
// Forwards to the cache manager's pid_plugin() accessor.
372  virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
// Always reports revision 0.
373  virtual uint32_t GetProtocolRevision() { return 0; }
374 
375  private:
// Plain aggregate of four 64-bit counters, all zero after construction.
// GetInfo() is declared to take a pointer to one of these.
// NOTE(review): units and exact meaning of the fields are not visible in
// this header — presumably byte counts reported by the plugin; confirm.
376  struct QuotaInfo {
377  QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
378  uint64_t size;
379  uint64_t used;
380  uint64_t pinned;
381  uint64_t no_shrink;
382  };
383 
385  : cache_mgr_(cache_mgr) { }
386  int GetInfo(QuotaInfo *quota_info);
387  bool DoListing(cvmfs::EnumObjectType type,
388  std::vector<cvmfs::MsgListRecord> *result);
389 
391 };
392 
393 #endif // CVMFS_CACHE_EXTERN_H_
CacheTransport transport_
Definition: cache_extern.h:315
int Flush(bool do_commit, Transaction *transaction)
pthread_mutex_t lock_inflight_rpcs_
Definition: cache_extern.h:328
virtual ~ExternalCacheManager()
virtual uint64_t GetSizePinned()
int ChangeRefcount(const shash::Any &id, int change_by)
void set_attachment_recv(void *data, unsigned size)
Definition: cache_extern.h:222
const manifest::Manifest * manifest() const
Definition: repository.h:125
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
Definition: cache_extern.h:345
int64_t session_id() const
Definition: cache_extern.h:96
virtual uint64_t GetCleanupRate(uint64_t period_s)
int64_t atomic_int64
Definition: atomic.h:18
bool DoListing(cvmfs::EnumObjectType type, std::vector< cvmfs::MsgListRecord > *result)
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
RpcInFlight(RpcJob *r, Signal *s)
Definition: cache_extern.h:295
virtual CacheManagerIds id()
Definition: cache_extern.h:65
virtual int Readahead(int fd)
static const unsigned kMaxSupportedObjectSize
Definition: cache_extern.h:116
virtual void Touch(const shash::Any &hash)
Definition: cache_extern.h:351
virtual bool Cleanup(const uint64_t leave_size)
RpcJob(cvmfs::MsgListReq *msg)
Definition: cache_extern.h:202
pthread_rwlock_t rwlock_fd_table_
Definition: cache_extern.h:320
cvmfs::MsgShrinkReply * msg_shrink_reply()
Definition: cache_extern.h:260
cvmfs::MsgRefcountReply * msg_refcount_reply()
Definition: cache_extern.h:228
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: cache_extern.h:339
virtual uint64_t GetMaxFileSize()
Definition: cache_extern.h:364
Capabilities
Definition: quota.h:47
cvmfs::MsgReadReply * msg_read_reply()
Definition: cache_extern.h:241
RpcJob(cvmfs::MsgReadReq *msg)
Definition: cache_extern.h:177
RpcJob(cvmfs::MsgObjectInfoReq *msg)
Definition: cache_extern.h:172
virtual int AbortTxn(void *txn)
Definition: cache_extern.cc:76
virtual std::string Describe()
Transaction(const shash::Any &id)
Definition: cache_extern.h:123
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
Definition: cache_extern.cc:85
virtual std::vector< std::string > ListCatalogs()
std::vector< RpcInFlight > inflight_rpcs_
Definition: cache_extern.h:327
static void * MainRead(void *data)
virtual uint32_t SizeOfTxn()
Definition: cache_extern.h:79
uint64_t capabilities() const
Definition: cache_extern.h:98
atomic_int64 next_request_id_
Definition: cache_extern.h:321
RpcJob(cvmfs::MsgStoreReq *msg)
Definition: cache_extern.h:182
virtual int Reset(void *txn)
assert((mem||(size==0))&&"Out Of Memory")
uint32_t max_object_size() const
Definition: cache_extern.h:97
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
FRIEND_TEST(T_ExternalCacheManager, TransactionAbort)
virtual void Remove(const shash::Any &file)
Definition: cache_extern.h:352
RpcJob(cvmfs::MsgStoreAbortReq *msg)
Definition: cache_extern.h:187
virtual std::vector< std::string > ListPinned()
virtual std::vector< std::string > List()
void set_attachment_send(void *data, unsigned size)
Definition: cache_extern.h:218
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: cache_extern.h:342
bool operator!=(const ReadOnlyHandle &other) const
Definition: cache_extern.h:159
cvmfs::MsgListReply * msg_list_reply()
Definition: cache_extern.h:266
ExternalCacheManager(int fd_connection, unsigned max_open_fds)
RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
Definition: cache_extern.h:212
virtual void Unpin(const shash::Any &hash)
Definition: cache_extern.h:350
virtual int Close(int fd)
virtual pid_t GetPid()
Definition: cache_extern.h:372
ExternalCacheManager * cache_mgr_
Definition: cache_extern.h:390
cvmfs::MsgStoreReply * msg_store_reply()
Definition: cache_extern.h:247
FdTable< ReadOnlyHandle > fd_table_
Definition: cache_extern.h:314
cvmfs::MsgBreadcrumbReply * msg_breadcrumb_reply()
Definition: cache_extern.h:272
virtual bool SetLimit(uint64_t limit)
Definition: cache_extern.h:368
void Transaction()
CacheManagerIds
Definition: cache.h:24
virtual void * DoSaveState()
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
virtual int Dup(int fd)
RpcJob(cvmfs::MsgRefcountReq *msg)
Definition: cache_extern.h:167
google::protobuf::MessageLite * msg_req_
Definition: cache_extern.h:288
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
RpcJob(cvmfs::MsgInfoReq *msg)
Definition: cache_extern.h:192
CacheTransport::Frame * frame_send()
Definition: cache_extern.h:280
static const shash::Any kInvalidHandle
Definition: cache_extern.h:111
CacheTransport::Frame frame_send_
Definition: cache_extern.h:289
cvmfs::MsgObjectInfoReply * msg_object_info_reply()
Definition: cache_extern.h:234
uint64_t req_id() const
Definition: cache_extern.h:282
int GetInfo(QuotaInfo *quota_info)
virtual uint32_t GetProtocolRevision()
Definition: cache_extern.h:373
google::protobuf::MessageLite * GetMsgTyped()
uint64_t part_nr() const
Definition: cache_extern.h:283
virtual void CtrlTxn(const Label &label, const int flags, void *txn)
ExternalQuotaManager(ExternalCacheManager *cache_mgr)
Definition: cache_extern.h:384
static bool SpawnPlugin(const std::vector< std::string > &cmd_line)
cvmfs::MsgInfoReply * msg_info_reply()
Definition: cache_extern.h:254
QuotaManager * quota_mgr()
Definition: cache.h:191
bool operator==(const ReadOnlyHandle &other) const
Definition: cache_extern.h:156
pid_t pid_plugin() const
Definition: cache_extern.h:99
CacheTransport::Frame * frame_recv()
Definition: cache_extern.h:281
shash::Any GetHandle(int fd)
std::string error_msg() const
Definition: cache_extern.h:46
virtual int64_t GetSize(int fd)
static ExternalCacheManager * Create(int fd_connection, unsigned max_open_fds, const std::string &ident)
virtual void Spawn()
pthread_mutex_t lock_send_fd_
Definition: cache_extern.h:326
static const unsigned kMinSupportedObjectSize
Definition: cache_extern.h:120
static ExternalQuotaManager * Create(ExternalCacheManager *cache_mgr)
virtual bool DoFreeState(void *data)
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
ReadOnlyHandle(const shash::Any &h)
Definition: cache_extern.h:155
CacheTransport::Frame frame_recv_
Definition: cache_extern.h:290
RpcJob(cvmfs::MsgShrinkReq *msg)
Definition: cache_extern.h:197
virtual bool HasCapability(Capabilities capability)
void set_attachment(void *attachment, uint32_t att_size)
google::protobuf::MessageLite * msg_req()
Definition: cache_extern.h:226
void CallRemotely(RpcJob *rpc_job)
Definition: cache_extern.cc:93
virtual std::vector< std::string > ListVolatile()
virtual int OpenFromTxn(void *txn)
static int ConnectLocator(const std::string &locator, bool print_error)
static PluginHandle * CreatePlugin(const std::string &locator, const std::vector< std::string > &cmd_line)
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
virtual uint64_t GetCapacity()
static void size_t size
Definition: smalloc.h:54
virtual int CommitTxn(void *txn)
virtual void Spawn()
Definition: cache_extern.h:371
virtual int DoRestoreState(void *data)
RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
Definition: cache_extern.h:207
static const unsigned kPbProtocolVersion
Definition: cache_extern.h:35
virtual uint64_t GetSize()
int DoOpen(const shash::Any &id)
static const uint64_t kSizeUnknown
Definition: cache.h:74
virtual int Open(const LabeledObject &object)