CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
channel.h
Go to the documentation of this file.
1 
4 #ifndef CVMFS_CACHE_PLUGIN_CHANNEL_H_
5 #define CVMFS_CACHE_PLUGIN_CHANNEL_H_
6 
7 #include <pthread.h>
8 #include <stdint.h>
9 
10 #include <cassert>
11 #include <map>
12 #include <set>
13 #include <string>
14 #include <vector>
15 
16 #include "cache.pb.h"
17 #include "cache_transport.h"
18 #include "crypto/hash.h"
19 #include "manifest.h"
20 #include "smallhash.h"
21 #include "util/atomic.h"
22 #include "util/murmur.hxx"
23 #include "util/single_copy.h"
24 
32  public:
34  ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance)
35  : id(id)
36  , reponame(reponame)
37  , client_instance(client_instance)
38  , is_set(true) { }
39 
40  uint64_t id;
41  char *reponame;
43  bool is_set;
44  };
45 
46  static SessionCtx *GetInstance();
47  static void CleanupInstance();
48  ~SessionCtx();
49 
50  void Set(uint64_t id, char *reponame, char *client_instance);
51  void Unset();
52  void Get(uint64_t *id, char **reponame, char **client_instance);
53  bool IsSet();
54 
55  private:
57  static void TlsDestructor(void *data);
58 
59  SessionCtx();
60 
61  pthread_key_t thread_local_storage_;
62  pthread_mutex_t *lock_tls_blocks_;
63  std::vector<ThreadLocalStorage *> tls_blocks_;
64 };
65 
66 
67 class CachePlugin {
68  public:
69  static const unsigned kPbProtocolVersion = 1;
70  static const uint64_t kSizeUnknown;
71 
72  struct ObjectInfo {
74  : id()
76  , object_type(cvmfs::OBJECT_REGULAR)
77  , pinned(false) { }
79  uint64_t size;
80  cvmfs::EnumObjectType object_type;
81  bool pinned;
82  std::string description;
83  };
84 
85  struct Info {
87  uint64_t size_bytes;
88  uint64_t used_bytes;
89  uint64_t pinned_bytes;
90  int64_t no_shrink;
91  };
92 
// Public control interface of CachePlugin. Only declarations are visible in
// this header; the symbol index further down places the definitions in
// channel.cc.
// NOTE(review): presumably Listen(locator) has to succeed before
// ProcessRequests() is called, and Terminate()/WaitFor() end the request
// processing -- confirm against channel.cc.
93  bool Listen(const std::string &locator);
94  virtual ~CachePlugin();
95  void ProcessRequests(unsigned num_workers);
96  bool IsRunning();
97  void Terminate();
98  void WaitFor();
99  void AskToDetach();
100 
// Read-only accessor: returns the current value of max_object_size_.
101  unsigned max_object_size() const { return max_object_size_; }
// Read-only accessor: returns the current value of capabilities_.
102  uint64_t capabilities() const { return capabilities_; }
103 
104  protected:
105  explicit CachePlugin(uint64_t capabilities);
106 
// Object and transaction operations. All are pure virtual (= 0), so a
// concrete plugin has to override every one of them. Each returns a
// cvmfs::EnumStatus; objects are addressed by a shash::Any id and
// transactions by a uint64_t txn_id.
107  virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id,
108  int32_t change_by) = 0;  // change_by is signed
109  virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id,
110  ObjectInfo *info) = 0;  // info is an output pointer
// NOTE(review): size is passed by pointer -- presumably in: bytes requested,
// out: bytes actually read into buffer. Confirm with the implementations.
111  virtual cvmfs::EnumStatus Pread(const shash::Any &id,
112  uint64_t offset,
113  uint32_t *size,
114  unsigned char *buffer) = 0;
115  virtual cvmfs::EnumStatus StartTxn(const shash::Any &id,
116  const uint64_t txn_id,
117  const ObjectInfo &info) = 0;
118  virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id,
119  unsigned char *buffer,
120  uint32_t size) = 0;
121  virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id) = 0;
122  virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id) = 0;
123 
// Cache-wide information, shrinking, and object listing. All pure virtual.
124  virtual cvmfs::EnumStatus GetInfo(Info *info) = 0;  // fills *info
125  virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to,
126  uint64_t *used_bytes) = 0;  // used_bytes is an output pointer
// NOTE(review): ListingBegin() takes lst_id as uint64_t while ListingNext()
// and ListingEnd() take it as int64_t, and NextLstId() in this class returns
// uint64_t. The signedness mismatch looks unintentional but is part of the
// interface that plugins override -- confirm before changing it.
127  virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id,
128  cvmfs::EnumObjectType type) = 0;
129  virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item) = 0;
130  virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id) = 0;
131 
// Breadcrumb persistence, keyed by the string fqrn. Both are pure virtual.
// LoadBreadcrumb() writes through the breadcrumb pointer; StoreBreadcrumb()
// receives the breadcrumb by const reference.
// NOTE(review): fqrn presumably means "fully qualified repository name" --
// confirm.
132  virtual cvmfs::EnumStatus LoadBreadcrumb(
133  const std::string &fqrn, manifest::Breadcrumb *breadcrumb) = 0;
134  virtual cvmfs::EnumStatus StoreBreadcrumb(
135  const std::string &fqrn, const manifest::Breadcrumb &breadcrumb) = 0;
136 
137  private:
// Private compile-time constants of CachePlugin.
138  static const unsigned kDefaultMaxObjectSize = 256 * 1024; // 256 KiB (262144 bytes)
139  static const unsigned kListingSize = 4 * 1024 * 1024; // 4 MiB (4194304 bytes)
// NOTE(review): the two single-character codes below are presumably the
// values passed to NotifySupervisor(char signal) -- confirm in channel.cc.
140  static const char kSignalTerminate = 'q';
141  static const char kSignalDetach = 'd';
142 
// Identifies one request by the pair (session_id, req_id). This is the
// argument type of HashUniqueRequest(); the symbol index of this page also
// lists a SmallHashDynamic<UniqueRequest, uint64_t> member named txn_ids_.
143  struct UniqueRequest {
144  UniqueRequest() : session_id(-1), req_id(-1) { }  // default: both ids are -1
145  UniqueRequest(int64_t s, int64_t r) : session_id(s), req_id(r) { }
// Two requests are equal iff both session_id and req_id match.
146  bool operator==(const UniqueRequest &other) const {
147  return (this->session_id == other.session_id)
148  && (this->req_id == other.req_id);
149  }
// Defined as the negation of operator== so the two cannot diverge.
150  bool operator!=(const UniqueRequest &other) const {
151  return !(*this == other);
152  }
153 
154  int64_t session_id;
155  int64_t req_id;
156  };
157 
162  struct SessionInfo {
163  SessionInfo() : id(0), reponame(NULL), client_instance(NULL) { }
164  SessionInfo(uint64_t id, const std::string &name);
165 
166  uint64_t id;
167  std::string name;
168  char *reponame;
170  };
171 
178  public:
// Looks up session_id in plugin->sessions_ and hands the session's reponame
// and client_instance pointers to SessionCtx::Set(). If the session id is not
// found, Set() is still called, with both pointers NULL.
179  SessionCtxGuard(uint64_t session_id, CachePlugin *plugin) {
180  char *reponame = NULL;  // stays NULL when the session is unknown
181  char *client_instance = NULL;  // stays NULL when the session is unknown
182  std::map<uint64_t, SessionInfo>::const_iterator
183  iter = plugin->sessions_.find(session_id);
184  if (iter != plugin->sessions_.end()) {
// Only the pointers are copied here, not the strings they point to.
185  reponame = iter->second.reponame;
186  client_instance = iter->second.client_instance;
187  }
188  SessionCtx *session_ctx = SessionCtx::GetInstance();
189  assert(session_ctx);  // GetInstance() must not return NULL here
190  session_ctx->Set(session_id, reponame, client_instance);
191  }
192 
194  SessionCtx *session_ctx = SessionCtx::GetInstance();
195  assert(session_ctx);
196  session_ctx->Unset();
197  }
198  };
199 
// Static function that takes and returns void *.
// NOTE(review): the signature matches a pthread start routine; presumably it
// is the entry point of the thread stored in thread_io_ -- confirm in
// channel.cc.
200  static void *MainProcessRequests(void *data);
201 
// Adds 1 to next_session_id_ via atomic_xadd64() and returns that call's
// result converted to uint64_t.
// NOTE(review): presumably atomic_xadd64() is a fetch-and-add that yields the
// value before the increment -- confirm in util/atomic.h.
202  inline uint64_t NextSessionId() {
203  return atomic_xadd64(&next_session_id_, 1);
204  }
// Same pattern as NextSessionId(), applied to the next_txn_id_ counter.
205  inline uint64_t NextTxnId() { return atomic_xadd64(&next_txn_id_, 1); }
// Same pattern as NextSessionId(), applied to the next_lst_id_ counter.
206  inline uint64_t NextLstId() { return atomic_xadd64(&next_lst_id_, 1); }
// Returns the 32-bit MurmurHash2 of the raw bytes of req (sizeof(req) bytes),
// using the fixed seed 0x07387a4f.
// NOTE(review): hashing the object representation only agrees with
// operator== if UniqueRequest has no padding bytes. Its two int64_t members
// make padding unlikely -- confirm if the struct ever gains a field.
207  static inline uint32_t HashUniqueRequest(const UniqueRequest &req) {
208  return MurmurHash2(&req, sizeof(req), 0x07387a4f);
209  }
210 
// Private request handling. HandleRequest() takes a connection file
// descriptor. Each Handle<Kind>() method takes a pointer to the matching
// cvmfs::Msg* request message and, except for HandleIoctl(), a CacheTransport
// pointer; HandleStore() additionally takes a CacheTransport::Frame pointer.
// The symbol index of this page places all definitions in channel.cc.
211  bool HandleRequest(int fd_con);
212  void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport);
213  void HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
214  CacheTransport *transport);
215  void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
216  CacheTransport *transport);
217  void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport);
218  void HandleStore(cvmfs::MsgStoreReq *msg_req,
219  CacheTransport::Frame *frame,
220  CacheTransport *transport);
221  void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
222  CacheTransport *transport);
223  void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport);
224  void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport);
225  void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport);
226  void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req,
227  CacheTransport *transport);
228  void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req,
229  CacheTransport *transport);
230  void HandleIoctl(cvmfs::MsgIoctl *msg_req);  // the only handler without a transport
231  void SendDetachRequests();
232 
// NOTE(review): signal is presumably kSignalTerminate or kSignalDetach --
// confirm in channel.cc.
233  void NotifySupervisor(char signal);
234 
// Logging helpers keyed by session id; the error variant also takes the
// cvmfs::EnumStatus to report.
235  void LogSessionError(uint64_t session_id,
236  cvmfs::EnumStatus status,
237  const std::string &msg);
238  void LogSessionInfo(uint64_t session_id, const std::string &msg);
239 
240  bool is_local_;
241  uint64_t capabilities_;
245  unsigned num_workers_;
252  std::string name_;
257  std::set<int> connections_;
258  std::map<uint64_t, SessionInfo> sessions_;
259  pthread_t thread_io_;
260  int pipe_ctrl_[2];
261 }; // class CachePlugin
262 
263 #endif // CVMFS_CACHE_PLUGIN_CHANNEL_H_
uint64_t pinned_bytes
Definition: channel.h:89
void Get(uint64_t *id, char **reponame, char **client_instance)
Definition: channel.cc:71
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
Definition: channel.cc:197
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
int64_t atomic_int64
Definition: atomic.h:18
atomic_int64 next_session_id_
Definition: channel.h:253
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:369
static void CleanupInstance()
Definition: channel.cc:32
void Set(uint64_t id, char *reponame, char *client_instance)
Definition: channel.cc:96
static const uint64_t kSizeUnknown
Definition: channel.h:70
bool is_set
false if the context is either not yet set or deliberately unset
Definition: channel.h:43
bool IsRunning()
Definition: channel.cc:674
uint64_t NextLstId()
Definition: channel.h:206
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
Definition: channel.cc:297
std::set< int > connections_
Definition: channel.h:257
virtual ~CachePlugin()
Definition: channel.cc:187
uint64_t capabilities_
Definition: channel.h:241
std::map< uint64_t, SessionInfo > sessions_
Definition: channel.h:258
uint64_t capabilities() const
Definition: channel.h:102
SessionCtxGuard(uint64_t session_id, CachePlugin *plugin)
Definition: channel.h:179
bool IsSet()
Definition: channel.cc:86
void ProcessRequests(unsigned num_workers)
Definition: channel.cc:869
cvmfs::EnumObjectType object_type
Definition: channel.h:80
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
static SessionCtx * instance_
Definition: channel.h:56
void Unset()
Definition: channel.cc:133
static const unsigned kDefaultMaxObjectSize
Definition: channel.h:138
assert((mem||(size==0))&&"Out Of Memory")
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static uint32_t HashUniqueRequest(const UniqueRequest &req)
Definition: channel.h:207
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
Definition: channel.cc:541
bool is_local_
Definition: channel.h:240
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
Definition: channel.cc:167
uint64_t num_inlimbo_clients_
Definition: channel.h:251
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
Definition: channel.cc:435
int32_t atomic_int32
Definition: atomic.h:17
bool Listen(const std::string &locator)
Definition: channel.cc:677
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
void Terminate()
Definition: channel.cc:892
uint64_t NextTxnId()
Definition: channel.h:205
void AskToDetach()
Definition: channel.cc:161
static const unsigned kListingSize
Definition: channel.h:139
static void TlsDestructor(void *data)
Definition: channel.cc:115
void NotifySupervisor(char signal)
Definition: channel.cc:860
unsigned max_object_size_
Definition: channel.h:246
ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance)
Definition: channel.h:34
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int64_t no_shrink
Definition: channel.h:90
static const char kSignalDetach
Definition: channel.h:141
std::string description
Definition: channel.h:82
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
pthread_t thread_io_
Definition: channel.h:259
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
Definition: channel.cc:745
int fd_socket_lock_
Definition: channel.h:243
std::vector< ThreadLocalStorage * > tls_blocks_
Definition: channel.h:63
static const unsigned kPbProtocolVersion
Definition: channel.h:69
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
Definition: channel.cc:559
pthread_mutex_t * lock_tls_blocks_
Definition: channel.h:62
SessionCtx()
Definition: channel.cc:38
int fd_socket_
Definition: channel.h:242
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
unsigned max_object_size() const
Definition: channel.h:101
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
Definition: channel.cc:315
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:398
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
Definition: channel.cc:251
void SendDetachRequests()
Definition: channel.cc:878
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
Definition: channel.h:256
atomic_int64 next_lst_id_
Definition: channel.h:255
uint64_t used_bytes
Definition: channel.h:88
UniqueRequest(int64_t s, int64_t r)
Definition: channel.h:145
atomic_int32 running_
Definition: channel.h:244
unsigned num_workers_
Definition: channel.h:245
~SessionCtx()
Definition: channel.cc:46
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
Definition: channel.cc:586
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
Definition: channel.cc:759
pthread_key_t thread_local_storage_
Definition: channel.h:61
bool operator==(const UniqueRequest &other) const
Definition: channel.h:146
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
void WaitFor()
Definition: channel.cc:902
static const char kSignalTerminate
Definition: channel.h:140
static void size_t size
Definition: smalloc.h:54
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
std::string name_
Definition: channel.h:252
bool operator!=(const UniqueRequest &other) const
Definition: channel.h:150
atomic_int64 next_txn_id_
Definition: channel.h:254
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:275
int pipe_ctrl_[2]
Definition: channel.h:260
void LogSessionInfo(uint64_t session_id, const std::string &msg)
Definition: channel.cc:734
uint64_t NextSessionId()
Definition: channel.h:202
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:226
uint32_t MurmurHash2(const void *key, int len, uint32_t seed)
Definition: murmur.hxx:23
bool HandleRequest(int fd_con)
Definition: channel.cc:460
uint64_t size_bytes
Definition: channel.h:87
static SessionCtx * GetInstance()
Definition: channel.cc:59