GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/quota_posix.h
Date: 2025-02-09 02:34:19
Exec Total Coverage
Lines: 30 31 96.8%
Branches: 0 0 -%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_QUOTA_POSIX_H_
6 #define CVMFS_QUOTA_POSIX_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10 #include <sys/types.h>
11 #include <unistd.h>
12
13 #include <map>
14 #include <string>
15 #include <vector>
16
17 #include "crypto/hash.h"
18 #include "duplex_sqlite3.h"
19 #include "gtest/gtest_prod.h"
20 #include "quota.h"
21 #include "statistics.h"
22 #include "util/single_copy.h"
23 #include "util/string.h"
24
25 namespace perf {
26 class Recorder;
27 }
28
29 /**
30 * Works with the PosixCacheManager. Uses an SQlite database for cache contents
31 * tracking. Tracking is asynchronously.
32 *
33 * TODO(jblomer): split into client, server, and protocol classes.
34 */
35 class PosixQuotaManager : public QuotaManager {
36 FRIEND_TEST(T_QuotaManager, BindReturnPipe);
37 FRIEND_TEST(T_QuotaManager, Cleanup);
38 FRIEND_TEST(T_QuotaManager, Contains);
39 FRIEND_TEST(T_QuotaManager, InitDatabase);
40 FRIEND_TEST(T_QuotaManager, MakeReturnPipe);
41
42 public:
43 static PosixQuotaManager *Create(const std::string &cache_workspace,
44 const uint64_t limit, const uint64_t cleanup_threshold,
45 const bool rebuild_database);
46 static PosixQuotaManager *CreateShared(
47 const std::string &exe_path,
48 const std::string &cache_workspace,
49 const uint64_t limit,
50 const uint64_t cleanup_threshold,
51 bool foreground);
52 static int MainCacheManager(int argc, char **argv);
53
54 virtual ~PosixQuotaManager();
55 virtual bool HasCapability(Capabilities capability) { return true; }
56
57 virtual void Insert(const shash::Any &hash, const uint64_t size,
58 const std::string &description);
59 virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
60 const std::string &description);
61 virtual bool Pin(const shash::Any &hash, const uint64_t size,
62 const std::string &description, const bool is_catalog);
63 virtual void Unpin(const shash::Any &hash);
64 virtual void Touch(const shash::Any &hash);
65 virtual void Remove(const shash::Any &file);
66 virtual bool Cleanup(const uint64_t leave_size);
67
68 virtual void RegisterBackChannel(int back_channel[2],
69 const std::string &channel_id);
70 virtual void UnregisterBackChannel(int back_channel[2],
71 const std::string &channel_id);
72
73 virtual std::vector<std::string> List();
74 virtual std::vector<std::string> ListPinned();
75 virtual std::vector<std::string> ListCatalogs();
76 virtual std::vector<std::string> ListVolatile();
77 virtual uint64_t GetMaxFileSize();
78 virtual uint64_t GetCapacity();
79 virtual uint64_t GetSize();
80 virtual uint64_t GetSizePinned();
81 virtual bool SetLimit(uint64_t limit);
82 virtual uint64_t GetCleanupRate(uint64_t period_s);
83
84 virtual void Spawn();
85 virtual pid_t GetPid();
86 virtual uint32_t GetProtocolRevision();
87
88 void ManagedReadHalfPipe(int fd, void *buf, size_t nbyte);
89 1 void SetCacheMgrPid(pid_t pid_) { cachemgr_pid_ = pid_;};
90
91
92 private:
93 /**
94 * Loaded catalogs are pinned in the LRU and have to be treated differently.
95 */
96 enum FileTypes {
97 kFileRegular = 0,
98 kFileCatalog,
99 };
100
101 /**
102 * List of RPCs that can be sent to the cache manager.
103 */
104 enum CommandType {
105 kTouch = 0,
106 kInsert,
107 kReserve,
108 kPin,
109 kUnpin,
110 kRemove,
111 kCleanup,
112 kList,
113 kListPinned,
114 kListCatalogs,
115 kStatus,
116 kLimits,
117 kPid,
118 kPinRegular,
119 kRegisterBackChannel,
120 kUnregisterBackChannel,
121 kGetProtocolRevision,
122 kInsertVolatile,
123 // as of protocol revision 2
124 kListVolatile,
125 kCleanupRate,
126 kSetLimit,
127 };
128
129 /**
130 * That could be done in more elegant way. However, we might have a situation
131 * with old cache manager serving new clients (or vice versa) and we don't
132 * want to change the memory layout of LruCommand.
133 */
134 struct LruCommand {
135 CommandType command_type;
136 uint64_t size; /**< Careful! Last 3 bits store hash algorithm */
137 int return_pipe; /**< For cleanup, listing, and reservations */
138 unsigned char digest[shash::kMaxDigestSize];
139 /**
140 * Maximum 512-sizeof(LruCommand) in order to guarantee atomic pipe
141 * operations.
142 */
143 uint16_t desc_length;
144
145 151161 LruCommand()
146 151161 : command_type(static_cast<CommandType>(0))
147 151161 , size(0)
148 151161 , return_pipe(-1)
149 151161 , desc_length(0)
150 {
151 151161 memset(digest, 0, shash::kMaxDigestSize);
152 151161 }
153
154 100056 void SetSize(const uint64_t new_size) {
155 100056 uint64_t mask = 7;
156 100056 mask = ~(mask << (64-3));
157 100056 size = (new_size & mask) | size;
158 100056 }
159
160 300179 uint64_t GetSize() const {
161 300179 uint64_t mask = 7;
162 300179 mask = ~(mask << (64-3));
163 300179 return size & mask;
164 }
165
166 150105 void StoreHash(const shash::Any &hash) {
167 150105 memcpy(digest, hash.digest, hash.GetDigestSize());
168 // Exclude MD5
169 150105 uint64_t algo_flags = hash.algorithm - 1;
170 150105 algo_flags = algo_flags << (64-3);
171 150105 size |= algo_flags;
172 150105 }
173
174 150068 shash::Any RetrieveHash() const {
175 150068 uint64_t algo_flags = size >> (64-3);
176 150068 shash::Any result(static_cast<shash::Algorithms>(algo_flags+1));
177 150068 memcpy(result.digest, digest, result.GetDigestSize());
178 150068 return result;
179 }
180 };
181
182 /**
183 * Used for batch queries in DoCleanup()
184 */
185 struct EvictCandidate {
186 uint64_t size;
187 uint64_t acseq;
188 shash::Any hash;
189 52010 EvictCandidate(const shash::Any &h, uint64_t s, uint64_t a)
190 52010 : size(s), acseq(a), hash(h) {}
191 };
192
193 /**
194 * Magic number to make reading PIDs from lockfiles more robust and versionable
195 */
196 static const unsigned kLockFileMagicNumber = 142857;
197
198 /**
199 * Maximum page cache per thread (Bytes).
200 */
201 static const unsigned kSqliteMemPerThread = 2*1024*1024;
202
203 /**
204 * Collect a number of insert and touch operations before processing them
205 * as sqlite commands.
206 */
207 static const unsigned kCommandBufferSize = 32;
208
209 /**
210 * Batch size for database operations during DoCleanup()
211 */
212 static const unsigned kEvictBatchSize = 1000;
213
214 /**
215 * Make sure that the amount of data transferred through the RPC pipe is
216 * within the OS's guarantees for atomicity.
217 */
218 static const unsigned kMaxDescription = 512-sizeof(LruCommand);
219
220 /**
221 * Alarm when more than 75% of the cache fraction allowed for pinned files
222 * (50%) is filled with pinned files
223 */
224 static const unsigned kHighPinWatermark = 75;
225
226 /**
227 * The last bit in the sequence number indicates if an entry is volatile.
228 * Such sequence numbers are negative and they are preferred during cleanup.
229 * Volatile entries are used for instance for ALICE conditions data.
230 */
231 static const uint64_t kVolatileFlag = 1ULL << 63;
232
233 bool InitDatabase(const bool rebuild_database);
234 bool RebuildDatabase();
235 void CloseDatabase();
236 bool Contains(const std::string &hash_str);
237 bool DoCleanup(const uint64_t leave_size);
238 bool EmptyTrash(const std::vector<std::string> &trash);
239
240 void MakeReturnPipe(int pipe[2]);
241 int BindReturnPipe(int pipe_wronly);
242 void UnbindReturnPipe(int pipe_wronly);
243 void UnlinkReturnPipe(int pipe_wronly);
244 void CloseReturnPipe(int pipe[2]);
245 void CleanupPipes();
246
247 void CheckFreeSpace();
248 void CheckHighPinWatermark();
249 void ProcessCommandBunch(const unsigned num,
250 const LruCommand *commands,
251 const char *descriptions);
252 static void *MainCommandServer(void *data);
253
254 void DoInsert(const shash::Any &hash, const uint64_t size,
255 const std::string &description, const CommandType command_type);
256 std::vector<std::string> DoList(const CommandType list_command);
257 void GetSharedStatus(uint64_t *gauge, uint64_t *pinned);
258 bool SetSharedLimit(uint64_t limit);
259 void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold);
260
261 static void ParseDirectories(const std::string cache_workspace,
262 std::string *cache_dir,
263 std::string *workspace_dir);
264 PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold,
265 const std::string &cache_workspace);
266
267 /**
268 * Indicates if the cache manager is a shared process or a thread within the
269 * same process (exclusive cache manager)
270 */
271 bool shared_;
272
273 /**
274 * True once the program switches into multi-threaded mode or the quota manager
275 * process has been forked resp.
276 */
277 bool spawned_;
278
279 /**
280 * Soft limit in bytes, start cleanup when reached.
281 */
282 uint64_t limit_;
283
284 /**
285 * Cleanup until cleanup_threshold_ are left in the cache.
286 */
287 uint64_t cleanup_threshold_;
288
289 /**
290 * Current size of cache.
291 */
292 uint64_t gauge_;
293
294 /**
295 * Size of pinned files in bytes (usually file catalogs).
296 */
297 uint64_t pinned_;
298
299 /**
300 * Current access sequence number. Gets increased on every access/insert
301 * operation.
302 */
303 uint64_t seq_;
304
305 /**
306 * Should match the directory given to the cache manager.
307 */
308 std::string cache_dir_;
309
310 /**
311 * Directory for the database lock (shared manager) and the pipes (also
312 * shared manager). Usually the same as cache_dir_. Can be different if
313 * CVMFS_WORKSPACE or CVMFS_CACHE_WORKSPACE is set.
314 */
315 std::string workspace_dir_;
316
317 /**
318 * Pinned content hashes and their size.
319 */
320 std::map<shash::Any, uint64_t> pinned_chunks_;
321
322 /**
323 * Used to send RPCs to the quota manager thread or process.
324 */
325 int pipe_lru_[2];
326
327 /**
328 * In exclusive mode, controls the quota manager thread.
329 */
330 pthread_t thread_lru_;
331
332 /**
333 * Ensures exclusive cache database access through POSIX file lock.
334 */
335 int fd_lock_cachedb_;
336
337 /**
338 * If this is true, the unlink operations that correspond to a cleanup run
339 * will be performed in a detached, asynchronous process.
340 */
341 bool async_delete_;
342
343
344 /**
345 * Record pid of current cache manager in order to check if its process
346 * disappeared.
347 */
348 pid_t cachemgr_pid_;
349
350 /**
351 * Keeps track of the number of cleanups over time. Use by
352 * `cvmfs_talk cleanup rate`
353 */
354 perf::MultiRecorder cleanup_recorder_;
355
356 sqlite3 *database_;
357 sqlite3_stmt *stmt_touch_;
358 sqlite3_stmt *stmt_unpin_;
359 sqlite3_stmt *stmt_block_;
360 sqlite3_stmt *stmt_unblock_;
361 sqlite3_stmt *stmt_new_;
362 sqlite3_stmt *stmt_lru_;
363 sqlite3_stmt *stmt_size_;
364 sqlite3_stmt *stmt_rm_;
365 sqlite3_stmt *stmt_rm_batch_;
366 sqlite3_stmt *stmt_list_;
367 sqlite3_stmt *stmt_list_pinned_; /**< Loaded catalogs are pinned. */
368 sqlite3_stmt *stmt_list_catalogs_;
369 sqlite3_stmt *stmt_list_volatile_;
370
371 /**
372 * Used in the destructor to steer closing of the database and so on.
373 */
374 bool initialized_;
375 }; // class PosixQuotaManager
376
377 #endif // CVMFS_QUOTA_POSIX_H_
378