CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_extern.h
Go to the documentation of this file.
1 
4 #ifndef CVMFS_CACHE_EXTERN_H_
5 #define CVMFS_CACHE_EXTERN_H_
6 
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
9 #endif
10 
11 #include <pthread.h>
12 #include <stdint.h>
13 #include <unistd.h>
14 
15 #include <cassert>
16 #include <string>
17 #include <vector>
18 
19 #include "cache.h"
20 #include "cache_transport.h"
21 #include "crypto/hash.h"
22 #include "fd_table.h"
23 #include "gtest/gtest_prod.h"
24 #include "quota.h"
25 #include "util/atomic.h"
26 #include "util/concurrency.h"
27 #include "util/single_copy.h"
28 
29 
31  FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
32  friend class ExternalQuotaManager;
33 
34  public:
35  static const unsigned kPbProtocolVersion = 1;
39  class PluginHandle {
40  friend class ExternalCacheManager;
41  public:
43  bool IsValid() const { return fd_connection_ >= 0; }
44  int fd_connection() const { return fd_connection_; }
45  std::string error_msg() const { return error_msg_; }
46 
47  private:
52 
53  std::string error_msg_;
54  };
55 
56  static PluginHandle *CreatePlugin(
57  const std::string &locator,
58  const std::vector<std::string> &cmd_line);
59 
60  static ExternalCacheManager *Create(int fd_connection,
61  unsigned max_open_fds,
62  const std::string &ident);
63  virtual ~ExternalCacheManager();
64 
66  virtual std::string Describe();
68 
69  virtual int Open(const LabeledObject &object);
70  virtual int64_t GetSize(int fd);
71  virtual int Close(int fd);
72  virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
73  virtual int Dup(int fd);
74  virtual int Readahead(int fd);
75 
76 #ifdef __APPLE__
77  virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
78 #else
79  virtual uint32_t SizeOfTxn() {
80  return sizeof(Transaction) + max_object_size_;
81  }
82 #endif
83  virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
84  virtual void CtrlTxn(const Label &label,
85  const int flags,
86  void *txn);
87  virtual int64_t Write(const void *buf, uint64_t size, void *txn);
88  virtual int Reset(void *txn);
89  virtual int AbortTxn(void *txn);
90  virtual int OpenFromTxn(void *txn);
91  virtual int CommitTxn(void *txn);
92 
93  virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn);
94  virtual bool StoreBreadcrumb(const manifest::Manifest &manifest);
95 
96  virtual void Spawn();
97 
98  int64_t session_id() const { return session_id_; }
99  uint32_t max_object_size() const { return max_object_size_; }
100  uint64_t capabilities() const { return capabilities_; }
101  pid_t pid_plugin() const { return pid_plugin_; }
102 
103  protected:
104  virtual void *DoSaveState();
105  virtual int DoRestoreState(void *data);
106  virtual bool DoFreeState(void *data);
107 
108  private:
118  static const unsigned kMaxSupportedObjectSize = 512 * 1024;
122  static const unsigned kMinSupportedObjectSize = 4 * 1024;
123 
124  struct Transaction {
125  explicit Transaction(const shash::Any &id)
126  : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
127  , buf_pos(0)
128  , size(0)
130  , label()
131  , open_fds(0)
132  , flushed(false)
133  , committed(false)
134  , label_modified(false)
135  , transaction_id(0)
136  , id(id)
137  { }
138 
143  unsigned char *buffer;
144  unsigned buf_pos;
145  uint64_t size;
146  uint64_t expected_size;
147  Label label;
148  int open_fds;
149  bool flushed;
150  bool committed;
152  uint64_t transaction_id;
154  }; // class Transaction
155 
156  struct ReadOnlyHandle {
158  explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
159  bool operator ==(const ReadOnlyHandle &other) const {
160  return this->id == other.id;
161  }
162  bool operator !=(const ReadOnlyHandle &other) const {
163  return this->id != other.id;
164  }
166  }; // class ReadOnlyHandle
167 
168  class RpcJob {
169  public:
170  explicit RpcJob(cvmfs::MsgRefcountReq *msg)
171  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
172  explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
173  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
174  explicit RpcJob(cvmfs::MsgReadReq *msg)
175  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
176  explicit RpcJob(cvmfs::MsgStoreReq *msg)
177  : req_id_(msg->req_id()), part_nr_(msg->part_nr()), msg_req_(msg),
178  frame_send_(msg) { }
179  explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
180  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg),
181  frame_send_(msg) { }
182  explicit RpcJob(cvmfs::MsgInfoReq *msg)
183  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
184  explicit RpcJob(cvmfs::MsgShrinkReq *msg)
185  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
186  explicit RpcJob(cvmfs::MsgListReq *msg)
187  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
188  explicit RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
189  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
190  explicit RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
191  : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
192 
193  void set_attachment_send(void *data, unsigned size) {
194  frame_send_.set_attachment(data, size);
195  }
196 
197  void set_attachment_recv(void *data, unsigned size) {
198  frame_recv_.set_attachment(data, size);
199  }
200 
201  google::protobuf::MessageLite *msg_req() { return msg_req_; }
202  // Type checking has been already performed
203  cvmfs::MsgRefcountReply *msg_refcount_reply() {
204  cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
206  assert(m->req_id() == req_id_);
207  return m;
208  }
209  cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
210  cvmfs::MsgObjectInfoReply *m =
211  reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
213  assert(m->req_id() == req_id_);
214  return m;
215  }
216  cvmfs::MsgReadReply *msg_read_reply() {
217  cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
219  assert(m->req_id() == req_id_);
220  return m;
221  }
222  cvmfs::MsgStoreReply *msg_store_reply() {
223  cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
225  assert(m->req_id() == req_id_);
226  assert(m->part_nr() == part_nr_);
227  return m;
228  }
229  cvmfs::MsgInfoReply *msg_info_reply() {
230  cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
232  assert(m->req_id() == req_id_);
233  return m;
234  }
235  cvmfs::MsgShrinkReply *msg_shrink_reply() {
236  cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
238  assert(m->req_id() == req_id_);
239  return m;
240  }
241  cvmfs::MsgListReply *msg_list_reply() {
242  cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
244  assert(m->req_id() == req_id_);
245  return m;
246  }
247  cvmfs::MsgBreadcrumbReply *msg_breadcrumb_reply() {
248  cvmfs::MsgBreadcrumbReply *m =
249  reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(
251  assert(m->req_id() == req_id_);
252  return m;
253  }
254 
257  uint64_t req_id() const { return req_id_; }
258  uint64_t part_nr() const { return part_nr_; }
259 
260  private:
261  uint64_t req_id_;
262  uint64_t part_nr_;
263  google::protobuf::MessageLite *msg_req_;
266  }; // class RpcJob
267 
268  struct RpcInFlight {
269  RpcInFlight() : rpc_job(NULL), signal(NULL) { }
270  RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
271 
274  };
275 
276  static void *MainRead(void *data);
277  static int ConnectLocator(const std::string &locator, bool print_error);
278  static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
279 
280  explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
281  int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
282  void CallRemotely(RpcJob *rpc_job);
283  int ChangeRefcount(const shash::Any &id, int change_by);
284  int DoOpen(const shash::Any &id);
285  shash::Any GetHandle(int fd);
286  int Flush(bool do_commit, Transaction *transaction);
287 
288  pid_t pid_plugin_;
291  int64_t session_id_;
293  bool spawned_;
295  pthread_rwlock_t rwlock_fd_table_;
297 
301  pthread_mutex_t lock_send_fd_;
302  std::vector<RpcInFlight> inflight_rpcs_;
303  pthread_mutex_t lock_inflight_rpcs_;
304  pthread_t thread_read_;
305  uint64_t capabilities_;
306 }; // class ExternalCacheManager
307 
308 
310  public:
312  virtual bool HasCapability(Capabilities capability);
313 
314  virtual void Insert(const shash::Any &hash, const uint64_t size,
315  const std::string &description)
316  { }
317 
318  virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
319  const std::string &description)
320  { }
321 
322  virtual bool Pin(const shash::Any &hash, const uint64_t size,
323  const std::string &description, const bool is_catalog)
324  { return is_catalog; }
325 
326  virtual void Unpin(const shash::Any &hash) { }
327  virtual void Touch(const shash::Any &hash) { }
328  virtual void Remove(const shash::Any &file) { }
329  virtual bool Cleanup(const uint64_t leave_size);
330 
331  virtual void RegisterBackChannel(int back_channel[2],
332  const std::string &channel_id);
333  virtual void UnregisterBackChannel(int back_channel[2],
334  const std::string &channel_id);
335 
336  virtual std::vector<std::string> List();
337  virtual std::vector<std::string> ListPinned();
338  virtual std::vector<std::string> ListCatalogs();
339  virtual std::vector<std::string> ListVolatile();
340  virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
341  virtual uint64_t GetCapacity();
342  virtual uint64_t GetSize();
343  virtual uint64_t GetSizePinned();
344  virtual uint64_t GetCleanupRate(uint64_t period_s);
345 
346  virtual void Spawn() { }
347  virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
348  virtual uint32_t GetProtocolRevision() { return 0; }
349 
350  private:
351  struct QuotaInfo {
352  QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
353  uint64_t size;
354  uint64_t used;
355  uint64_t pinned;
356  uint64_t no_shrink;
357  };
358 
360  : cache_mgr_(cache_mgr) { }
361  int GetInfo(QuotaInfo *quota_info);
362  bool DoListing(cvmfs::EnumObjectType type,
363  std::vector<cvmfs::MsgListRecord> *result);
364 
366 };
367 
368 #endif // CVMFS_CACHE_EXTERN_H_
CacheTransport transport_
Definition: cache_extern.h:290
int Flush(bool do_commit, Transaction *transaction)
pthread_mutex_t lock_inflight_rpcs_
Definition: cache_extern.h:303
virtual ~ExternalCacheManager()
virtual uint64_t GetSizePinned()
int ChangeRefcount(const shash::Any &id, int change_by)
void set_attachment_recv(void *data, unsigned size)
Definition: cache_extern.h:197
const manifest::Manifest * manifest() const
Definition: repository.h:125
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
Definition: cache_extern.h:322
int64_t session_id() const
Definition: cache_extern.h:98
virtual uint64_t GetCleanupRate(uint64_t period_s)
int64_t atomic_int64
Definition: atomic.h:18
bool DoListing(cvmfs::EnumObjectType type, std::vector< cvmfs::MsgListRecord > *result)
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
RpcInFlight(RpcJob *r, Signal *s)
Definition: cache_extern.h:270
virtual CacheManagerIds id()
Definition: cache_extern.h:65
virtual int Readahead(int fd)
static const unsigned kMaxSupportedObjectSize
Definition: cache_extern.h:118
virtual void Touch(const shash::Any &hash)
Definition: cache_extern.h:327
virtual bool Cleanup(const uint64_t leave_size)
RpcJob(cvmfs::MsgListReq *msg)
Definition: cache_extern.h:186
pthread_rwlock_t rwlock_fd_table_
Definition: cache_extern.h:295
cvmfs::MsgShrinkReply * msg_shrink_reply()
Definition: cache_extern.h:235
cvmfs::MsgRefcountReply * msg_refcount_reply()
Definition: cache_extern.h:203
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: cache_extern.h:314
virtual uint64_t GetMaxFileSize()
Definition: cache_extern.h:340
Capabilities
Definition: quota.h:47
cvmfs::MsgReadReply * msg_read_reply()
Definition: cache_extern.h:216
RpcJob(cvmfs::MsgReadReq *msg)
Definition: cache_extern.h:174
RpcJob(cvmfs::MsgObjectInfoReq *msg)
Definition: cache_extern.h:172
virtual int AbortTxn(void *txn)
Definition: cache_extern.cc:76
virtual std::string Describe()
Transaction(const shash::Any &id)
Definition: cache_extern.h:125
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
Definition: cache_extern.cc:85
virtual std::vector< std::string > ListCatalogs()
std::vector< RpcInFlight > inflight_rpcs_
Definition: cache_extern.h:302
static void * MainRead(void *data)
virtual uint32_t SizeOfTxn()
Definition: cache_extern.h:79
uint64_t capabilities() const
Definition: cache_extern.h:100
atomic_int64 next_request_id_
Definition: cache_extern.h:296
RpcJob(cvmfs::MsgStoreReq *msg)
Definition: cache_extern.h:176
virtual int Reset(void *txn)
assert((mem||(size==0))&&"Out Of Memory")
uint32_t max_object_size() const
Definition: cache_extern.h:99
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
FRIEND_TEST(T_ExternalCacheManager, TransactionAbort)
virtual void Remove(const shash::Any &file)
Definition: cache_extern.h:328
RpcJob(cvmfs::MsgStoreAbortReq *msg)
Definition: cache_extern.h:179
virtual std::vector< std::string > ListPinned()
virtual std::vector< std::string > List()
void set_attachment_send(void *data, unsigned size)
Definition: cache_extern.h:193
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: cache_extern.h:318
bool operator!=(const ReadOnlyHandle &other) const
Definition: cache_extern.h:162
cvmfs::MsgListReply * msg_list_reply()
Definition: cache_extern.h:241
ExternalCacheManager(int fd_connection, unsigned max_open_fds)
RpcJob(cvmfs::MsgBreadcrumbStoreReq *msg)
Definition: cache_extern.h:190
virtual void Unpin(const shash::Any &hash)
Definition: cache_extern.h:326
virtual int Close(int fd)
virtual pid_t GetPid()
Definition: cache_extern.h:347
ExternalCacheManager * cache_mgr_
Definition: cache_extern.h:365
cvmfs::MsgStoreReply * msg_store_reply()
Definition: cache_extern.h:222
FdTable< ReadOnlyHandle > fd_table_
Definition: cache_extern.h:289
cvmfs::MsgBreadcrumbReply * msg_breadcrumb_reply()
Definition: cache_extern.h:247
void Transaction()
CacheManagerIds
Definition: cache.h:24
virtual void * DoSaveState()
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
virtual int Dup(int fd)
RpcJob(cvmfs::MsgRefcountReq *msg)
Definition: cache_extern.h:170
google::protobuf::MessageLite * msg_req_
Definition: cache_extern.h:263
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
RpcJob(cvmfs::MsgInfoReq *msg)
Definition: cache_extern.h:182
CacheTransport::Frame * frame_send()
Definition: cache_extern.h:255
static const shash::Any kInvalidHandle
Definition: cache_extern.h:113
CacheTransport::Frame frame_send_
Definition: cache_extern.h:264
cvmfs::MsgObjectInfoReply * msg_object_info_reply()
Definition: cache_extern.h:209
uint64_t req_id() const
Definition: cache_extern.h:257
int GetInfo(QuotaInfo *quota_info)
virtual uint32_t GetProtocolRevision()
Definition: cache_extern.h:348
google::protobuf::MessageLite * GetMsgTyped()
uint64_t part_nr() const
Definition: cache_extern.h:258
virtual void CtrlTxn(const Label &label, const int flags, void *txn)
ExternalQuotaManager(ExternalCacheManager *cache_mgr)
Definition: cache_extern.h:359
static bool SpawnPlugin(const std::vector< std::string > &cmd_line)
cvmfs::MsgInfoReply * msg_info_reply()
Definition: cache_extern.h:229
QuotaManager * quota_mgr()
Definition: cache.h:193
bool operator==(const ReadOnlyHandle &other) const
Definition: cache_extern.h:159
pid_t pid_plugin() const
Definition: cache_extern.h:101
CacheTransport::Frame * frame_recv()
Definition: cache_extern.h:256
shash::Any GetHandle(int fd)
std::string error_msg() const
Definition: cache_extern.h:45
virtual int64_t GetSize(int fd)
static ExternalCacheManager * Create(int fd_connection, unsigned max_open_fds, const std::string &ident)
virtual void Spawn()
pthread_mutex_t lock_send_fd_
Definition: cache_extern.h:301
static const unsigned kMinSupportedObjectSize
Definition: cache_extern.h:122
static ExternalQuotaManager * Create(ExternalCacheManager *cache_mgr)
virtual bool DoFreeState(void *data)
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
ReadOnlyHandle(const shash::Any &h)
Definition: cache_extern.h:158
CacheTransport::Frame frame_recv_
Definition: cache_extern.h:265
RpcJob(cvmfs::MsgShrinkReq *msg)
Definition: cache_extern.h:184
virtual bool HasCapability(Capabilities capability)
void set_attachment(void *attachment, uint32_t att_size)
google::protobuf::MessageLite * msg_req()
Definition: cache_extern.h:201
void CallRemotely(RpcJob *rpc_job)
Definition: cache_extern.cc:93
virtual std::vector< std::string > ListVolatile()
virtual int OpenFromTxn(void *txn)
static int ConnectLocator(const std::string &locator, bool print_error)
static PluginHandle * CreatePlugin(const std::string &locator, const std::vector< std::string > &cmd_line)
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
virtual uint64_t GetCapacity()
static void size_t size
Definition: smalloc.h:54
virtual int CommitTxn(void *txn)
virtual void Spawn()
Definition: cache_extern.h:346
virtual int DoRestoreState(void *data)
RpcJob(cvmfs::MsgBreadcrumbLoadReq *msg)
Definition: cache_extern.h:188
static const unsigned kPbProtocolVersion
Definition: cache_extern.h:35
virtual uint64_t GetSize()
int DoOpen(const shash::Any &id)
static const uint64_t kSizeUnknown
Definition: cache.h:74
virtual int Open(const LabeledObject &object)