Directory: | cvmfs/ |
---|---|
File: | cvmfs/cache_plugin/channel.h |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 39 | 42 | 92.9% |
Branches: | 9 | 16 | 56.2% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | #ifndef CVMFS_CACHE_PLUGIN_CHANNEL_H_ | ||
5 | #define CVMFS_CACHE_PLUGIN_CHANNEL_H_ | ||
6 | |||
7 | #include <pthread.h> | ||
8 | #include <stdint.h> | ||
9 | |||
10 | #include <cassert> | ||
11 | #include <map> | ||
12 | #include <set> | ||
13 | #include <string> | ||
14 | #include <vector> | ||
15 | |||
16 | #include "cache.pb.h" | ||
17 | #include "cache_transport.h" | ||
18 | #include "crypto/hash.h" | ||
19 | #include "manifest.h" | ||
20 | #include "smallhash.h" | ||
21 | #include "util/atomic.h" | ||
22 | #include "util/murmur.hxx" | ||
23 | #include "util/single_copy.h" | ||
24 | |||
25 | /** | ||
26 | * A SessionCtx stores the session information related to the current cache | ||
27 | * plugin callback in thread-local-storage. Singleton. | ||
28 | * | ||
29 | * TODO(jblomer): merge code with ClientCtx | ||
30 | */ | ||
31 | class SessionCtx : SingleCopy { | ||
32 | public: | ||
33 | struct ThreadLocalStorage { | ||
34 | 225 | ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance) | |
35 | 225 | : id(id) | |
36 | 225 | , reponame(reponame) | |
37 | 225 | , client_instance(client_instance) | |
38 | 225 | , is_set(true) { } | |
39 | |||
40 | uint64_t id; | ||
41 | char *reponame; | ||
42 | char *client_instance; | ||
43 | bool is_set; ///< either not yet set or deliberately unset | ||
44 | }; | ||
45 | |||
46 | static SessionCtx *GetInstance(); | ||
47 | static void CleanupInstance(); | ||
48 | ~SessionCtx(); | ||
49 | |||
50 | void Set(uint64_t id, char *reponame, char *client_instance); | ||
51 | void Unset(); | ||
52 | void Get(uint64_t *id, char **reponame, char **client_instance); | ||
53 | bool IsSet(); | ||
54 | |||
55 | private: | ||
56 | static SessionCtx *instance_; | ||
57 | static void TlsDestructor(void *data); | ||
58 | |||
59 | SessionCtx(); | ||
60 | |||
61 | pthread_key_t thread_local_storage_; | ||
62 | pthread_mutex_t *lock_tls_blocks_; | ||
63 | std::vector<ThreadLocalStorage *> tls_blocks_; | ||
64 | }; | ||
65 | |||
66 | |||
67 | class CachePlugin { | ||
68 | public: | ||
69 | static const unsigned kPbProtocolVersion = 1; | ||
70 | static const uint64_t kSizeUnknown; | ||
71 | |||
72 | struct ObjectInfo { | ||
73 | 495 | ObjectInfo() | |
74 | 495 | : id() | |
75 | 495 | , size(kSizeUnknown) | |
76 | 495 | , object_type(cvmfs::OBJECT_REGULAR) | |
77 | 495 | , pinned(false) { } | |
78 | shash::Any id; | ||
79 | uint64_t size; | ||
80 | cvmfs::EnumObjectType object_type; | ||
81 | bool pinned; | ||
82 | std::string description; | ||
83 | }; | ||
84 | |||
85 | struct Info { | ||
86 | 60 | Info() : size_bytes(0), used_bytes(0), pinned_bytes(0), no_shrink(-1) { } | |
87 | uint64_t size_bytes; | ||
88 | uint64_t used_bytes; | ||
89 | uint64_t pinned_bytes; | ||
90 | int64_t no_shrink; | ||
91 | }; | ||
92 | |||
93 | bool Listen(const std::string &locator); | ||
94 | virtual ~CachePlugin(); | ||
95 | void ProcessRequests(unsigned num_workers); | ||
96 | bool IsRunning(); | ||
97 | void Terminate(); | ||
98 | void WaitFor(); | ||
99 | void AskToDetach(); | ||
100 | |||
101 | ✗ | unsigned max_object_size() const { return max_object_size_; } | |
102 | uint64_t capabilities() const { return capabilities_; } | ||
103 | |||
104 | protected: | ||
105 | explicit CachePlugin(uint64_t capabilities); | ||
106 | |||
107 | virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, | ||
108 | int32_t change_by) = 0; | ||
109 | virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, | ||
110 | ObjectInfo *info) = 0; | ||
111 | virtual cvmfs::EnumStatus Pread(const shash::Any &id, | ||
112 | uint64_t offset, | ||
113 | uint32_t *size, | ||
114 | unsigned char *buffer) = 0; | ||
115 | virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, | ||
116 | const uint64_t txn_id, | ||
117 | const ObjectInfo &info) = 0; | ||
118 | virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, | ||
119 | unsigned char *buffer, | ||
120 | uint32_t size) = 0; | ||
121 | virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id) = 0; | ||
122 | virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id) = 0; | ||
123 | |||
124 | virtual cvmfs::EnumStatus GetInfo(Info *info) = 0; | ||
125 | virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, | ||
126 | uint64_t *used_bytes) = 0; | ||
127 | virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, | ||
128 | cvmfs::EnumObjectType type) = 0; | ||
129 | virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item) = 0; | ||
130 | virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id) = 0; | ||
131 | |||
132 | virtual cvmfs::EnumStatus LoadBreadcrumb( | ||
133 | const std::string &fqrn, manifest::Breadcrumb *breadcrumb) = 0; | ||
134 | virtual cvmfs::EnumStatus StoreBreadcrumb( | ||
135 | const std::string &fqrn, const manifest::Breadcrumb &breadcrumb) = 0; | ||
136 | |||
137 | private: | ||
138 | static const unsigned kDefaultMaxObjectSize = 256 * 1024; // 256kB | ||
139 | static const unsigned kListingSize = 4 * 1024 * 1024; // 4MB | ||
140 | static const char kSignalTerminate = 'q'; | ||
141 | static const char kSignalDetach = 'd'; | ||
142 | |||
143 | struct UniqueRequest { | ||
144 | 87210 | UniqueRequest() : session_id(-1), req_id(-1) { } | |
145 | 9135 | UniqueRequest(int64_t s, int64_t r) : session_id(s), req_id(r) { } | |
146 | 18585 | bool operator==(const UniqueRequest &other) const { | |
147 | 18585 | return (this->session_id == other.session_id) | |
148 |
3/4✓ Branch 0 taken 9450 times.
✓ Branch 1 taken 9135 times.
✓ Branch 2 taken 9450 times.
✗ Branch 3 not taken.
|
18585 | && (this->req_id == other.req_id); |
149 | } | ||
150 | ✗ | bool operator!=(const UniqueRequest &other) const { | |
151 | ✗ | return !(*this == other); | |
152 | } | ||
153 | |||
154 | int64_t session_id; | ||
155 | int64_t req_id; | ||
156 | }; | ||
157 | |||
158 | /** | ||
159 | * The char pointers are prepared on Handshake and removed when the session | ||
160 | * closes. They are created to be consumed by the cvmcache_get_session() API. | ||
161 | */ | ||
162 | struct SessionInfo { | ||
163 | 270 | SessionInfo() : id(0), reponame(NULL), client_instance(NULL) { } | |
164 | SessionInfo(uint64_t id, const std::string &name); | ||
165 | |||
166 | uint64_t id; | ||
167 | std::string name; | ||
168 | char *reponame; | ||
169 | char *client_instance; | ||
170 | }; | ||
171 | |||
172 | /** | ||
173 | * RAII form of the SessionCtx. On construction, automatically sets the | ||
174 | * session context if the session id is found. On destruction, unsets the | ||
175 | * session information. | ||
176 | */ | ||
177 | class SessionCtxGuard { | ||
178 | public: | ||
179 | 51285 | SessionCtxGuard(uint64_t session_id, CachePlugin *plugin) { | |
180 | 51285 | char *reponame = NULL; | |
181 | 51285 | char *client_instance = NULL; | |
182 | const std::map<uint64_t, SessionInfo>::const_iterator iter = | ||
183 |
1/2✓ Branch 1 taken 51285 times.
✗ Branch 2 not taken.
|
51285 | plugin->sessions_.find(session_id); |
184 |
1/2✓ Branch 3 taken 51285 times.
✗ Branch 4 not taken.
|
51285 | if (iter != plugin->sessions_.end()) { |
185 | 51285 | reponame = iter->second.reponame; | |
186 | 51285 | client_instance = iter->second.client_instance; | |
187 | } | ||
188 |
1/2✓ Branch 1 taken 51285 times.
✗ Branch 2 not taken.
|
51285 | SessionCtx *session_ctx = SessionCtx::GetInstance(); |
189 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51285 times.
|
51285 | assert(session_ctx); |
190 |
1/2✓ Branch 1 taken 51285 times.
✗ Branch 2 not taken.
|
51285 | session_ctx->Set(session_id, reponame, client_instance); |
191 | 51285 | } | |
192 | |||
193 | 51285 | ~SessionCtxGuard() { | |
194 | 51285 | SessionCtx *session_ctx = SessionCtx::GetInstance(); | |
195 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 51285 times.
|
51285 | assert(session_ctx); |
196 | 51285 | session_ctx->Unset(); | |
197 | 51285 | } | |
198 | }; | ||
199 | |||
200 | static void *MainProcessRequests(void *data); | ||
201 | |||
202 | 270 | inline uint64_t NextSessionId() { | |
203 | 270 | return atomic_xadd64(&next_session_id_, 1); | |
204 | } | ||
205 | 105 | inline uint64_t NextTxnId() { return atomic_xadd64(&next_txn_id_, 1); } | |
206 | 90 | inline uint64_t NextLstId() { return atomic_xadd64(&next_lst_id_, 1); } | |
207 | 9345 | static inline uint32_t HashUniqueRequest(const UniqueRequest &req) { | |
208 | 9345 | return MurmurHash2(&req, sizeof(req), 0x07387a4f); | |
209 | } | ||
210 | |||
211 | bool HandleRequest(int fd_con); | ||
212 | void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport); | ||
213 | void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, | ||
214 | CacheTransport *transport); | ||
215 | void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, | ||
216 | CacheTransport *transport); | ||
217 | void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport); | ||
218 | void HandleStore(cvmfs::MsgStoreReq *msg_req, | ||
219 | CacheTransport::Frame *frame, | ||
220 | CacheTransport *transport); | ||
221 | void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, | ||
222 | CacheTransport *transport); | ||
223 | void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport); | ||
224 | void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport); | ||
225 | void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport); | ||
226 | void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, | ||
227 | CacheTransport *transport); | ||
228 | void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, | ||
229 | CacheTransport *transport); | ||
230 | void HandleIoctl(cvmfs::MsgIoctl *msg_req); | ||
231 | void SendDetachRequests(); | ||
232 | |||
233 | void NotifySupervisor(char signal); | ||
234 | |||
235 | void LogSessionError(uint64_t session_id, | ||
236 | cvmfs::EnumStatus status, | ||
237 | const std::string &msg); | ||
238 | void LogSessionInfo(uint64_t session_id, const std::string &msg); | ||
239 | |||
240 | bool is_local_; | ||
241 | uint64_t capabilities_; | ||
242 | int fd_socket_; | ||
243 | int fd_socket_lock_; | ||
244 | atomic_int32 running_; | ||
245 | unsigned num_workers_; | ||
246 | unsigned max_object_size_; | ||
247 | /** | ||
248 | * Number of clients undergoing a reload, i.e. they promise to come back | ||
249 | * and open a new connection soon. | ||
250 | */ | ||
251 | uint64_t num_inlimbo_clients_; | ||
252 | std::string name_; | ||
253 | atomic_int64 next_session_id_; | ||
254 | atomic_int64 next_txn_id_; | ||
255 | atomic_int64 next_lst_id_; | ||
256 | SmallHashDynamic<UniqueRequest, uint64_t> txn_ids_; | ||
257 | std::set<int> connections_; | ||
258 | std::map<uint64_t, SessionInfo> sessions_; | ||
259 | pthread_t thread_io_; | ||
260 | int pipe_ctrl_[2]; | ||
261 | }; // class CachePlugin | ||
262 | |||
263 | #endif // CVMFS_CACHE_PLUGIN_CHANNEL_H_ | ||
264 |