GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/quota_posix.h
Date: 2026-05-10 02:36:07
Exec Total Coverage
Lines: 30 31 96.8%
Branches: 0 0 -%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_QUOTA_POSIX_H_
6 #define CVMFS_QUOTA_POSIX_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/types.h>
11 #include <unistd.h>
12
13 #include <map>
14 #include <string>
15 #include <vector>
16
17 #include "crypto/hash.h"
18 #include "duplex_sqlite3.h"
19 #include "duplex_testing.h"
20 #include "quota.h"
21 #include "statistics.h"
22 #include "util/single_copy.h"
23
24 namespace perf {
25 class Recorder;
26 }
27
28 /**
29 * Works with the PosixCacheManager. Uses an SQLite database for cache contents
30 * tracking. Tracking is asynchronous.
31 *
32 * TODO(jblomer): split into client, server, and protocol classes.
33 */
34 class PosixQuotaManager : public QuotaManager {
35 FRIEND_TEST(T_QuotaManager, BindReturnPipe);
36 FRIEND_TEST(T_QuotaManager, Cleanup);
37 FRIEND_TEST(T_QuotaManager, CleanupLru);
38 FRIEND_TEST(T_QuotaManager, Contains);
39 FRIEND_TEST(T_QuotaManager, InitDatabase);
40 FRIEND_TEST(T_QuotaManager, MakeReturnPipe);
41 FRIEND_TEST(T_QuotaManager, ReadPipeStringUsesExactLength);
42 FRIEND_TEST(T_QuotaManager, RegisterMountpointUsesCorrectDescriptionBuffer);
43 FRIEND_TEST(T_QuotaManager, SetCleanupPolicyUsesCorrectDescriptionBuffer);
44
45 public:
46 static PosixQuotaManager *Create(const std::string &cache_workspace,
47 const uint64_t limit,
48 const uint64_t cleanup_threshold,
49 const bool rebuild_database);
50 static PosixQuotaManager *CreateShared(const std::string &exe_path,
51 const std::string &cache_workspace,
52 const uint64_t limit,
53 const uint64_t cleanup_threshold,
54 bool foreground);
55 static int MainCacheManager(int argc, char **argv);
56
57 virtual ~PosixQuotaManager();
58 virtual bool HasCapability(Capabilities capability) { return true; }  // Ignores its argument: every capability is reported as supported.
59
60 virtual void Insert(const shash::Any &hash, const uint64_t size,
61 const std::string &description);
62 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
63 const std::string &description);
64 virtual bool Pin(const shash::Any &hash, const uint64_t size,
65 const std::string &description, const bool is_catalog);
66 virtual void Unpin(const shash::Any &hash);
67 virtual void Touch(const shash::Any &hash);
68 virtual void Remove(const shash::Any &file);
69 virtual bool Cleanup(const uint64_t leave_size);
70
71 virtual void RegisterBackChannel(int back_channel[2],
72 const std::string &channel_id);
73 virtual void UnregisterBackChannel(int back_channel[2],
74 const std::string &channel_id);
75
76 virtual std::vector<std::string> List();
77 virtual std::vector<std::string> ListPinned();
78 virtual std::vector<std::string> ListCatalogs();
79 virtual std::vector<std::string> ListVolatile();
80 virtual uint64_t GetMaxFileSize();
81 virtual uint64_t GetCapacity();
82 virtual uint64_t GetSize();
83 virtual uint64_t GetSizePinned();
84 virtual bool SetLimit(uint64_t limit);
85 virtual uint64_t GetCleanupRate(uint64_t period_s);
86
87 virtual void Spawn();
88 virtual pid_t GetPid();
89 virtual uint32_t GetProtocolRevision();
90 virtual void RegisterMountpoint(const std::string &mountpoint);
91 virtual void SetCleanupPolicy(bool cleanup_unused_first);
92 virtual std::string GetMountpoints();
93 virtual std::string GetGroupHashes();
94
95 void ManagedReadHalfPipe(int fd, void *buf, size_t nbyte);
96 43 void SetCacheMgrPid(pid_t pid_) { cachemgr_pid_ = pid_; }  // Stores the given PID in cachemgr_pid_.
97
98
99 private:
100 /**
101 * File type tags (regular file vs. file catalog). Loaded catalogs are pinned in the LRU and have to be treated differently.
102 */
103 enum FileTypes {
104 kFileRegular = 0,  // explicit base value
105 kFileCatalog,  // implicitly kFileRegular + 1
106 };
107
108 /**
109 * List of RPCs that can be sent to the cache manager. Enumerators are numbered implicitly from kTouch = 0 in declaration order; the revision markers below show that newer commands were appended at the end. NOTE(review): presumably the numeric values are part of the pipe protocol, so entries must not be reordered or removed -- confirm against the command server.
110 */
111 enum CommandType {
112 kTouch = 0,
113 kInsert,
114 kReserve,
115 kPin,
116 kUnpin,
117 kRemove,
118 kCleanup,
119 kList,
120 kListPinned,
121 kListCatalogs,
122 kStatus,
123 kLimits,
124 kPid,
125 kPinRegular,
126 kRegisterBackChannel,
127 kUnregisterBackChannel,
128 kGetProtocolRevision,
129 kInsertVolatile,
130 // as of protocol revision 2
131 kListVolatile,
132 kCleanupRate,
133 kSetLimit,
134 // as of protocol revision 3
135 kRegisterMountpoint,
136 kGetMountpoints,
137 kGetGroupHashes,
138 kSetCleanupPolicy,
139 };
140
141 /**
142 * This could be done in a more elegant way. However, we might have a situation
143 * with old cache manager serving new clients (or vice versa) and we don't
144 * want to change the memory layout of LruCommand.
145 */
146 struct LruCommand {
147 CommandType command_type;
148 uint64_t size; /**< Careful! The 3 most significant bits store the hash algorithm; the lower 61 bits hold the size */
149 int return_pipe; /**< For cleanup, listing, and reservations */
150 unsigned char digest[shash::kMaxDigestSize];
151 /**
152 * Maximum 512-sizeof(LruCommand) in order to guarantee atomic pipe
153 * operations.
154 */
155 uint16_t desc_length;
156
157 6503805 LruCommand()  // Zero-initializes all fields; return_pipe starts at -1.
158 6503805 : command_type(static_cast<CommandType>(0))
159 6503805 , size(0)
160 6503805 , return_pipe(-1)
161 6503805 , desc_length(0) {
162 6503805 memset(digest, 0, shash::kMaxDigestSize);
163 6503805 }
164
165 4302317 void SetSize(const uint64_t new_size) {  // Stores the lower 61 bits of new_size; the algorithm bits are left untouched.
166 4302317 uint64_t mask = 7;
167 4302317 mask = ~(mask << (64 - 3));  // mask selects the 61 size bits
168 4302317 size = (new_size & mask) | (size & ~mask);  // replace the old size bits instead of OR-ing into them
169 4302317 }
170
171 12908041 uint64_t GetSize() const {  // Returns the size with the 3 algorithm bits stripped.
172 12908041 uint64_t mask = 7;
173 12908041 mask = ~(mask << (64 - 3));
174 12908041 return size & mask;
175 }
176
177 6453575 void StoreHash(const shash::Any &hash) {  // Copies the digest and encodes (algorithm - 1) in the top 3 bits of size.
178 6453575 memcpy(digest, hash.digest, hash.GetDigestSize());
179 // Exclude MD5
180 6453575 uint64_t algo_flags = hash.algorithm - 1;
181 6453575 algo_flags = algo_flags << (64 - 3);
182 6453575 size = (size & ~(static_cast<uint64_t>(7) << (64 - 3))) | algo_flags;  // clear stale algorithm bits before setting the new ones
183 6453575 }
184
185 6453010 shash::Any RetrieveHash() const {  // Inverse of StoreHash(): rebuilds the hash from the top 3 bits of size and the digest.
186 6453010 const uint64_t algo_flags = size >> (64 - 3);
187 6453010 shash::Any result(static_cast<shash::Algorithms>(algo_flags + 1));
188 6453010 memcpy(result.digest, digest, result.GetDigestSize());
189 6453010 return result;
190 }
191 };
192
193 /**
194 * Used for batch queries in DoCleanup(). Plain value bundle of a hash with two 64-bit counters, filled once by the constructor.
195 */
196 struct EvictCandidate {
197 uint64_t size;
198 uint64_t acseq;  // NOTE(review): presumably the access sequence number (cf. seq_) -- confirm in DoCleanup()
199 shash::Any hash;
200 2236430 EvictCandidate(const shash::Any &h, uint64_t s, uint64_t a)  // h -> hash, s -> size, a -> acseq
201 2236430 : size(s), acseq(a), hash(h) { }
202 };
203
204 /**
205 * Magic number to make reading PIDs from lockfiles more robust and
206 * versionable
207 */
208 static const unsigned kLockFileMagicNumber = 142857;
209
210 /**
211 * Maximum page cache per thread (Bytes).
212 */
213 static const unsigned kSqliteMemPerThread = 2 * 1024 * 1024;
214
215 /**
216 * Collect a number of insert and touch operations before processing them
217 * as sqlite commands.
218 */
219 static const unsigned kCommandBufferSize = 32;
220
221 /**
222 * Batch size for database operations during DoCleanup()
223 */
224 static const unsigned kEvictBatchSize = 1000;
225
226 /**
227 * Make sure that the amount of data transferred through the RPC pipe is
228 * within the OS's guarantees for atomicity.
229 */
230 static const unsigned kMaxDescription = 512 - sizeof(LruCommand);
231
232 /**
233 * Alarm when more than 75% of the cache fraction allowed for pinned files
234 * (50%) is filled with pinned files
235 */
236 static const unsigned kHighPinWatermark = 75;
237
238 /**
239 * The most significant bit of the sequence number indicates if an entry is volatile.
240 * Such sequence numbers are negative and they are preferred during cleanup.
241 * Volatile entries are used for instance for ALICE conditions data.
242 */
243 static const uint64_t kVolatileFlag = 1ULL << 63;
244
245 bool InitDatabase(const bool rebuild_database);
246 bool RebuildDatabase();
247 void CloseDatabase();
248 bool Contains(const std::string &hash_str);
249 bool DoCleanup(const uint64_t leave_size);
250 bool EmptyTrash(const std::vector<std::string> &trash);
251
252 void MakeReturnPipe(int pipe[2]);
253 int BindReturnPipe(int pipe_wronly);
254 void UnbindReturnPipe(int pipe_wronly);
255 void UnlinkReturnPipe(int pipe_wronly);
256 void CloseReturnPipe(int pipe[2]);
257 void CleanupPipes();
258
259 void CheckFreeSpace();
260 void CheckHighPinWatermark();
261 void ProcessCommandBunch(const unsigned num,
262 const LruCommand *commands,
263 const char *descriptions);
264 static void *MainCommandServer(void *data);
265
266 void DoInsert(const shash::Any &hash, const uint64_t size,
267 const std::string &description, const CommandType command_type);
268 std::vector<std::string> DoList(const CommandType list_command);
269 void GetSharedStatus(uint64_t *gauge, uint64_t *pinned);
270 bool SetSharedLimit(uint64_t limit);
271 void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold);
272
273 static void ParseDirectories(const std::string cache_workspace,
274 std::string *cache_dir,
275 std::string *workspace_dir);
276 PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold,
277 const std::string &cache_workspace);
278 void SkipEviction(const EvictCandidate &candidate);
279 std::string ReadPipeString(int fd, size_t size);
280
281 /**
282 * Indicates if the cache manager is a shared process or a thread within the
283 * same process (exclusive cache manager)
284 */
285 bool shared_;
286
287 /**
288 * True once the program has switched into multi-threaded mode or, respectively,
289 * the quota manager process has been forked.
290 */
291 bool spawned_;
292
293 /**
294 * Soft limit in bytes, start cleanup when reached.
295 */
296 uint64_t limit_;
297
298 /**
299 * Clean up until cleanup_threshold_ is left in the cache.
300 */
301 uint64_t cleanup_threshold_;
302
303 /**
304 * Current size of cache.
305 */
306 uint64_t gauge_;
307
308 /**
309 * Size of pinned files in bytes (usually file catalogs).
310 */
311 uint64_t pinned_;
312
313 /**
314 * Current access sequence number. Gets increased on every access/insert
315 * operation.
316 */
317 uint64_t seq_;
318
319 /**
320 * Should match the directory given to the cache manager.
321 */
322 std::string cache_dir_;
323
324 /**
325 * Directory for the database lock (shared manager) and the pipes (also
326 * shared manager). Usually the same as cache_dir_. Can be different if
327 * CVMFS_WORKSPACE or CVMFS_CACHE_WORKSPACE is set.
328 */
329 std::string workspace_dir_;
330
331 /**
332 * Pinned content hashes and their size.
333 */
334 std::map<shash::Any, uint64_t> pinned_chunks_;
335
336 /**
337 * Used to send RPCs to the quota manager thread or process.
338 */
339 int pipe_lru_[2];
340
341 /**
342 * In exclusive mode, controls the quota manager thread.
343 */
344 pthread_t thread_lru_;
345
346 /**
347 * Ensures exclusive cache database access through POSIX file lock.
348 */
349 int fd_lock_cachedb_;
350
351 /**
352 * If this is true, the unlink operations that correspond to a cleanup run
353 * will be performed in a detached, asynchronous process.
354 */
355 bool async_delete_;
356
357
358 /**
359 * Record pid of current cache manager in order to check if its process
360 * disappeared.
361 */
362 pid_t cachemgr_pid_;
363
364 /**
365 * Keeps track of the number of cleanups over time. Used by
366 * `cvmfs_talk cleanup rate`
367 */
368 perf::MultiRecorder cleanup_recorder_;
369
370 sqlite3 *database_;
371 sqlite3_stmt *stmt_touch_;
372 sqlite3_stmt *stmt_unpin_;
373 sqlite3_stmt *stmt_block_;
374 sqlite3_stmt *stmt_unblock_;
375 sqlite3_stmt *stmt_new_;
376 sqlite3_stmt *stmt_lru_;
377 sqlite3_stmt *stmt_size_;
378 sqlite3_stmt *stmt_rm_;
379 sqlite3_stmt *stmt_rm_batch_;
380 sqlite3_stmt *stmt_list_;
381 sqlite3_stmt *stmt_list_pinned_; /**< Loaded catalogs are pinned. */
382 sqlite3_stmt *stmt_list_catalogs_;
383 sqlite3_stmt *stmt_list_volatile_;
384
385 /**
386 * Used in the destructor to steer closing of the database and so on.
387 */
388 bool initialized_;
389
390 /**
391 * Used in DoCleanup to exclude currently used files from eviction
392 */
393 // TODO(gchr): it would be faster if it was a std::set. Needs a comparison
394 // operator for shash::Any
395 pthread_mutex_t *lock_open_files_;
396 std::vector<shash::Short> open_files_;
397
398 bool cleanup_unused_first_;
399 std::vector<std::string> mountpoints_;
400
401 std::vector<shash::Short> CollectAllOpenHashes();
402
403 struct CollectorHandler {  // Holds reference members, so both references must be bound at initialization. NOTE(review): presumably the argument bundle passed as `data` to CollectMountpointsHashes() -- confirm.
404 std::vector<shash::Short> &of;  // NOTE(review): presumably the shared open-files result vector (cf. open_files_) -- confirm
405 const std::vector<std::string> &mp;  // NOTE(review): presumably the list of mountpoints (cf. mountpoints_) -- confirm
406 pthread_mutex_t *l;  // NOTE(review): presumably the lock guarding `of` (cf. lock_open_files_) -- confirm
407 size_t i;  // NOTE(review): presumably the index into `mp` handled by this collector -- confirm
408 };
409
410 static void *CollectMountpointsHashes(void *data);
411 }; // class PosixQuotaManager
412
413 #endif // CVMFS_QUOTA_POSIX_H_
414
415