Directory: | cvmfs/ |
---|---|
File: | cvmfs/cache_plugin/channel.h |
Date: | 2025-04-20 02:34:28 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 42 | 45 | 93.3% |
Branches: | 9 | 16 | 56.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | #ifndef CVMFS_CACHE_PLUGIN_CHANNEL_H_ | ||
5 | #define CVMFS_CACHE_PLUGIN_CHANNEL_H_ | ||
6 | |||
7 | #include <pthread.h> | ||
8 | #include <stdint.h> | ||
9 | |||
10 | #include <cassert> | ||
11 | #include <map> | ||
12 | #include <set> | ||
13 | #include <string> | ||
14 | #include <vector> | ||
15 | |||
16 | #include "cache.pb.h" | ||
17 | #include "cache_transport.h" | ||
18 | #include "crypto/hash.h" | ||
19 | #include "manifest.h" | ||
20 | #include "smallhash.h" | ||
21 | #include "util/atomic.h" | ||
22 | #include "util/murmur.hxx" | ||
23 | #include "util/single_copy.h" | ||
24 | |||
25 | /** | ||
26 | * A SessionCtx stores the session information related to the current cache | ||
27 | * plugin callback in thread-local-storage. Singleton. | ||
28 | * | ||
29 | * TODO(jblomer): merge code with ClientCtx | ||
30 | */ | ||
31 | class SessionCtx : SingleCopy { | ||
32 | public: | ||
33 | struct ThreadLocalStorage { | ||
34 | 15 | ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance) | |
35 | 15 | : id(id) | |
36 | 15 | , reponame(reponame) | |
37 | 15 | , client_instance(client_instance) | |
38 | 15 | , is_set(true) | |
39 | 15 | { } | |
40 | |||
41 | uint64_t id; | ||
42 | char *reponame; | ||
43 | char *client_instance; | ||
44 | bool is_set; ///< either not yet set or deliberately unset | ||
45 | }; | ||
46 | |||
47 | static SessionCtx *GetInstance(); | ||
48 | static void CleanupInstance(); | ||
49 | ~SessionCtx(); | ||
50 | |||
51 | void Set(uint64_t id, char *reponame, char *client_instance); | ||
52 | void Unset(); | ||
53 | void Get(uint64_t *id, char **reponame, char **client_instance); | ||
54 | bool IsSet(); | ||
55 | |||
56 | private: | ||
57 | static SessionCtx *instance_; | ||
58 | static void TlsDestructor(void *data); | ||
59 | |||
60 | SessionCtx(); | ||
61 | |||
62 | pthread_key_t thread_local_storage_; | ||
63 | pthread_mutex_t *lock_tls_blocks_; | ||
64 | std::vector<ThreadLocalStorage *> tls_blocks_; | ||
65 | }; | ||
66 | |||
67 | |||
68 | class CachePlugin { | ||
69 | public: | ||
70 | static const unsigned kPbProtocolVersion = 1; | ||
71 | static const uint64_t kSizeUnknown; | ||
72 | |||
73 | struct ObjectInfo { | ||
74 | 33 | ObjectInfo() | |
75 | 33 | : id() | |
76 | 33 | , size(kSizeUnknown) | |
77 | 33 | , object_type(cvmfs::OBJECT_REGULAR) | |
78 | 33 | , pinned(false) { } | |
79 | shash::Any id; | ||
80 | uint64_t size; | ||
81 | cvmfs::EnumObjectType object_type; | ||
82 | bool pinned; | ||
83 | std::string description; | ||
84 | }; | ||
85 | |||
86 | struct Info { | ||
87 | 4 | Info() : size_bytes(0), used_bytes(0), pinned_bytes(0), no_shrink(-1) { } | |
88 | uint64_t size_bytes; | ||
89 | uint64_t used_bytes; | ||
90 | uint64_t pinned_bytes; | ||
91 | int64_t no_shrink; | ||
92 | }; | ||
93 | |||
94 | bool Listen(const std::string &locator); | ||
95 | virtual ~CachePlugin(); | ||
96 | void ProcessRequests(unsigned num_workers); | ||
97 | bool IsRunning(); | ||
98 | void Terminate(); | ||
99 | void WaitFor(); | ||
100 | void AskToDetach(); | ||
101 | |||
102 | ✗ | unsigned max_object_size() const { return max_object_size_; } | |
103 | uint64_t capabilities() const { return capabilities_; } | ||
104 | |||
105 | protected: | ||
106 | explicit CachePlugin(uint64_t capabilities); | ||
107 | |||
108 | virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, | ||
109 | int32_t change_by) = 0; | ||
110 | virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, | ||
111 | ObjectInfo *info) = 0; | ||
112 | virtual cvmfs::EnumStatus Pread(const shash::Any &id, | ||
113 | uint64_t offset, | ||
114 | uint32_t *size, | ||
115 | unsigned char *buffer) = 0; | ||
116 | virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, | ||
117 | const uint64_t txn_id, | ||
118 | const ObjectInfo &info) = 0; | ||
119 | virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, | ||
120 | unsigned char *buffer, | ||
121 | uint32_t size) = 0; | ||
122 | virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id) = 0; | ||
123 | virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id) = 0; | ||
124 | |||
125 | virtual cvmfs::EnumStatus GetInfo(Info *info) = 0; | ||
126 | virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, | ||
127 | uint64_t *used_bytes) = 0; | ||
128 | virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, | ||
129 | cvmfs::EnumObjectType type) = 0; | ||
130 | virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, | ||
131 | ObjectInfo *item) = 0; | ||
132 | virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id) = 0; | ||
133 | |||
134 | virtual cvmfs::EnumStatus LoadBreadcrumb( | ||
135 | const std::string &fqrn, manifest::Breadcrumb *breadcrumb) = 0; | ||
136 | virtual cvmfs::EnumStatus StoreBreadcrumb( | ||
137 | const std::string &fqrn, const manifest::Breadcrumb &breadcrumb) = 0; | ||
138 | |||
139 | private: | ||
140 | static const unsigned kDefaultMaxObjectSize = 256 * 1024; // 256kB | ||
141 | static const unsigned kListingSize = 4 * 1024 * 1024; // 4MB | ||
142 | static const char kSignalTerminate = 'q'; | ||
143 | static const char kSignalDetach = 'd'; | ||
144 | |||
145 | struct UniqueRequest { | ||
146 | 5814 | UniqueRequest() : session_id(-1), req_id(-1) { } | |
147 | 609 | UniqueRequest(int64_t s, int64_t r) : session_id(s), req_id(r) { } | |
148 | 1239 | bool operator ==(const UniqueRequest &other) const { | |
149 |
2/2✓ Branch 0 taken 630 times.
✓ Branch 1 taken 609 times.
|
1869 | return (this->session_id == other.session_id) && |
150 |
1/2✓ Branch 0 taken 630 times.
✗ Branch 1 not taken.
|
1869 | (this->req_id == other.req_id); |
151 | } | ||
152 | ✗ | bool operator !=(const UniqueRequest &other) const { | |
153 | ✗ | return !(*this == other); | |
154 | } | ||
155 | |||
156 | int64_t session_id; | ||
157 | int64_t req_id; | ||
158 | }; | ||
159 | |||
160 | /** | ||
161 | * The char pointers are prepared on Handshake and removed when the session | ||
162 | * closes. They are created to be consumed by the cvmcache_get_session() API. | ||
163 | */ | ||
164 | struct SessionInfo { | ||
165 | 18 | SessionInfo() : id(0), reponame(NULL), client_instance(NULL) { } | |
166 | SessionInfo(uint64_t id, const std::string &name); | ||
167 | |||
168 | uint64_t id; | ||
169 | std::string name; | ||
170 | char *reponame; | ||
171 | char *client_instance; | ||
172 | }; | ||
173 | |||
174 | /** | ||
175 | * RAII form of the SessionCtx. On construction, automatically sets the | ||
176 | * session context if the session id is found. On destruction, unsets the | ||
177 | * session information. | ||
178 | */ | ||
179 | class SessionCtxGuard { | ||
180 | public: | ||
181 | 3419 | SessionCtxGuard(uint64_t session_id, CachePlugin *plugin) { | |
182 | 3419 | char *reponame = NULL; | |
183 | 3419 | char *client_instance = NULL; | |
184 | std::map<uint64_t, SessionInfo>::const_iterator iter = | ||
185 |
1/2✓ Branch 1 taken 3419 times.
✗ Branch 2 not taken.
|
3419 | plugin->sessions_.find(session_id); |
186 |
1/2✓ Branch 3 taken 3419 times.
✗ Branch 4 not taken.
|
3419 | if (iter != plugin->sessions_.end()) { |
187 | 3419 | reponame = iter->second.reponame; | |
188 | 3419 | client_instance = iter->second.client_instance; | |
189 | } | ||
190 |
1/2✓ Branch 1 taken 3419 times.
✗ Branch 2 not taken.
|
3419 | SessionCtx *session_ctx = SessionCtx::GetInstance(); |
191 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3419 times.
|
3419 | assert(session_ctx); |
192 |
1/2✓ Branch 1 taken 3419 times.
✗ Branch 2 not taken.
|
3419 | session_ctx->Set(session_id, reponame, client_instance); |
193 | 3419 | } | |
194 | |||
195 | 3419 | ~SessionCtxGuard() { | |
196 | 3419 | SessionCtx *session_ctx = SessionCtx::GetInstance(); | |
197 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3419 times.
|
3419 | assert(session_ctx); |
198 | 3419 | session_ctx->Unset(); | |
199 | 3419 | } | |
200 | }; | ||
201 | |||
202 | static void *MainProcessRequests(void *data); | ||
203 | |||
204 | 18 | inline uint64_t NextSessionId() { | |
205 | 18 | return atomic_xadd64(&next_session_id_, 1); | |
206 | } | ||
207 | 7 | inline uint64_t NextTxnId() { | |
208 | 7 | return atomic_xadd64(&next_txn_id_, 1); | |
209 | } | ||
210 | 6 | inline uint64_t NextLstId() { | |
211 | 6 | return atomic_xadd64(&next_lst_id_, 1); | |
212 | } | ||
213 | 623 | static inline uint32_t HashUniqueRequest(const UniqueRequest &req) { | |
214 | 623 | return MurmurHash2(&req, sizeof(req), 0x07387a4f); | |
215 | } | ||
216 | |||
217 | bool HandleRequest(int fd_con); | ||
218 | void HandleHandshake(cvmfs::MsgHandshake *msg_req, | ||
219 | CacheTransport *transport); | ||
220 | void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, | ||
221 | CacheTransport *transport); | ||
222 | void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, | ||
223 | CacheTransport *transport); | ||
224 | void HandleRead(cvmfs::MsgReadReq *msg_req, | ||
225 | CacheTransport *transport); | ||
226 | void HandleStore(cvmfs::MsgStoreReq *msg_req, | ||
227 | CacheTransport::Frame *frame, | ||
228 | CacheTransport *transport); | ||
229 | void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, | ||
230 | CacheTransport *transport); | ||
231 | void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport); | ||
232 | void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport); | ||
233 | void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport); | ||
234 | void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, | ||
235 | CacheTransport *transport); | ||
236 | void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, | ||
237 | CacheTransport *transport); | ||
238 | void HandleIoctl(cvmfs::MsgIoctl *msg_req); | ||
239 | void SendDetachRequests(); | ||
240 | |||
241 | void NotifySupervisor(char signal); | ||
242 | |||
243 | void LogSessionError(uint64_t session_id, | ||
244 | cvmfs::EnumStatus status, | ||
245 | const std::string &msg); | ||
246 | void LogSessionInfo(uint64_t session_id, const std::string &msg); | ||
247 | |||
248 | bool is_local_; | ||
249 | uint64_t capabilities_; | ||
250 | int fd_socket_; | ||
251 | int fd_socket_lock_; | ||
252 | atomic_int32 running_; | ||
253 | unsigned num_workers_; | ||
254 | unsigned max_object_size_; | ||
255 | /** | ||
256 | * Number of clients undergoing a reload, i.e. they promise to come back | ||
257 | * and open a new connection soon. | ||
258 | */ | ||
259 | uint64_t num_inlimbo_clients_; | ||
260 | std::string name_; | ||
261 | atomic_int64 next_session_id_; | ||
262 | atomic_int64 next_txn_id_; | ||
263 | atomic_int64 next_lst_id_; | ||
264 | SmallHashDynamic<UniqueRequest, uint64_t> txn_ids_; | ||
265 | std::set<int> connections_; | ||
266 | std::map<uint64_t, SessionInfo> sessions_; | ||
267 | pthread_t thread_io_; | ||
268 | int pipe_ctrl_[2]; | ||
269 | }; // class CachePlugin | ||
270 | |||
271 | #endif // CVMFS_CACHE_PLUGIN_CHANNEL_H_ | ||
272 |