1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
#ifndef CVMFS_CACHE_PLUGIN_CHANNEL_H_ |
5 |
|
|
#define CVMFS_CACHE_PLUGIN_CHANNEL_H_ |
6 |
|
|
|
7 |
|
|
#include <pthread.h> |
8 |
|
|
#include <stdint.h> |
9 |
|
|
|
10 |
|
|
#include <cassert> |
11 |
|
|
#include <map> |
12 |
|
|
#include <set> |
13 |
|
|
#include <string> |
14 |
|
|
#include <vector> |
15 |
|
|
|
16 |
|
|
#include "atomic.h" |
17 |
|
|
#include "cache.pb.h" |
18 |
|
|
#include "cache_transport.h" |
19 |
|
|
#include "hash.h" |
20 |
|
|
#include "murmur.h" |
21 |
|
|
#include "smallhash.h" |
22 |
|
|
#include "util/single_copy.h" |
23 |
|
|
|
24 |
|
|
/** |
25 |
|
|
* A SessionCtx stores the session information related to the current cache |
26 |
|
|
* plugin callback in thread-local-storage. Singleton. |
27 |
|
|
* |
28 |
|
|
* TODO(jblomer): merge code with ClientCtx |
29 |
|
|
*/ |
30 |
|
|
class SessionCtx : SingleCopy { |
31 |
|
|
public: |
32 |
|
|
struct ThreadLocalStorage { |
33 |
|
14 |
ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance) |
34 |
|
|
: id(id) |
35 |
|
|
, reponame(reponame) |
36 |
|
|
, client_instance(client_instance) |
37 |
|
14 |
, is_set(true) |
38 |
|
14 |
{ } |
39 |
|
|
|
40 |
|
|
uint64_t id; |
41 |
|
|
char *reponame; |
42 |
|
|
char *client_instance; |
43 |
|
|
bool is_set; ///< either not yet set or deliberately unset |
44 |
|
|
}; |
45 |
|
|
|
46 |
|
|
static SessionCtx *GetInstance(); |
47 |
|
|
static void CleanupInstance(); |
48 |
|
|
~SessionCtx(); |
49 |
|
|
|
50 |
|
|
void Set(uint64_t id, char *reponame, char *client_instance); |
51 |
|
|
void Unset(); |
52 |
|
|
void Get(uint64_t *id, char **reponame, char **client_instance); |
53 |
|
|
bool IsSet(); |
54 |
|
|
|
55 |
|
|
private: |
56 |
|
|
static SessionCtx *instance_; |
57 |
|
|
static void TlsDestructor(void *data); |
58 |
|
|
|
59 |
|
|
SessionCtx(); |
60 |
|
|
|
61 |
|
|
pthread_key_t thread_local_storage_; |
62 |
|
|
pthread_mutex_t *lock_tls_blocks_; |
63 |
|
|
std::vector<ThreadLocalStorage *> tls_blocks_; |
64 |
|
|
}; |
65 |
|
|
|
66 |
|
|
|
67 |
|
|
class CachePlugin { |
68 |
|
|
public: |
69 |
|
|
static const unsigned kPbProtocolVersion = 1; |
70 |
|
|
static const uint64_t kSizeUnknown; |
71 |
|
|
|
72 |
|
31 |
struct ObjectInfo { |
73 |
|
31 |
ObjectInfo() |
74 |
|
|
: id() |
75 |
|
|
, size(kSizeUnknown) |
76 |
|
|
, object_type(cvmfs::OBJECT_REGULAR) |
77 |
|
31 |
, pinned(false) { } |
78 |
|
|
shash::Any id; |
79 |
|
|
uint64_t size; |
80 |
|
|
cvmfs::EnumObjectType object_type; |
81 |
|
|
bool pinned; |
82 |
|
|
std::string description; |
83 |
|
|
}; |
84 |
|
|
|
85 |
|
|
struct Info { |
86 |
|
4 |
Info() : size_bytes(0), used_bytes(0), pinned_bytes(0), no_shrink(-1) { } |
87 |
|
|
uint64_t size_bytes; |
88 |
|
|
uint64_t used_bytes; |
89 |
|
|
uint64_t pinned_bytes; |
90 |
|
|
int64_t no_shrink; |
91 |
|
|
}; |
92 |
|
|
|
93 |
|
|
bool Listen(const std::string &locator); |
94 |
|
|
virtual ~CachePlugin(); |
95 |
|
|
void ProcessRequests(unsigned num_workers); |
96 |
|
|
bool IsRunning(); |
97 |
|
|
void Terminate(); |
98 |
|
|
void WaitFor(); |
99 |
|
|
void AskToDetach(); |
100 |
|
|
|
101 |
|
|
unsigned max_object_size() const { return max_object_size_; } |
102 |
|
|
uint64_t capabilities() const { return capabilities_; } |
103 |
|
|
|
104 |
|
|
protected: |
105 |
|
|
explicit CachePlugin(uint64_t capabilities); |
106 |
|
|
|
107 |
|
|
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, |
108 |
|
|
int32_t change_by) = 0; |
109 |
|
|
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, |
110 |
|
|
ObjectInfo *info) = 0; |
111 |
|
|
virtual cvmfs::EnumStatus Pread(const shash::Any &id, |
112 |
|
|
uint64_t offset, |
113 |
|
|
uint32_t *size, |
114 |
|
|
unsigned char *buffer) = 0; |
115 |
|
|
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, |
116 |
|
|
const uint64_t txn_id, |
117 |
|
|
const ObjectInfo &info) = 0; |
118 |
|
|
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, |
119 |
|
|
unsigned char *buffer, |
120 |
|
|
uint32_t size) = 0; |
121 |
|
|
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id) = 0; |
122 |
|
|
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id) = 0; |
123 |
|
|
|
124 |
|
|
virtual cvmfs::EnumStatus GetInfo(Info *info) = 0; |
125 |
|
|
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, |
126 |
|
|
uint64_t *used_bytes) = 0; |
127 |
|
|
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, |
128 |
|
|
cvmfs::EnumObjectType type) = 0; |
129 |
|
|
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, |
130 |
|
|
ObjectInfo *item) = 0; |
131 |
|
|
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id) = 0; |
132 |
|
|
|
133 |
|
|
private: |
134 |
|
|
static const unsigned kDefaultMaxObjectSize = 256 * 1024; // 256kB |
135 |
|
|
static const unsigned kListingSize = 4 * 1024 * 1024; // 4MB |
136 |
|
|
static const char kSignalTerminate = 'q'; |
137 |
|
|
static const char kSignalDetach = 'd'; |
138 |
|
|
|
139 |
|
5440 |
struct UniqueRequest { |
140 |
|
5472 |
UniqueRequest() : session_id(-1), req_id(-1) { } |
141 |
|
609 |
UniqueRequest(int64_t s, int64_t r) : session_id(s), req_id(r) { } |
142 |
|
1239 |
bool operator ==(const UniqueRequest &other) const { |
143 |
|
|
return (this->session_id == other.session_id) && |
144 |
✓✓✓✗
|
1239 |
(this->req_id == other.req_id); |
145 |
|
|
} |
146 |
|
|
bool operator !=(const UniqueRequest &other) const { |
147 |
|
|
return !(*this == other); |
148 |
|
|
} |
149 |
|
|
|
150 |
|
|
int64_t session_id; |
151 |
|
|
int64_t req_id; |
152 |
|
|
}; |
153 |
|
|
|
154 |
|
|
/** |
155 |
|
|
* The char pointers are prepared on Handshake and removed when the session |
156 |
|
|
* closes. They are created to be consumed by the cvmcache_get_session() API. |
157 |
|
|
*/ |
158 |
|
119 |
struct SessionInfo { |
159 |
|
17 |
SessionInfo() : id(0), reponame(NULL), client_instance(NULL) { } |
160 |
|
|
SessionInfo(uint64_t id, const std::string &name); |
161 |
|
|
|
162 |
|
|
uint64_t id; |
163 |
|
|
std::string name; |
164 |
|
|
char *reponame; |
165 |
|
|
char *client_instance; |
166 |
|
|
}; |
167 |
|
|
|
168 |
|
|
/** |
169 |
|
|
* RAII form of the SessionCtx. On construction, automatically sets the |
170 |
|
|
* session context if the session id is found. On destruction, unsets the |
171 |
|
|
* session information. |
172 |
|
|
*/ |
173 |
|
|
class SessionCtxGuard { |
174 |
|
|
public: |
175 |
|
3414 |
SessionCtxGuard(uint64_t session_id, CachePlugin *plugin) { |
176 |
|
3414 |
char *reponame = NULL; |
177 |
|
3414 |
char *client_instance = NULL; |
178 |
|
|
std::map<uint64_t, SessionInfo>::const_iterator iter = |
179 |
|
3414 |
plugin->sessions_.find(session_id); |
180 |
✓✗ |
3414 |
if (iter != plugin->sessions_.end()) { |
181 |
|
3414 |
reponame = iter->second.reponame; |
182 |
|
3414 |
client_instance = iter->second.client_instance; |
183 |
|
|
} |
184 |
|
3414 |
SessionCtx *session_ctx = SessionCtx::GetInstance(); |
185 |
✗✓ |
3414 |
assert(session_ctx); |
186 |
|
3414 |
session_ctx->Set(session_id, reponame, client_instance); |
187 |
|
3414 |
} |
188 |
|
|
|
189 |
|
3414 |
~SessionCtxGuard() { |
190 |
|
3414 |
SessionCtx *session_ctx = SessionCtx::GetInstance(); |
191 |
✗✓ |
3414 |
assert(session_ctx); |
192 |
|
3414 |
session_ctx->Unset(); |
193 |
|
3414 |
} |
194 |
|
|
}; |
195 |
|
|
|
196 |
|
|
static void *MainProcessRequests(void *data); |
197 |
|
|
|
198 |
|
17 |
inline uint64_t NextSessionId() { |
199 |
|
17 |
return atomic_xadd64(&next_session_id_, 1); |
200 |
|
|
} |
201 |
|
7 |
inline uint64_t NextTxnId() { |
202 |
|
7 |
return atomic_xadd64(&next_txn_id_, 1); |
203 |
|
|
} |
204 |
|
6 |
inline uint64_t NextLstId() { |
205 |
|
6 |
return atomic_xadd64(&next_lst_id_, 1); |
206 |
|
|
} |
207 |
|
623 |
static inline uint32_t HashUniqueRequest(const UniqueRequest &req) { |
208 |
|
623 |
return MurmurHash2(&req, sizeof(req), 0x07387a4f); |
209 |
|
|
} |
210 |
|
|
|
211 |
|
|
bool HandleRequest(int fd_con); |
212 |
|
|
void HandleHandshake(cvmfs::MsgHandshake *msg_req, |
213 |
|
|
CacheTransport *transport); |
214 |
|
|
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, |
215 |
|
|
CacheTransport *transport); |
216 |
|
|
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, |
217 |
|
|
CacheTransport *transport); |
218 |
|
|
void HandleRead(cvmfs::MsgReadReq *msg_req, |
219 |
|
|
CacheTransport *transport); |
220 |
|
|
void HandleStore(cvmfs::MsgStoreReq *msg_req, |
221 |
|
|
CacheTransport::Frame *frame, |
222 |
|
|
CacheTransport *transport); |
223 |
|
|
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, |
224 |
|
|
CacheTransport *transport); |
225 |
|
|
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport); |
226 |
|
|
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport); |
227 |
|
|
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport); |
228 |
|
|
void HandleIoctl(cvmfs::MsgIoctl *msg_req); |
229 |
|
|
void SendDetachRequests(); |
230 |
|
|
|
231 |
|
|
void NotifySupervisor(char signal); |
232 |
|
|
|
233 |
|
|
void LogSessionError(uint64_t session_id, |
234 |
|
|
cvmfs::EnumStatus status, |
235 |
|
|
const std::string &msg); |
236 |
|
|
void LogSessionInfo(uint64_t session_id, const std::string &msg); |
237 |
|
|
|
238 |
|
|
bool is_local_; |
239 |
|
|
uint64_t capabilities_; |
240 |
|
|
int fd_socket_; |
241 |
|
|
int fd_socket_lock_; |
242 |
|
|
atomic_int32 running_; |
243 |
|
|
unsigned num_workers_; |
244 |
|
|
unsigned max_object_size_; |
245 |
|
|
/** |
246 |
|
|
* Number of clients undergoing a reload, i.e. they promise to come back |
247 |
|
|
* and open a new connection soon. |
248 |
|
|
*/ |
249 |
|
|
uint64_t num_inlimbo_clients_; |
250 |
|
|
std::string name_; |
251 |
|
|
atomic_int64 next_session_id_; |
252 |
|
|
atomic_int64 next_txn_id_; |
253 |
|
|
atomic_int64 next_lst_id_; |
254 |
|
|
SmallHashDynamic<UniqueRequest, uint64_t> txn_ids_; |
255 |
|
|
std::set<int> connections_; |
256 |
|
|
std::map<uint64_t, SessionInfo> sessions_; |
257 |
|
|
pthread_t thread_io_; |
258 |
|
|
int pipe_ctrl_[2]; |
259 |
|
|
}; // class CachePlugin |
260 |
|
|
|
261 |
|
|
#endif // CVMFS_CACHE_PLUGIN_CHANNEL_H_ |