CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
channel.h
Go to the documentation of this file.
1 
4 #ifndef CVMFS_CACHE_PLUGIN_CHANNEL_H_
5 #define CVMFS_CACHE_PLUGIN_CHANNEL_H_
6 
7 #include <pthread.h>
8 #include <stdint.h>
9 
10 #include <cassert>
11 #include <map>
12 #include <set>
13 #include <string>
14 #include <vector>
15 
16 #include "cache.pb.h"
17 #include "cache_transport.h"
18 #include "crypto/hash.h"
19 #include "manifest.h"
20 #include "smallhash.h"
21 #include "util/atomic.h"
22 #include "util/murmur.hxx"
23 #include "util/single_copy.h"
24 
32  public:
34  ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance)
35  : id(id)
36  , reponame(reponame)
37  , client_instance(client_instance)
38  , is_set(true)
39  { }
40 
41  uint64_t id;
42  char *reponame;
44  bool is_set;
45  };
46 
47  static SessionCtx *GetInstance();
48  static void CleanupInstance();
49  ~SessionCtx();
50 
51  void Set(uint64_t id, char *reponame, char *client_instance);
52  void Unset();
53  void Get(uint64_t *id, char **reponame, char **client_instance);
54  bool IsSet();
55 
56  private:
58  static void TlsDestructor(void *data);
59 
60  SessionCtx();
61 
62  pthread_key_t thread_local_storage_;
63  pthread_mutex_t *lock_tls_blocks_;
64  std::vector<ThreadLocalStorage *> tls_blocks_;
65 };
66 
67 
68 class CachePlugin {
69  public:
70  static const unsigned kPbProtocolVersion = 1;
71  static const uint64_t kSizeUnknown;
72 
73  struct ObjectInfo {
75  : id()
77  , object_type(cvmfs::OBJECT_REGULAR)
78  , pinned(false) { }
80  uint64_t size;
81  cvmfs::EnumObjectType object_type;
82  bool pinned;
83  std::string description;
84  };
85 
86  struct Info {
88  uint64_t size_bytes;
89  uint64_t used_bytes;
90  uint64_t pinned_bytes;
91  int64_t no_shrink;
92  };
93 
94  bool Listen(const std::string &locator);
95  virtual ~CachePlugin();
96  void ProcessRequests(unsigned num_workers);
97  bool IsRunning();
98  void Terminate();
99  void WaitFor();
100  void AskToDetach();
101 
102  unsigned max_object_size() const { return max_object_size_; }
103  uint64_t capabilities() const { return capabilities_; }
104 
105  protected:
106  explicit CachePlugin(uint64_t capabilities);
107 
108  virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id,
109  int32_t change_by) = 0;
110  virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id,
111  ObjectInfo *info) = 0;
112  virtual cvmfs::EnumStatus Pread(const shash::Any &id,
113  uint64_t offset,
114  uint32_t *size,
115  unsigned char *buffer) = 0;
116  virtual cvmfs::EnumStatus StartTxn(const shash::Any &id,
117  const uint64_t txn_id,
118  const ObjectInfo &info) = 0;
119  virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id,
120  unsigned char *buffer,
121  uint32_t size) = 0;
122  virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id) = 0;
123  virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id) = 0;
124 
125  virtual cvmfs::EnumStatus GetInfo(Info *info) = 0;
126  virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to,
127  uint64_t *used_bytes) = 0;
128  virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id,
129  cvmfs::EnumObjectType type) = 0;
130  virtual cvmfs::EnumStatus ListingNext(int64_t lst_id,
131  ObjectInfo *item) = 0;
132  virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id) = 0;
133 
134  virtual cvmfs::EnumStatus LoadBreadcrumb(
135  const std::string &fqrn, manifest::Breadcrumb *breadcrumb) = 0;
136  virtual cvmfs::EnumStatus StoreBreadcrumb(
137  const std::string &fqrn, const manifest::Breadcrumb &breadcrumb) = 0;
138 
139  private:
140  static const unsigned kDefaultMaxObjectSize = 256 * 1024; // 256kB
141  static const unsigned kListingSize = 4 * 1024 * 1024; // 4MB
142  static const char kSignalTerminate = 'q';
143  static const char kSignalDetach = 'd';
144 
145  struct UniqueRequest {
146  UniqueRequest() : session_id(-1), req_id(-1) { }
147  UniqueRequest(int64_t s, int64_t r) : session_id(s), req_id(r) { }
148  bool operator ==(const UniqueRequest &other) const {
149  return (this->session_id == other.session_id) &&
150  (this->req_id == other.req_id);
151  }
152  bool operator !=(const UniqueRequest &other) const {
153  return !(*this == other);
154  }
155 
156  int64_t session_id;
157  int64_t req_id;
158  };
159 
164  struct SessionInfo {
165  SessionInfo() : id(0), reponame(NULL), client_instance(NULL) { }
166  SessionInfo(uint64_t id, const std::string &name);
167 
168  uint64_t id;
169  std::string name;
170  char *reponame;
172  };
173 
180  public:
181  SessionCtxGuard(uint64_t session_id, CachePlugin *plugin) {
182  char *reponame = NULL;
183  char *client_instance = NULL;
184  std::map<uint64_t, SessionInfo>::const_iterator iter =
185  plugin->sessions_.find(session_id);
186  if (iter != plugin->sessions_.end()) {
187  reponame = iter->second.reponame;
188  client_instance = iter->second.client_instance;
189  }
190  SessionCtx *session_ctx = SessionCtx::GetInstance();
191  assert(session_ctx);
192  session_ctx->Set(session_id, reponame, client_instance);
193  }
194 
196  SessionCtx *session_ctx = SessionCtx::GetInstance();
197  assert(session_ctx);
198  session_ctx->Unset();
199  }
200  };
201 
202  static void *MainProcessRequests(void *data);
203 
204  inline uint64_t NextSessionId() {
205  return atomic_xadd64(&next_session_id_, 1);
206  }
207  inline uint64_t NextTxnId() {
208  return atomic_xadd64(&next_txn_id_, 1);
209  }
210  inline uint64_t NextLstId() {
211  return atomic_xadd64(&next_lst_id_, 1);
212  }
213  static inline uint32_t HashUniqueRequest(const UniqueRequest &req) {
214  return MurmurHash2(&req, sizeof(req), 0x07387a4f);
215  }
216 
217  bool HandleRequest(int fd_con);
218  void HandleHandshake(cvmfs::MsgHandshake *msg_req,
219  CacheTransport *transport);
220  void HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
221  CacheTransport *transport);
222  void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
223  CacheTransport *transport);
224  void HandleRead(cvmfs::MsgReadReq *msg_req,
225  CacheTransport *transport);
226  void HandleStore(cvmfs::MsgStoreReq *msg_req,
227  CacheTransport::Frame *frame,
228  CacheTransport *transport);
229  void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
230  CacheTransport *transport);
231  void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport);
232  void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport);
233  void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport);
234  void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req,
235  CacheTransport *transport);
236  void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req,
237  CacheTransport *transport);
238  void HandleIoctl(cvmfs::MsgIoctl *msg_req);
239  void SendDetachRequests();
240 
241  void NotifySupervisor(char signal);
242 
243  void LogSessionError(uint64_t session_id,
244  cvmfs::EnumStatus status,
245  const std::string &msg);
246  void LogSessionInfo(uint64_t session_id, const std::string &msg);
247 
248  bool is_local_;
249  uint64_t capabilities_;
253  unsigned num_workers_;
260  std::string name_;
265  std::set<int> connections_;
266  std::map<uint64_t, SessionInfo> sessions_;
267  pthread_t thread_io_;
268  int pipe_ctrl_[2];
269 }; // class CachePlugin
270 
271 #endif // CVMFS_CACHE_PLUGIN_CHANNEL_H_
uint64_t pinned_bytes
Definition: channel.h:90
void Get(uint64_t *id, char **reponame, char **client_instance)
Definition: channel.cc:71
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
Definition: channel.cc:200
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
int64_t atomic_int64
Definition: atomic.h:18
atomic_int64 next_session_id_
Definition: channel.h:261
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:383
static void CleanupInstance()
Definition: channel.cc:32
void Set(uint64_t id, char *reponame, char *client_instance)
Definition: channel.cc:96
static const uint64_t kSizeUnknown
Definition: channel.h:71
bool is_set
either not yet set or deliberately unset
Definition: channel.h:44
bool IsRunning()
Definition: channel.cc:696
uint64_t NextLstId()
Definition: channel.h:210
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
Definition: channel.cc:309
std::set< int > connections_
Definition: channel.h:265
virtual ~CachePlugin()
Definition: channel.cc:190
uint64_t capabilities_
Definition: channel.h:249
std::map< uint64_t, SessionInfo > sessions_
Definition: channel.h:266
uint64_t capabilities() const
Definition: channel.h:103
SessionCtxGuard(uint64_t session_id, CachePlugin *plugin)
Definition: channel.h:181
bool IsSet()
Definition: channel.cc:86
void ProcessRequests(unsigned num_workers)
Definition: channel.cc:897
cvmfs::EnumObjectType object_type
Definition: channel.h:81
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
static SessionCtx * instance_
Definition: channel.h:57
void Unset()
Definition: channel.cc:133
static const unsigned kDefaultMaxObjectSize
Definition: channel.h:140
assert((mem||(size==0))&&"Out Of Memory")
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static uint32_t HashUniqueRequest(const UniqueRequest &req)
Definition: channel.h:213
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
Definition: channel.cc:561
bool is_local_
Definition: channel.h:248
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
Definition: channel.cc:169
uint64_t num_inlimbo_clients_
Definition: channel.h:259
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
Definition: channel.cc:453
int32_t atomic_int32
Definition: atomic.h:17
bool Listen(const std::string &locator)
Definition: channel.cc:701
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
void Terminate()
Definition: channel.cc:920
uint64_t NextTxnId()
Definition: channel.h:207
void AskToDetach()
Definition: channel.cc:163
static const unsigned kListingSize
Definition: channel.h:141
static void TlsDestructor(void *data)
Definition: channel.cc:115
void NotifySupervisor(char signal)
Definition: channel.cc:888
unsigned max_object_size_
Definition: channel.h:254
ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance)
Definition: channel.h:34
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int64_t no_shrink
Definition: channel.h:91
static const char kSignalDetach
Definition: channel.h:143
std::string description
Definition: channel.h:83
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
pthread_t thread_io_
Definition: channel.h:267
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
Definition: channel.cc:769
int fd_socket_lock_
Definition: channel.h:251
std::vector< ThreadLocalStorage * > tls_blocks_
Definition: channel.h:64
static const unsigned kPbProtocolVersion
Definition: channel.h:70
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
Definition: channel.cc:581
pthread_mutex_t * lock_tls_blocks_
Definition: channel.h:63
SessionCtx()
Definition: channel.cc:38
int fd_socket_
Definition: channel.h:250
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
unsigned max_object_size() const
Definition: channel.h:102
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
Definition: channel.cc:327
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:414
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
Definition: channel.cc:259
void SendDetachRequests()
Definition: channel.cc:906
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
Definition: channel.h:264
atomic_int64 next_lst_id_
Definition: channel.h:263
uint64_t used_bytes
Definition: channel.h:89
UniqueRequest(int64_t s, int64_t r)
Definition: channel.h:147
atomic_int32 running_
Definition: channel.h:252
unsigned num_workers_
Definition: channel.h:253
~SessionCtx()
Definition: channel.cc:46
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
Definition: channel.cc:610
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
Definition: channel.cc:786
pthread_key_t thread_local_storage_
Definition: channel.h:62
bool operator==(const UniqueRequest &other) const
Definition: channel.h:148
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
void WaitFor()
Definition: channel.cc:930
static const char kSignalTerminate
Definition: channel.h:142
static void size_t size
Definition: smalloc.h:54
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
std::string name_
Definition: channel.h:260
bool operator!=(const UniqueRequest &other) const
Definition: channel.h:152
atomic_int64 next_txn_id_
Definition: channel.h:262
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:285
int pipe_ctrl_[2]
Definition: channel.h:268
void LogSessionInfo(uint64_t session_id, const std::string &msg)
Definition: channel.cc:758
uint64_t NextSessionId()
Definition: channel.h:204
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:231
uint32_t MurmurHash2(const void *key, int len, uint32_t seed)
Definition: murmur.hxx:23
bool HandleRequest(int fd_con)
Definition: channel.cc:480
uint64_t size_bytes
Definition: channel.h:88
static SessionCtx * GetInstance()
Definition: channel.cc:59