GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_plugin/channel.h
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 42 45 93.3%
Branches: 9 16 56.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #ifndef CVMFS_CACHE_PLUGIN_CHANNEL_H_
5 #define CVMFS_CACHE_PLUGIN_CHANNEL_H_
6
7 #include <pthread.h>
8 #include <stdint.h>
9
10 #include <cassert>
11 #include <map>
12 #include <set>
13 #include <string>
14 #include <vector>
15
16 #include "cache.pb.h"
17 #include "cache_transport.h"
18 #include "crypto/hash.h"
19 #include "manifest.h"
20 #include "smallhash.h"
21 #include "util/atomic.h"
22 #include "util/murmur.hxx"
23 #include "util/single_copy.h"
24
25 /**
26 * A SessionCtx stores the session information related to the current cache
27 * plugin callback in thread-local-storage. Singleton.
28 *
29 * TODO(jblomer): merge code with ClientCtx
30 */
31 class SessionCtx : SingleCopy {
32 public:
33 struct ThreadLocalStorage {
34 15 ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance)
35 15 : id(id)
36 15 , reponame(reponame)
37 15 , client_instance(client_instance)
38 15 , is_set(true)
39 15 { }
40
41 uint64_t id;
42 char *reponame;
43 char *client_instance;
44 bool is_set; ///< either not yet set or deliberately unset
45 };
46
47 static SessionCtx *GetInstance();
48 static void CleanupInstance();
49 ~SessionCtx();
50
51 void Set(uint64_t id, char *reponame, char *client_instance);
52 void Unset();
53 void Get(uint64_t *id, char **reponame, char **client_instance);
54 bool IsSet();
55
56 private:
57 static SessionCtx *instance_;
58 static void TlsDestructor(void *data);
59
60 SessionCtx();
61
62 pthread_key_t thread_local_storage_;
63 pthread_mutex_t *lock_tls_blocks_;
64 std::vector<ThreadLocalStorage *> tls_blocks_;
65 };
66
67
68 class CachePlugin {
69 public:
70 static const unsigned kPbProtocolVersion = 1;
71 static const uint64_t kSizeUnknown;
72
73 struct ObjectInfo {
74 33 ObjectInfo()
75 33 : id()
76 33 , size(kSizeUnknown)
77 33 , object_type(cvmfs::OBJECT_REGULAR)
78 33 , pinned(false) { }
79 shash::Any id;
80 uint64_t size;
81 cvmfs::EnumObjectType object_type;
82 bool pinned;
83 std::string description;
84 };
85
86 struct Info {
87 4 Info() : size_bytes(0), used_bytes(0), pinned_bytes(0), no_shrink(-1) { }
88 uint64_t size_bytes;
89 uint64_t used_bytes;
90 uint64_t pinned_bytes;
91 int64_t no_shrink;
92 };
93
94 bool Listen(const std::string &locator);
95 virtual ~CachePlugin();
96 void ProcessRequests(unsigned num_workers);
97 bool IsRunning();
98 void Terminate();
99 void WaitFor();
100 void AskToDetach();
101
102 unsigned max_object_size() const { return max_object_size_; }
103 uint64_t capabilities() const { return capabilities_; }
104
105 protected:
106 explicit CachePlugin(uint64_t capabilities);
107
108 virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id,
109 int32_t change_by) = 0;
110 virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id,
111 ObjectInfo *info) = 0;
112 virtual cvmfs::EnumStatus Pread(const shash::Any &id,
113 uint64_t offset,
114 uint32_t *size,
115 unsigned char *buffer) = 0;
116 virtual cvmfs::EnumStatus StartTxn(const shash::Any &id,
117 const uint64_t txn_id,
118 const ObjectInfo &info) = 0;
119 virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id,
120 unsigned char *buffer,
121 uint32_t size) = 0;
122 virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id) = 0;
123 virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id) = 0;
124
125 virtual cvmfs::EnumStatus GetInfo(Info *info) = 0;
126 virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to,
127 uint64_t *used_bytes) = 0;
128 virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id,
129 cvmfs::EnumObjectType type) = 0;
130 virtual cvmfs::EnumStatus ListingNext(int64_t lst_id,
131 ObjectInfo *item) = 0;
132 virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id) = 0;
133
134 virtual cvmfs::EnumStatus LoadBreadcrumb(
135 const std::string &fqrn, manifest::Breadcrumb *breadcrumb) = 0;
136 virtual cvmfs::EnumStatus StoreBreadcrumb(
137 const std::string &fqrn, const manifest::Breadcrumb &breadcrumb) = 0;
138
139 private:
140 static const unsigned kDefaultMaxObjectSize = 256 * 1024; // 256kB
141 static const unsigned kListingSize = 4 * 1024 * 1024; // 4MB
142 static const char kSignalTerminate = 'q';
143 static const char kSignalDetach = 'd';
144
145 struct UniqueRequest {
146 5814 UniqueRequest() : session_id(-1), req_id(-1) { }
147 609 UniqueRequest(int64_t s, int64_t r) : session_id(s), req_id(r) { }
148 1239 bool operator ==(const UniqueRequest &other) const {
149
2/2
✓ Branch 0 taken 630 times.
✓ Branch 1 taken 609 times.
1869 return (this->session_id == other.session_id) &&
150
1/2
✓ Branch 0 taken 630 times.
✗ Branch 1 not taken.
1869 (this->req_id == other.req_id);
151 }
152 bool operator !=(const UniqueRequest &other) const {
153 return !(*this == other);
154 }
155
156 int64_t session_id;
157 int64_t req_id;
158 };
159
160 /**
161 * The char pointers are prepared on Handshake and removed when the session
162 * closes. They are created to be consumed by the cvmcache_get_session() API.
163 */
164 struct SessionInfo {
165 18 SessionInfo() : id(0), reponame(NULL), client_instance(NULL) { }
166 SessionInfo(uint64_t id, const std::string &name);
167
168 uint64_t id;
169 std::string name;
170 char *reponame;
171 char *client_instance;
172 };
173
174 /**
175 * RAII form of the SessionCtx. On construction, automatically sets the
176 * session context if the session id is found. On destruction, unsets the
177 * session information.
178 */
179 class SessionCtxGuard {
180 public:
181 3419 SessionCtxGuard(uint64_t session_id, CachePlugin *plugin) {
182 3419 char *reponame = NULL;
183 3419 char *client_instance = NULL;
184 std::map<uint64_t, SessionInfo>::const_iterator iter =
185
1/2
✓ Branch 1 taken 3419 times.
✗ Branch 2 not taken.
3419 plugin->sessions_.find(session_id);
186
1/2
✓ Branch 3 taken 3419 times.
✗ Branch 4 not taken.
3419 if (iter != plugin->sessions_.end()) {
187 3419 reponame = iter->second.reponame;
188 3419 client_instance = iter->second.client_instance;
189 }
190
1/2
✓ Branch 1 taken 3419 times.
✗ Branch 2 not taken.
3419 SessionCtx *session_ctx = SessionCtx::GetInstance();
191
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3419 times.
3419 assert(session_ctx);
192
1/2
✓ Branch 1 taken 3419 times.
✗ Branch 2 not taken.
3419 session_ctx->Set(session_id, reponame, client_instance);
193 3419 }
194
195 3419 ~SessionCtxGuard() {
196 3419 SessionCtx *session_ctx = SessionCtx::GetInstance();
197
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3419 times.
3419 assert(session_ctx);
198 3419 session_ctx->Unset();
199 3419 }
200 };
201
202 static void *MainProcessRequests(void *data);
203
204 18 inline uint64_t NextSessionId() {
205 18 return atomic_xadd64(&next_session_id_, 1);
206 }
207 7 inline uint64_t NextTxnId() {
208 7 return atomic_xadd64(&next_txn_id_, 1);
209 }
210 6 inline uint64_t NextLstId() {
211 6 return atomic_xadd64(&next_lst_id_, 1);
212 }
213 623 static inline uint32_t HashUniqueRequest(const UniqueRequest &req) {
214 623 return MurmurHash2(&req, sizeof(req), 0x07387a4f);
215 }
216
217 bool HandleRequest(int fd_con);
218 void HandleHandshake(cvmfs::MsgHandshake *msg_req,
219 CacheTransport *transport);
220 void HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
221 CacheTransport *transport);
222 void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
223 CacheTransport *transport);
224 void HandleRead(cvmfs::MsgReadReq *msg_req,
225 CacheTransport *transport);
226 void HandleStore(cvmfs::MsgStoreReq *msg_req,
227 CacheTransport::Frame *frame,
228 CacheTransport *transport);
229 void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
230 CacheTransport *transport);
231 void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport);
232 void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport);
233 void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport);
234 void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req,
235 CacheTransport *transport);
236 void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req,
237 CacheTransport *transport);
238 void HandleIoctl(cvmfs::MsgIoctl *msg_req);
239 void SendDetachRequests();
240
241 void NotifySupervisor(char signal);
242
243 void LogSessionError(uint64_t session_id,
244 cvmfs::EnumStatus status,
245 const std::string &msg);
246 void LogSessionInfo(uint64_t session_id, const std::string &msg);
247
248 bool is_local_;
249 uint64_t capabilities_;
250 int fd_socket_;
251 int fd_socket_lock_;
252 atomic_int32 running_;
253 unsigned num_workers_;
254 unsigned max_object_size_;
255 /**
256 * Number of clients undergoing a reload, i.e. they promise to come back
257 * and open a new connection soon.
258 */
259 uint64_t num_inlimbo_clients_;
260 std::string name_;
261 atomic_int64 next_session_id_;
262 atomic_int64 next_txn_id_;
263 atomic_int64 next_lst_id_;
264 SmallHashDynamic<UniqueRequest, uint64_t> txn_ids_;
265 std::set<int> connections_;
266 std::map<uint64_t, SessionInfo> sessions_;
267 pthread_t thread_io_;
268 int pipe_ctrl_[2];
269 }; // class CachePlugin
270
271 #endif // CVMFS_CACHE_PLUGIN_CHANNEL_H_
272