GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/quota_posix.h
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 27 28 96.4%
Branches: 0 0 -%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_QUOTA_POSIX_H_
6 #define CVMFS_QUOTA_POSIX_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/types.h>
11 #include <unistd.h>
12
13 #include <map>
14 #include <string>
15 #include <vector>
16
17 #include "crypto/hash.h"
18 #include "duplex_sqlite3.h"
19 #include "gtest/gtest_prod.h"
20 #include "quota.h"
21 #include "statistics.h"
22 #include "util/single_copy.h"
23 #include "util/string.h"
24
25 namespace perf {
26 class Recorder;
27 }
28
29 /**
30 * Works with the PosixCacheManager. Uses an SQlite database for cache contents
31 * tracking. Tracking is asynchronously.
32 *
33 * TODO(jblomer): split into client, server, and protocol classes.
34 */
35 class PosixQuotaManager : public QuotaManager {
36 FRIEND_TEST(T_QuotaManager, BindReturnPipe);
37 FRIEND_TEST(T_QuotaManager, Cleanup);
38 FRIEND_TEST(T_QuotaManager, Contains);
39 FRIEND_TEST(T_QuotaManager, InitDatabase);
40 FRIEND_TEST(T_QuotaManager, MakeReturnPipe);
41
42 public:
43 static PosixQuotaManager *Create(const std::string &cache_workspace,
44 const uint64_t limit, const uint64_t cleanup_threshold,
45 const bool rebuild_database);
46 static PosixQuotaManager *CreateShared(
47 const std::string &exe_path,
48 const std::string &cache_workspace,
49 const uint64_t limit,
50 const uint64_t cleanup_threshold,
51 bool foreground);
52 static int MainCacheManager(int argc, char **argv);
53
54 virtual ~PosixQuotaManager();
55 virtual bool HasCapability(Capabilities capability) { return true; }
56
57 virtual void Insert(const shash::Any &hash, const uint64_t size,
58 const std::string &description);
59 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
60 const std::string &description);
61 virtual bool Pin(const shash::Any &hash, const uint64_t size,
62 const std::string &description, const bool is_catalog);
63 virtual void Unpin(const shash::Any &hash);
64 virtual void Touch(const shash::Any &hash);
65 virtual void Remove(const shash::Any &file);
66 virtual bool Cleanup(const uint64_t leave_size);
67
68 virtual void RegisterBackChannel(int back_channel[2],
69 const std::string &channel_id);
70 virtual void UnregisterBackChannel(int back_channel[2],
71 const std::string &channel_id);
72
73 virtual std::vector<std::string> List();
74 virtual std::vector<std::string> ListPinned();
75 virtual std::vector<std::string> ListCatalogs();
76 virtual std::vector<std::string> ListVolatile();
77 virtual uint64_t GetMaxFileSize();
78 virtual uint64_t GetCapacity();
79 virtual uint64_t GetSize();
80 virtual uint64_t GetSizePinned();
81 virtual uint64_t GetCleanupRate(uint64_t period_s);
82
83 virtual void Spawn();
84 virtual pid_t GetPid();
85 virtual uint32_t GetProtocolRevision();
86
87 private:
88 /**
89 * Loaded catalogs are pinned in the LRU and have to be treated differently.
90 */
91 enum FileTypes {
92 kFileRegular = 0,
93 kFileCatalog,
94 };
95
96 /**
97 * List of RPCs that can be sent to the cache manager.
98 */
99 enum CommandType {
100 kTouch = 0,
101 kInsert,
102 kReserve,
103 kPin,
104 kUnpin,
105 kRemove,
106 kCleanup,
107 kList,
108 kListPinned,
109 kListCatalogs,
110 kStatus,
111 kLimits,
112 kPid,
113 kPinRegular,
114 kRegisterBackChannel,
115 kUnregisterBackChannel,
116 kGetProtocolRevision,
117 kInsertVolatile,
118 // as of protocol revision 2
119 kListVolatile,
120 kCleanupRate,
121 };
122
123 /**
124 * That could be done in more elegant way. However, we might have a situation
125 * with old cache manager serving new clients (or vice versa) and we don't
126 * want to change the memory layout of LruCommand.
127 */
128 struct LruCommand {
129 CommandType command_type;
130 uint64_t size; /**< Careful! Last 3 bits store hash algorithm */
131 int return_pipe; /**< For cleanup, listing, and reservations */
132 unsigned char digest[shash::kMaxDigestSize];
133 /**
134 * Maximum 512-sizeof(LruCommand) in order to guarantee atomic pipe
135 * operations.
136 */
137 uint16_t desc_length;
138
139 1147 LruCommand()
140 1147 : command_type(static_cast<CommandType>(0))
141 1147 , size(0)
142 1147 , return_pipe(-1)
143 1147 , desc_length(0)
144 {
145 1147 memset(digest, 0, shash::kMaxDigestSize);
146 1147 }
147
148 69 void SetSize(const uint64_t new_size) {
149 69 uint64_t mask = 7;
150 69 mask = ~(mask << (64-3));
151 69 size = (new_size & mask) | size;
152 69 }
153
154 214 uint64_t GetSize() const {
155 214 uint64_t mask = 7;
156 214 mask = ~(mask << (64-3));
157 214 return size & mask;
158 }
159
160 124 void StoreHash(const shash::Any &hash) {
161 124 memcpy(digest, hash.digest, hash.GetDigestSize());
162 // Exclude MD5
163 124 uint64_t algo_flags = hash.algorithm - 1;
164 124 algo_flags = algo_flags << (64-3);
165 124 size |= algo_flags;
166 124 }
167
168 86 shash::Any RetrieveHash() const {
169 86 uint64_t algo_flags = size >> (64-3);
170 86 shash::Any result(static_cast<shash::Algorithms>(algo_flags+1));
171 86 memcpy(result.digest, digest, result.GetDigestSize());
172 86 return result;
173 }
174 };
175
176 /**
177 * Maximum page cache per thread (Bytes).
178 */
179 static const unsigned kSqliteMemPerThread = 2*1024*1024;
180
181 /**
182 * Collect a number of insert and touch operations before processing them
183 * as sqlite commands.
184 */
185 static const unsigned kCommandBufferSize = 32;
186
187 /**
188 * Make sure that the amount of data transferred through the RPC pipe is
189 * within the OS's guarantees for atomicity.
190 */
191 static const unsigned kMaxDescription = 512-sizeof(LruCommand);
192
193 /**
194 * Alarm when more than 75% of the cache fraction allowed for pinned files
195 * (50%) is filled with pinned files
196 */
197 static const unsigned kHighPinWatermark = 75;
198
199 /**
200 * The last bit in the sequence number indicates if an entry is volatile.
201 * Such sequence numbers are negative and they are preferred during cleanup.
202 * Volatile entries are used for instance for ALICE conditions data.
203 */
204 static const uint64_t kVolatileFlag = 1ULL << 63;
205
206 bool InitDatabase(const bool rebuild_database);
207 bool RebuildDatabase();
208 void CloseDatabase();
209 bool Contains(const std::string &hash_str);
210 bool DoCleanup(const uint64_t leave_size);
211
212 void MakeReturnPipe(int pipe[2]);
213 int BindReturnPipe(int pipe_wronly);
214 void UnbindReturnPipe(int pipe_wronly);
215 void UnlinkReturnPipe(int pipe_wronly);
216 void CloseReturnPipe(int pipe[2]);
217 void CleanupPipes();
218
219 void CheckFreeSpace();
220 void CheckHighPinWatermark();
221 void ProcessCommandBunch(const unsigned num,
222 const LruCommand *commands,
223 const char *descriptions);
224 static void *MainCommandServer(void *data);
225
226 void DoInsert(const shash::Any &hash, const uint64_t size,
227 const std::string &description, const CommandType command_type);
228 std::vector<std::string> DoList(const CommandType list_command);
229 void GetSharedStatus(uint64_t *gauge, uint64_t *pinned);
230 void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold);
231
232 static void ParseDirectories(const std::string cache_workspace,
233 std::string *cache_dir,
234 std::string *workspace_dir);
235 PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold,
236 const std::string &cache_workspace);
237
238 /**
239 * Indicates if the cache manager is a shared process or a thread within the
240 * same process (exclusive cache manager)
241 */
242 bool shared_;
243
244 /**
245 * True once the program switches into multi-threaded mode or the quota manager
246 * process has been forked resp.
247 */
248 bool spawned_;
249
250 /**
251 * Soft limit in bytes, start cleanup when reached.
252 */
253 uint64_t limit_;
254
255 /**
256 * Cleanup until cleanup_threshold_ are left in the cache.
257 */
258 uint64_t cleanup_threshold_;
259
260 /**
261 * Current size of cache.
262 */
263 uint64_t gauge_;
264
265 /**
266 * Size of pinned files in bytes (usually file catalogs).
267 */
268 uint64_t pinned_;
269
270 /**
271 * Current access sequence number. Gets increased on every access/insert
272 * operation.
273 */
274 uint64_t seq_;
275
276 /**
277 * Should match the directory given to the cache manager.
278 */
279 std::string cache_dir_;
280
281 /**
282 * Directory for the database lock (shared manager) and the pipes (also
283 * shared manager). Usually the same as cache_dir_. Can be different if
284 * CVMFS_WORKSPACE or CVMFS_CACHE_WORKSPACE is set.
285 */
286 std::string workspace_dir_;
287
288 /**
289 * Pinned content hashes and their size.
290 */
291 std::map<shash::Any, uint64_t> pinned_chunks_;
292
293 /**
294 * Used to send RPCs to the quota manager thread or process.
295 */
296 int pipe_lru_[2];
297
298 /**
299 * In exclusive mode, controls the quota manager thread.
300 */
301 pthread_t thread_lru_;
302
303 /**
304 * Ensures exclusive cache database access through POSIX file lock.
305 */
306 int fd_lock_cachedb_;
307
308 /**
309 * If this is true, the unlink operations that correspond to a cleanup run
310 * will be performed in a detached, asynchronous process.
311 */
312 bool async_delete_;
313
314 /**
315 * Keeps track of the number of cleanups over time. Use by
316 * `cvmfs_talk cleanup rate`
317 */
318 perf::MultiRecorder cleanup_recorder_;
319
320 sqlite3 *database_;
321 sqlite3_stmt *stmt_touch_;
322 sqlite3_stmt *stmt_unpin_;
323 sqlite3_stmt *stmt_block_;
324 sqlite3_stmt *stmt_unblock_;
325 sqlite3_stmt *stmt_new_;
326 sqlite3_stmt *stmt_lru_;
327 sqlite3_stmt *stmt_size_;
328 sqlite3_stmt *stmt_rm_;
329 sqlite3_stmt *stmt_list_;
330 sqlite3_stmt *stmt_list_pinned_; /**< Loaded catalogs are pinned. */
331 sqlite3_stmt *stmt_list_catalogs_;
332 sqlite3_stmt *stmt_list_volatile_;
333
334 /**
335 * Used in the destructor to steer closing of the database and so on.
336 */
337 bool initialized_;
338 }; // class PosixQuotaManager
339
340 #endif // CVMFS_QUOTA_POSIX_H_
341