GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_plugin/channel.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 39 42 92.9%
Branches: 9 16 56.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4 #ifndef CVMFS_CACHE_PLUGIN_CHANNEL_H_
5 #define CVMFS_CACHE_PLUGIN_CHANNEL_H_
6
7 #include <pthread.h>
8 #include <stdint.h>
9
10 #include <cassert>
11 #include <map>
12 #include <set>
13 #include <string>
14 #include <vector>
15
16 #include "cache.pb.h"
17 #include "cache_transport.h"
18 #include "crypto/hash.h"
19 #include "manifest.h"
20 #include "smallhash.h"
21 #include "util/atomic.h"
22 #include "util/murmur.hxx"
23 #include "util/single_copy.h"
24
25 /**
26 * A SessionCtx stores the session information related to the current cache
27 * plugin callback in thread-local-storage. Singleton.
28 *
29 * TODO(jblomer): merge code with ClientCtx
30 */
31 class SessionCtx : SingleCopy {
32 public:
33 struct ThreadLocalStorage {
34 225 ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance)
35 225 : id(id)
36 225 , reponame(reponame)
37 225 , client_instance(client_instance)
38 225 , is_set(true) { }
39
40 uint64_t id;
41 char *reponame;
42 char *client_instance;
43 bool is_set; ///< either not yet set or deliberately unset
44 };
45
46 static SessionCtx *GetInstance();
47 static void CleanupInstance();
48 ~SessionCtx();
49
50 void Set(uint64_t id, char *reponame, char *client_instance);
51 void Unset();
52 void Get(uint64_t *id, char **reponame, char **client_instance);
53 bool IsSet();
54
55 private:
56 static SessionCtx *instance_;
57 static void TlsDestructor(void *data);
58
59 SessionCtx();
60
61 pthread_key_t thread_local_storage_;
62 pthread_mutex_t *lock_tls_blocks_;
63 std::vector<ThreadLocalStorage *> tls_blocks_;
64 };
65
66
67 class CachePlugin {
68 public:
69 static const unsigned kPbProtocolVersion = 1;
70 static const uint64_t kSizeUnknown;
71
72 struct ObjectInfo {
73 495 ObjectInfo()
74 495 : id()
75 495 , size(kSizeUnknown)
76 495 , object_type(cvmfs::OBJECT_REGULAR)
77 495 , pinned(false) { }
78 shash::Any id;
79 uint64_t size;
80 cvmfs::EnumObjectType object_type;
81 bool pinned;
82 std::string description;
83 };
84
85 struct Info {
86 60 Info() : size_bytes(0), used_bytes(0), pinned_bytes(0), no_shrink(-1) { }
87 uint64_t size_bytes;
88 uint64_t used_bytes;
89 uint64_t pinned_bytes;
90 int64_t no_shrink;
91 };
92
93 bool Listen(const std::string &locator);
94 virtual ~CachePlugin();
95 void ProcessRequests(unsigned num_workers);
96 bool IsRunning();
97 void Terminate();
98 void WaitFor();
99 void AskToDetach();
100
101 unsigned max_object_size() const { return max_object_size_; }
102 uint64_t capabilities() const { return capabilities_; }
103
104 protected:
105 explicit CachePlugin(uint64_t capabilities);
106
107 virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id,
108 int32_t change_by) = 0;
109 virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id,
110 ObjectInfo *info) = 0;
111 virtual cvmfs::EnumStatus Pread(const shash::Any &id,
112 uint64_t offset,
113 uint32_t *size,
114 unsigned char *buffer) = 0;
115 virtual cvmfs::EnumStatus StartTxn(const shash::Any &id,
116 const uint64_t txn_id,
117 const ObjectInfo &info) = 0;
118 virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id,
119 unsigned char *buffer,
120 uint32_t size) = 0;
121 virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id) = 0;
122 virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id) = 0;
123
124 virtual cvmfs::EnumStatus GetInfo(Info *info) = 0;
125 virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to,
126 uint64_t *used_bytes) = 0;
127 virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id,
128 cvmfs::EnumObjectType type) = 0;
129 virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item) = 0;
130 virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id) = 0;
131
132 virtual cvmfs::EnumStatus LoadBreadcrumb(
133 const std::string &fqrn, manifest::Breadcrumb *breadcrumb) = 0;
134 virtual cvmfs::EnumStatus StoreBreadcrumb(
135 const std::string &fqrn, const manifest::Breadcrumb &breadcrumb) = 0;
136
137 private:
138 static const unsigned kDefaultMaxObjectSize = 256 * 1024; // 256kB
139 static const unsigned kListingSize = 4 * 1024 * 1024; // 4MB
140 static const char kSignalTerminate = 'q';
141 static const char kSignalDetach = 'd';
142
143 struct UniqueRequest {
144 87210 UniqueRequest() : session_id(-1), req_id(-1) { }
145 9135 UniqueRequest(int64_t s, int64_t r) : session_id(s), req_id(r) { }
146 18585 bool operator==(const UniqueRequest &other) const {
147 18585 return (this->session_id == other.session_id)
148
3/4
✓ Branch 0 taken 9450 times.
✓ Branch 1 taken 9135 times.
✓ Branch 2 taken 9450 times.
✗ Branch 3 not taken.
18585 && (this->req_id == other.req_id);
149 }
150 bool operator!=(const UniqueRequest &other) const {
151 return !(*this == other);
152 }
153
154 int64_t session_id;
155 int64_t req_id;
156 };
157
158 /**
159 * The char pointers are prepared on Handshake and removed when the session
160 * closes. They are created to be consumed by the cvmcache_get_session() API.
161 */
162 struct SessionInfo {
163 270 SessionInfo() : id(0), reponame(NULL), client_instance(NULL) { }
164 SessionInfo(uint64_t id, const std::string &name);
165
166 uint64_t id;
167 std::string name;
168 char *reponame;
169 char *client_instance;
170 };
171
172 /**
173 * RAII form of the SessionCtx. On construction, automatically sets the
174 * session context if the session id is found. On destruction, unsets the
175 * session information.
176 */
177 class SessionCtxGuard {
178 public:
179 51285 SessionCtxGuard(uint64_t session_id, CachePlugin *plugin) {
180 51285 char *reponame = NULL;
181 51285 char *client_instance = NULL;
182 const std::map<uint64_t, SessionInfo>::const_iterator iter =
183
1/2
✓ Branch 1 taken 51285 times.
✗ Branch 2 not taken.
51285 plugin->sessions_.find(session_id);
184
1/2
✓ Branch 3 taken 51285 times.
✗ Branch 4 not taken.
51285 if (iter != plugin->sessions_.end()) {
185 51285 reponame = iter->second.reponame;
186 51285 client_instance = iter->second.client_instance;
187 }
188
1/2
✓ Branch 1 taken 51285 times.
✗ Branch 2 not taken.
51285 SessionCtx *session_ctx = SessionCtx::GetInstance();
189
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51285 times.
51285 assert(session_ctx);
190
1/2
✓ Branch 1 taken 51285 times.
✗ Branch 2 not taken.
51285 session_ctx->Set(session_id, reponame, client_instance);
191 51285 }
192
193 51285 ~SessionCtxGuard() {
194 51285 SessionCtx *session_ctx = SessionCtx::GetInstance();
195
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 51285 times.
51285 assert(session_ctx);
196 51285 session_ctx->Unset();
197 51285 }
198 };
199
200 static void *MainProcessRequests(void *data);
201
202 270 inline uint64_t NextSessionId() {
203 270 return atomic_xadd64(&next_session_id_, 1);
204 }
205 105 inline uint64_t NextTxnId() { return atomic_xadd64(&next_txn_id_, 1); }
206 90 inline uint64_t NextLstId() { return atomic_xadd64(&next_lst_id_, 1); }
207 9345 static inline uint32_t HashUniqueRequest(const UniqueRequest &req) {
208 9345 return MurmurHash2(&req, sizeof(req), 0x07387a4f);
209 }
210
211 bool HandleRequest(int fd_con);
212 void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport);
213 void HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
214 CacheTransport *transport);
215 void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
216 CacheTransport *transport);
217 void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport);
218 void HandleStore(cvmfs::MsgStoreReq *msg_req,
219 CacheTransport::Frame *frame,
220 CacheTransport *transport);
221 void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
222 CacheTransport *transport);
223 void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport);
224 void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport);
225 void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport);
226 void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req,
227 CacheTransport *transport);
228 void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req,
229 CacheTransport *transport);
230 void HandleIoctl(cvmfs::MsgIoctl *msg_req);
231 void SendDetachRequests();
232
233 void NotifySupervisor(char signal);
234
235 void LogSessionError(uint64_t session_id,
236 cvmfs::EnumStatus status,
237 const std::string &msg);
238 void LogSessionInfo(uint64_t session_id, const std::string &msg);
239
240 bool is_local_;
241 uint64_t capabilities_;
242 int fd_socket_;
243 int fd_socket_lock_;
244 atomic_int32 running_;
245 unsigned num_workers_;
246 unsigned max_object_size_;
247 /**
248 * Number of clients undergoing a reload, i.e. they promise to come back
249 * and open a new connection soon.
250 */
251 uint64_t num_inlimbo_clients_;
252 std::string name_;
253 atomic_int64 next_session_id_;
254 atomic_int64 next_txn_id_;
255 atomic_int64 next_lst_id_;
256 SmallHashDynamic<UniqueRequest, uint64_t> txn_ids_;
257 std::set<int> connections_;
258 std::map<uint64_t, SessionInfo> sessions_;
259 pthread_t thread_io_;
260 int pipe_ctrl_[2];
261 }; // class CachePlugin
262
263 #endif // CVMFS_CACHE_PLUGIN_CHANNEL_H_
264