GCC Code Coverage Report | |||||||||||||||||||||
|
|||||||||||||||||||||
Line | Branch | Exec | Source |
1 |
/** |
||
2 |
* This file is part of the CernVM File System. |
||
3 |
* |
||
4 |
* This module implements a "managed local cache". |
||
5 |
* This way, we are able to track access times of files in the cache |
||
6 |
* and remove files based on least recently used strategy. |
||
7 |
* |
||
8 |
* We setup another SQLite catalog, a "cache catalog", that helps us |
||
9 |
* in the bookkeeping of files, file sizes and access times. |
||
10 |
* |
||
11 |
* We might choose to not manage the local cache. This is indicated |
||
12 |
* by limit == 0 and everything succeeds in that case. |
||
13 |
*/ |
||
14 |
|||
15 |
#define __STDC_LIMIT_MACROS |
||
16 |
#define __STDC_FORMAT_MACROS |
||
17 |
|||
18 |
#include "cvmfs_config.h" |
||
19 |
#include "quota_posix.h" |
||
20 |
|||
21 |
#include <dirent.h> |
||
22 |
#include <errno.h> |
||
23 |
#include <fcntl.h> |
||
24 |
#include <inttypes.h> |
||
25 |
#include <pthread.h> |
||
26 |
#include <signal.h> |
||
27 |
#include <stdint.h> |
||
28 |
#include <sys/dir.h> |
||
29 |
#include <sys/stat.h> |
||
30 |
#ifndef __APPLE__ |
||
31 |
#include <sys/statfs.h> |
||
32 |
#endif |
||
33 |
#include <sys/statvfs.h> |
||
34 |
#include <sys/types.h> |
||
35 |
#include <sys/wait.h> |
||
36 |
#include <unistd.h> |
||
37 |
|||
38 |
#include <cassert> |
||
39 |
#include <cstdio> |
||
40 |
#include <cstdlib> |
||
41 |
#include <cstring> |
||
42 |
|||
43 |
#include <map> |
||
44 |
#include <set> |
||
45 |
#include <string> |
||
46 |
#include <vector> |
||
47 |
|||
48 |
#include "duplex_sqlite3.h" |
||
49 |
#include "hash.h" |
||
50 |
#include "logging.h" |
||
51 |
#include "monitor.h" |
||
52 |
#include "platform.h" |
||
53 |
#include "smalloc.h" |
||
54 |
#include "statistics.h" |
||
55 |
#include "util/pointer.h" |
||
56 |
#include "util/posix.h" |
||
57 |
#include "util/string.h" |
||
58 |
#include "util_concurrency.h" |
||
59 |
|||
60 |
using namespace std; // NOLINT |
||
61 |
|||
62 |
|||
63 |
91 |
int PosixQuotaManager::BindReturnPipe(int pipe_wronly) { |
|
64 |
✓✓ | 91 |
if (!shared_) |
65 |
89 |
return pipe_wronly; |
|
66 |
|||
67 |
// Connect writer's end |
||
68 |
int result = |
||
69 |
open((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str(), |
||
70 |
2 |
O_WRONLY | O_NONBLOCK); |
|
71 |
✓✓ | 2 |
if (result >= 0) { |
72 |
1 |
Nonblock2Block(result); |
|
73 |
} else { |
||
74 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, |
||
75 |
1 |
"failed to bind return pipe (%d)", errno); |
|
76 |
} |
||
77 |
2 |
return result; |
|
78 |
} |
||
79 |
|||
80 |
|||
81 |
33 |
void PosixQuotaManager::CheckHighPinWatermark() { |
|
82 |
33 |
const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100; |
|
83 |
✓✗✓✓ |
33 |
if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) { |
84 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn, |
||
85 |
"high watermark of pinned files (%" PRIu64 "M > %" PRIu64 "M)", |
||
86 |
3 |
pinned_/(1024*1024), watermark/(1024*1024)); |
|
87 |
3 |
BroadcastBackchannels("R"); // clients: please release pinned catalogs |
|
88 |
} |
||
89 |
33 |
} |
|
90 |
|||
91 |
|||
92 |
void PosixQuotaManager::CleanupPipes() { |
||
93 |
DIR *dirp = opendir(workspace_dir_.c_str()); |
||
94 |
assert(dirp != NULL); |
||
95 |
|||
96 |
platform_dirent64 *dent; |
||
97 |
bool found_leftovers = false; |
||
98 |
while ((dent = platform_readdir(dirp)) != NULL) { |
||
99 |
const string name = dent->d_name; |
||
100 |
const string path = workspace_dir_ + "/" + name; |
||
101 |
platform_stat64 info; |
||
102 |
int retval = platform_stat(path.c_str(), &info); |
||
103 |
if (retval != 0) |
||
104 |
continue; |
||
105 |
if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) == "pipe")) { |
||
106 |
if (!found_leftovers) { |
||
107 |
LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslogWarn, |
||
108 |
"removing left-over FIFOs from cache directory"); |
||
109 |
} |
||
110 |
found_leftovers = true; |
||
111 |
unlink(path.c_str()); |
||
112 |
} |
||
113 |
} |
||
114 |
closedir(dirp); |
||
115 |
} |
||
116 |
|||
117 |
|||
118 |
/** |
||
119 |
* Cleans up in data cache, until cache size is below leave_size. |
||
120 |
* The actual unlinking is done in a separate process (fork). |
||
121 |
* |
||
122 |
* \return True on success, false otherwise |
||
123 |
*/ |
||
124 |
9 |
bool PosixQuotaManager::Cleanup(const uint64_t leave_size) { |
|
125 |
✗✓ | 9 |
if (!spawned_) |
126 |
return DoCleanup(leave_size); |
||
127 |
|||
128 |
bool result; |
||
129 |
int pipe_cleanup[2]; |
||
130 |
9 |
MakeReturnPipe(pipe_cleanup); |
|
131 |
|||
132 |
9 |
LruCommand cmd; |
|
133 |
9 |
cmd.command_type = kCleanup; |
|
134 |
9 |
cmd.size = leave_size; |
|
135 |
9 |
cmd.return_pipe = pipe_cleanup[1]; |
|
136 |
|||
137 |
9 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
138 |
9 |
ReadHalfPipe(pipe_cleanup[0], &result, sizeof(result)); |
|
139 |
9 |
CloseReturnPipe(pipe_cleanup); |
|
140 |
|||
141 |
9 |
return result; |
|
142 |
} |
||
143 |
|||
144 |
|||
145 |
106 |
void PosixQuotaManager::CloseDatabase() { |
|
146 |
✓✗ | 106 |
if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_); |
147 |
✓✗ | 106 |
if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_); |
148 |
✓✗ | 106 |
if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_); |
149 |
✓✗ | 106 |
if (stmt_list_) sqlite3_finalize(stmt_list_); |
150 |
✓✗ | 106 |
if (stmt_lru_) sqlite3_finalize(stmt_lru_); |
151 |
✓✗ | 106 |
if (stmt_rm_) sqlite3_finalize(stmt_rm_); |
152 |
✓✗ | 106 |
if (stmt_size_) sqlite3_finalize(stmt_size_); |
153 |
✓✗ | 106 |
if (stmt_touch_) sqlite3_finalize(stmt_touch_); |
154 |
✓✗ | 106 |
if (stmt_unpin_) sqlite3_finalize(stmt_unpin_); |
155 |
✓✗ | 106 |
if (stmt_block_) sqlite3_finalize(stmt_block_); |
156 |
✓✗ | 106 |
if (stmt_unblock_) sqlite3_finalize(stmt_unblock_); |
157 |
✓✗ | 106 |
if (stmt_new_) sqlite3_finalize(stmt_new_); |
158 |
✓✗ | 106 |
if (database_) sqlite3_close(database_); |
159 |
106 |
UnlockFile(fd_lock_cachedb_); |
|
160 |
|||
161 |
106 |
stmt_list_catalogs_ = NULL; |
|
162 |
106 |
stmt_list_pinned_ = NULL; |
|
163 |
106 |
stmt_list_volatile_ = NULL; |
|
164 |
106 |
stmt_list_ = NULL; |
|
165 |
106 |
stmt_rm_ = NULL; |
|
166 |
106 |
stmt_size_ = NULL; |
|
167 |
106 |
stmt_touch_ = NULL; |
|
168 |
106 |
stmt_unpin_ = NULL; |
|
169 |
106 |
stmt_block_ = NULL; |
|
170 |
106 |
stmt_unblock_ = NULL; |
|
171 |
106 |
stmt_new_ = NULL; |
|
172 |
106 |
database_ = NULL; |
|
173 |
|||
174 |
106 |
pinned_chunks_.clear(); |
|
175 |
106 |
} |
|
176 |
|||
177 |
|||
178 |
86 |
void PosixQuotaManager::CloseReturnPipe(int pipe[2]) { |
|
179 |
✓✓ | 86 |
if (shared_) { |
180 |
2 |
close(pipe[0]); |
|
181 |
2 |
UnlinkReturnPipe(pipe[1]); |
|
182 |
} else { |
||
183 |
84 |
ClosePipe(pipe); |
|
184 |
} |
||
185 |
86 |
} |
|
186 |
|||
187 |
|||
188 |
87 |
bool PosixQuotaManager::Contains(const string &hash_str) { |
|
189 |
87 |
bool result = false; |
|
190 |
|||
191 |
sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(), |
||
192 |
87 |
SQLITE_STATIC); |
|
193 |
✓✓ | 87 |
if (sqlite3_step(stmt_size_) == SQLITE_ROW) |
194 |
23 |
result = true; |
|
195 |
87 |
sqlite3_reset(stmt_size_); |
|
196 |
LogCvmfs(kLogQuota, kLogDebug, "contains %s returns %d", |
||
197 |
87 |
hash_str.c_str(), result); |
|
198 |
|||
199 |
87 |
return result; |
|
200 |
} |
||
201 |
|||
202 |
|||
203 |
106 |
void PosixQuotaManager::CheckFreeSpace() { |
|
204 |
✓✗✓✓ |
106 |
if ((limit_ == 0) || (gauge_ >= limit_)) |
205 |
1 |
return; |
|
206 |
|||
207 |
struct statvfs vfs_info; |
||
208 |
105 |
int retval = statvfs((cache_dir_ + "/cachedb").c_str(), &vfs_info); |
|
209 |
✗✓ | 105 |
if (retval != 0) { |
210 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn, |
||
211 |
"failed to query %s for free space (%d)", |
||
212 |
cache_dir_.c_str(), errno); |
||
213 |
return; |
||
214 |
} |
||
215 |
105 |
int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize; |
|
216 |
LogCvmfs(kLogQuota, kLogDebug, "free space: %" PRId64 " MB", |
||
217 |
105 |
free_space_byte / (1024 * 1024)); |
|
218 |
|||
219 |
105 |
int64_t required_byte = limit_ - gauge_; |
|
220 |
✗✓ | 105 |
if (free_space_byte < required_byte) { |
221 |
LogCvmfs(kLogQuota, kLogSyslogWarn, |
||
222 |
"too little free space on the file system hosting the cache," |
||
223 |
" %" PRId64 " MB available", |
||
224 |
free_space_byte / (1024 * 1024)); |
||
225 |
} |
||
226 |
} |
||
227 |
|||
228 |
|||
229 |
109 |
PosixQuotaManager *PosixQuotaManager::Create( |
|
230 |
const string &cache_workspace, |
||
231 |
const uint64_t limit, |
||
232 |
const uint64_t cleanup_threshold, |
||
233 |
const bool rebuild_database) |
||
234 |
{ |
||
235 |
✓✓ | 109 |
if (cleanup_threshold >= limit) { |
236 |
LogCvmfs(kLogQuota, kLogDebug, "invalid parameters: limit %" PRIu64 ", " |
||
237 |
2 |
"cleanup_threshold %" PRIu64, limit, cleanup_threshold); |
|
238 |
2 |
return NULL; |
|
239 |
} |
||
240 |
|||
241 |
PosixQuotaManager *quota_manager = |
||
242 |
107 |
new PosixQuotaManager(limit, cleanup_threshold, cache_workspace); |
|
243 |
|||
244 |
// Initialize cache catalog |
||
245 |
✓✓ | 107 |
if (!quota_manager->InitDatabase(rebuild_database)) { |
246 |
✓✗ | 1 |
delete quota_manager; |
247 |
1 |
return NULL; |
|
248 |
} |
||
249 |
106 |
quota_manager->CheckFreeSpace(); |
|
250 |
106 |
MakePipe(quota_manager->pipe_lru_); |
|
251 |
|||
252 |
106 |
quota_manager->protocol_revision_ = kProtocolRevision; |
|
253 |
106 |
quota_manager->initialized_ = true; |
|
254 |
106 |
return quota_manager; |
|
255 |
} |
||
256 |
|||
257 |
|||
258 |
/** |
||
259 |
* Connects to a running shared local quota manager. Creates one if necessary. |
||
260 |
*/ |
||
261 |
4 |
PosixQuotaManager *PosixQuotaManager::CreateShared( |
|
262 |
const std::string &exe_path, |
||
263 |
const std::string &cache_workspace, |
||
264 |
const uint64_t limit, |
||
265 |
const uint64_t cleanup_threshold, |
||
266 |
bool foreground) |
||
267 |
{ |
||
268 |
4 |
string cache_dir; |
|
269 |
4 |
string workspace_dir; |
|
270 |
4 |
ParseDirectories(cache_workspace, &cache_dir, &workspace_dir); |
|
271 |
|||
272 |
// Create lock file: only one fuse client at a time |
||
273 |
4 |
const int fd_lockfile = LockFile(workspace_dir + "/lock_cachemgr"); |
|
274 |
✓✓ | 4 |
if (fd_lockfile < 0) { |
275 |
LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)", |
||
276 |
1 |
(workspace_dir + "/lock_cachemgr").c_str(), errno); |
|
277 |
1 |
return NULL; |
|
278 |
} |
||
279 |
|||
280 |
PosixQuotaManager *quota_mgr = |
||
281 |
3 |
new PosixQuotaManager(limit, cleanup_threshold, cache_workspace); |
|
282 |
3 |
quota_mgr->shared_ = true; |
|
283 |
3 |
quota_mgr->spawned_ = true; |
|
284 |
|||
285 |
// Try to connect to pipe |
||
286 |
3 |
const string fifo_path = workspace_dir + "/cachemgr"; |
|
287 |
3 |
LogCvmfs(kLogQuota, kLogDebug, "trying to connect to existing pipe"); |
|
288 |
3 |
quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK); |
|
289 |
✗✓ | 3 |
if (quota_mgr->pipe_lru_[1] >= 0) { |
290 |
LogCvmfs(kLogQuota, kLogDebug, "connected to existing cache manager pipe"); |
||
291 |
quota_mgr->initialized_ = true; |
||
292 |
Nonblock2Block(quota_mgr->pipe_lru_[1]); |
||
293 |
UnlockFile(fd_lockfile); |
||
294 |
quota_mgr->GetLimits("a_mgr->limit_, "a_mgr->cleanup_threshold_); |
||
295 |
LogCvmfs(kLogQuota, kLogDebug, |
||
296 |
"received limit %" PRIu64 ", threshold %" PRIu64, |
||
297 |
quota_mgr->limit_, quota_mgr->cleanup_threshold_); |
||
298 |
if (FileExists(workspace_dir + "/cachemgr.protocol")) { |
||
299 |
quota_mgr->protocol_revision_ = quota_mgr->GetProtocolRevision(); |
||
300 |
LogCvmfs(kLogQuota, kLogDebug, "connected protocol revision %u", |
||
301 |
quota_mgr->protocol_revision_); |
||
302 |
} else { |
||
303 |
LogCvmfs(kLogQuota, kLogDebug, "connected to ancient cache manager"); |
||
304 |
} |
||
305 |
return quota_mgr; |
||
306 |
} |
||
307 |
3 |
const int connect_error = errno; |
|
308 |
|||
309 |
// Lock file: let existing cache manager finish first |
||
310 |
3 |
const int fd_lockfile_fifo = LockFile(workspace_dir + "/lock_cachemgr.fifo"); |
|
311 |
✗✓ | 3 |
if (fd_lockfile_fifo < 0) { |
312 |
LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)", |
||
313 |
(workspace_dir + "/lock_cachemgr.fifo").c_str(), errno); |
||
314 |
UnlockFile(fd_lockfile); |
||
315 |
delete quota_mgr; |
||
316 |
return NULL; |
||
317 |
} |
||
318 |
3 |
UnlockFile(fd_lockfile_fifo); |
|
319 |
|||
320 |
✗✓ | 3 |
if (connect_error == ENXIO) { |
321 |
LogCvmfs(kLogQuota, kLogDebug, "left-over FIFO found, unlinking"); |
||
322 |
unlink(fifo_path.c_str()); |
||
323 |
} |
||
324 |
|||
325 |
// Creating a new FIFO for the cache manager (to be bound later) |
||
326 |
3 |
int retval = mkfifo(fifo_path.c_str(), 0600); |
|
327 |
✗✓ | 3 |
if (retval != 0) { |
328 |
LogCvmfs(kLogQuota, kLogDebug, "failed to create cache manager FIFO (%d)", |
||
329 |
errno); |
||
330 |
UnlockFile(fd_lockfile); |
||
331 |
delete quota_mgr; |
||
332 |
return NULL; |
||
333 |
} |
||
334 |
|||
335 |
// Create new cache manager |
||
336 |
int pipe_boot[2]; |
||
337 |
int pipe_handshake[2]; |
||
338 |
3 |
MakePipe(pipe_boot); |
|
339 |
3 |
MakePipe(pipe_handshake); |
|
340 |
|||
341 |
3 |
vector<string> command_line; |
|
342 |
3 |
command_line.push_back(exe_path); |
|
343 |
3 |
command_line.push_back("__cachemgr__"); |
|
344 |
3 |
command_line.push_back(cache_workspace); |
|
345 |
3 |
command_line.push_back(StringifyInt(pipe_boot[1])); |
|
346 |
3 |
command_line.push_back(StringifyInt(pipe_handshake[0])); |
|
347 |
3 |
command_line.push_back(StringifyInt(limit)); |
|
348 |
3 |
command_line.push_back(StringifyInt(cleanup_threshold)); |
|
349 |
3 |
command_line.push_back(StringifyInt(foreground)); |
|
350 |
3 |
command_line.push_back(StringifyInt(GetLogSyslogLevel())); |
|
351 |
3 |
command_line.push_back(StringifyInt(GetLogSyslogFacility())); |
|
352 |
3 |
command_line.push_back(GetLogDebugFile() + ":" + GetLogMicroSyslog()); |
|
353 |
|||
354 |
3 |
set<int> preserve_filedes; |
|
355 |
3 |
preserve_filedes.insert(0); |
|
356 |
3 |
preserve_filedes.insert(1); |
|
357 |
3 |
preserve_filedes.insert(2); |
|
358 |
3 |
preserve_filedes.insert(pipe_boot[1]); |
|
359 |
3 |
preserve_filedes.insert(pipe_handshake[0]); |
|
360 |
|||
361 |
3 |
retval = ManagedExec(command_line, preserve_filedes, map<int, int>(), false); |
|
362 |
✗✓ | 3 |
if (!retval) { |
363 |
UnlockFile(fd_lockfile); |
||
364 |
ClosePipe(pipe_boot); |
||
365 |
ClosePipe(pipe_handshake); |
||
366 |
delete quota_mgr; |
||
367 |
LogCvmfs(kLogQuota, kLogDebug, "failed to start cache manager"); |
||
368 |
return NULL; |
||
369 |
} |
||
370 |
|||
371 |
// Wait for cache manager to be ready |
||
372 |
3 |
close(pipe_boot[1]); |
|
373 |
3 |
close(pipe_handshake[0]); |
|
374 |
char buf; |
||
375 |
✓✗ | 3 |
if (read(pipe_boot[0], &buf, 1) != 1) { |
376 |
3 |
UnlockFile(fd_lockfile); |
|
377 |
3 |
close(pipe_boot[0]); |
|
378 |
3 |
close(pipe_handshake[1]); |
|
379 |
✓✗ | 3 |
delete quota_mgr; |
380 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, |
||
381 |
3 |
"cache manager did not start"); |
|
382 |
3 |
return NULL; |
|
383 |
} |
||
384 |
close(pipe_boot[0]); |
||
385 |
|||
386 |
// Connect write end |
||
387 |
quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK); |
||
388 |
if (quota_mgr->pipe_lru_[1] < 0) { |
||
389 |
LogCvmfs(kLogQuota, kLogDebug, |
||
390 |
"failed to connect to newly created FIFO (%d)", errno); |
||
391 |
close(pipe_handshake[1]); |
||
392 |
UnlockFile(fd_lockfile); |
||
393 |
delete quota_mgr; |
||
394 |
return NULL; |
||
395 |
} |
||
396 |
|||
397 |
// Finalize handshake |
||
398 |
buf = 'C'; |
||
399 |
if (write(pipe_handshake[1], &buf, 1) != 1) { |
||
400 |
UnlockFile(fd_lockfile); |
||
401 |
close(pipe_handshake[1]); |
||
402 |
LogCvmfs(kLogQuota, kLogDebug, "could not finalize handshake"); |
||
403 |
delete quota_mgr; |
||
404 |
return NULL; |
||
405 |
} |
||
406 |
close(pipe_handshake[1]); |
||
407 |
|||
408 |
Nonblock2Block(quota_mgr->pipe_lru_[1]); |
||
409 |
LogCvmfs(kLogQuota, kLogDebug, "connected to a new cache manager"); |
||
410 |
quota_mgr->protocol_revision_ = kProtocolRevision; |
||
411 |
|||
412 |
UnlockFile(fd_lockfile); |
||
413 |
|||
414 |
quota_mgr->initialized_ = true; |
||
415 |
quota_mgr->GetLimits("a_mgr->limit_, "a_mgr->cleanup_threshold_); |
||
416 |
LogCvmfs(kLogQuota, kLogDebug, "received limit %" PRIu64 ", " |
||
417 |
"threshold %" PRIu64, |
||
418 |
quota_mgr->limit_, quota_mgr->cleanup_threshold_); |
||
419 |
return quota_mgr; |
||
420 |
} |
||
421 |
|||
422 |
|||
423 |
10 |
bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) { |
|
424 |
✓✓ | 10 |
if (gauge_ <= leave_size) |
425 |
2 |
return true; |
|
426 |
|||
427 |
// TODO(jblomer) transaction |
||
428 |
LogCvmfs(kLogQuota, kLogSyslog, |
||
429 |
8 |
"clean up cache until at most %lu KB is used", leave_size/1024); |
|
430 |
8 |
LogCvmfs(kLogQuota, kLogDebug, "gauge %" PRIu64, gauge_); |
|
431 |
8 |
cleanup_recorder_.Tick(); |
|
432 |
|||
433 |
bool result; |
||
434 |
8 |
string hash_str; |
|
435 |
8 |
vector<string> trash; |
|
436 |
|||
437 |
✓✓ | 15 |
do { |
438 |
16 |
sqlite3_reset(stmt_lru_); |
|
439 |
✓✓ | 16 |
if (sqlite3_step(stmt_lru_) != SQLITE_ROW) { |
440 |
1 |
LogCvmfs(kLogQuota, kLogDebug, "could not get lru-entry"); |
|
441 |
1 |
break; |
|
442 |
} |
||
443 |
|||
444 |
hash_str = string(reinterpret_cast<const char *>( |
||
445 |
15 |
sqlite3_column_text(stmt_lru_, 0))); |
|
446 |
15 |
LogCvmfs(kLogQuota, kLogDebug, "removing %s", hash_str.c_str()); |
|
447 |
15 |
shash::Any hash = shash::MkFromHexPtr(shash::HexPtr(hash_str)); |
|
448 |
|||
449 |
// That's a critical condition. We must not delete a not yet inserted |
||
450 |
// pinned file as it is already reserved (but will be inserted later). |
||
451 |
// Instead, set the pin bit in the db to not run into an endless loop |
||
452 |
✓✓ | 15 |
if (pinned_chunks_.find(hash) == pinned_chunks_.end()) { |
453 |
14 |
trash.push_back(cache_dir_ + "/" + hash.MakePathWithoutSuffix()); |
|
454 |
14 |
gauge_ -= sqlite3_column_int64(stmt_lru_, 1); |
|
455 |
LogCvmfs(kLogQuota, kLogDebug, "lru cleanup %s, new gauge %" PRIu64, |
||
456 |
14 |
hash_str.c_str(), gauge_); |
|
457 |
|||
458 |
sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(), |
||
459 |
14 |
SQLITE_STATIC); |
|
460 |
14 |
result = (sqlite3_step(stmt_rm_) == SQLITE_DONE); |
|
461 |
14 |
sqlite3_reset(stmt_rm_); |
|
462 |
|||
463 |
✗✓ | 14 |
if (!result) { |
464 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, |
||
465 |
"failed to find %s in cache database (%d). " |
||
466 |
"Cache database is out of sync. " |
||
467 |
"Restart cvmfs with clean cache.", hash_str.c_str(), result); |
||
468 |
return false; |
||
469 |
} |
||
470 |
} else { |
||
471 |
sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(), |
||
472 |
1 |
SQLITE_STATIC); |
|
473 |
1 |
result = (sqlite3_step(stmt_block_) == SQLITE_DONE); |
|
474 |
1 |
sqlite3_reset(stmt_block_); |
|
475 |
✗✓ | 1 |
assert(result); |
476 |
} |
||
477 |
} while (gauge_ > leave_size); |
||
478 |
|||
479 |
8 |
result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE); |
|
480 |
8 |
sqlite3_reset(stmt_unblock_); |
|
481 |
✗✓ | 8 |
assert(result); |
482 |
|||
483 |
// Double fork avoids zombie, forked removal process must not flush file |
||
484 |
// buffers |
||
485 |
✓✗ | 8 |
if (!trash.empty()) { |
486 |
✓✓ | 8 |
if (async_delete_) { |
487 |
pid_t pid; |
||
488 |
int statloc; |
||
489 |
✗✓ | 6 |
if ((pid = fork()) == 0) { |
490 |
// TODO(jblomer): eviciting files in the cache should perhaps become a |
||
491 |
// thread. This would also allow to block the chunks and prevent the |
||
492 |
// race with re-insertion. Then again, a thread can block umount. |
||
493 |
#ifndef DEBUGMSG |
||
494 |
int max_fd = sysconf(_SC_OPEN_MAX); |
||
495 |
for (int i = 0; i < max_fd; ++i) |
||
496 |
close(i); |
||
497 |
#endif |
||
498 |
if (fork() == 0) { |
||
499 |
for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) { |
||
500 |
LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str()); |
||
501 |
unlink(trash[i].c_str()); |
||
502 |
} |
||
503 |
_exit(0); |
||
504 |
} |
||
505 |
_exit(0); |
||
506 |
} else { |
||
507 |
✓✗ | 6 |
if (pid > 0) |
508 |
6 |
waitpid(pid, &statloc, 0); |
|
509 |
else |
||
510 |
return false; |
||
511 |
} |
||
512 |
} else { // !async_delete_ |
||
513 |
✓✓ | 5 |
for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) { |
514 |
3 |
LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str()); |
|
515 |
3 |
unlink(trash[i].c_str()); |
|
516 |
} |
||
517 |
} |
||
518 |
} |
||
519 |
|||
520 |
✓✓ | 8 |
if (gauge_ > leave_size) { |
521 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn, |
||
522 |
"request to clean until %" PRIu64 ", " |
||
523 |
1 |
"but effective gauge is %" PRIu64, leave_size, gauge_); |
|
524 |
1 |
return false; |
|
525 |
} |
||
526 |
7 |
return true; |
|
527 |
} |
||
528 |
|||
529 |
|||
530 |
59 |
void PosixQuotaManager::DoInsert( |
|
531 |
const shash::Any &hash, |
||
532 |
const uint64_t size, |
||
533 |
const string &description, |
||
534 |
const CommandType command_type) |
||
535 |
{ |
||
536 |
59 |
const string hash_str = hash.ToString(); |
|
537 |
LogCvmfs(kLogQuota, kLogDebug, "insert into lru %s, path %s, method %d", |
||
538 |
59 |
hash_str.c_str(), description.c_str(), command_type); |
|
539 |
const unsigned desc_length = (description.length() > kMaxDescription) ? |
||
540 |
✓✗ | 59 |
kMaxDescription : description.length(); |
541 |
|||
542 |
LruCommand *cmd = |
||
543 |
59 |
reinterpret_cast<LruCommand *>(alloca(sizeof(LruCommand) + desc_length)); |
|
544 |
✓✗ | 59 |
new (cmd) LruCommand; |
545 |
59 |
cmd->command_type = command_type; |
|
546 |
59 |
cmd->SetSize(size); |
|
547 |
59 |
cmd->StoreHash(hash); |
|
548 |
59 |
cmd->desc_length = desc_length; |
|
549 |
memcpy(reinterpret_cast<char *>(cmd)+sizeof(LruCommand), |
||
550 |
59 |
&description[0], desc_length); |
|
551 |
59 |
WritePipe(pipe_lru_[1], cmd, sizeof(LruCommand) + desc_length); |
|
552 |
59 |
} |
|
553 |
|||
554 |
|||
555 |
35 |
vector<string> PosixQuotaManager::DoList(const CommandType list_command) { |
|
556 |
35 |
vector<string> result; |
|
557 |
|||
558 |
int pipe_list[2]; |
||
559 |
35 |
MakeReturnPipe(pipe_list); |
|
560 |
char description_buffer[kMaxDescription]; |
||
561 |
|||
562 |
35 |
LruCommand cmd; |
|
563 |
35 |
cmd.command_type = list_command; |
|
564 |
35 |
cmd.return_pipe = pipe_list[1]; |
|
565 |
35 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
566 |
|||
567 |
int length; |
||
568 |
✓✓ | 86 |
do { |
569 |
86 |
ReadHalfPipe(pipe_list[0], &length, sizeof(length)); |
|
570 |
✓✓ | 86 |
if (length > 0) { |
571 |
51 |
ReadPipe(pipe_list[0], description_buffer, length); |
|
572 |
51 |
result.push_back(string(description_buffer, length)); |
|
573 |
} |
||
574 |
} while (length >= 0); |
||
575 |
|||
576 |
35 |
CloseReturnPipe(pipe_list); |
|
577 |
35 |
return result; |
|
578 |
} |
||
579 |
|||
580 |
|||
581 |
55 |
uint64_t PosixQuotaManager::GetCapacity() { |
|
582 |
✓✗ | 55 |
if (limit_ != (uint64_t)(-1)) |
583 |
55 |
return limit_; |
|
584 |
|||
585 |
// Unrestricted cache, look at free space on cache dir fs |
||
586 |
struct statfs info; |
||
587 |
if (statfs(".", &info) == 0) { |
||
588 |
return info.f_bavail * info.f_bsize; |
||
589 |
} else { |
||
590 |
LogCvmfs(kLogQuota, kLogSyslogErr | kLogDebug, |
||
591 |
"failed to query file system info of cache (%d)", errno); |
||
592 |
return limit_; |
||
593 |
} |
||
594 |
} |
||
595 |
|||
596 |
|||
597 |
void PosixQuotaManager::GetLimits(uint64_t *limit, uint64_t *cleanup_threshold) |
||
598 |
{ |
||
599 |
int pipe_limits[2]; |
||
600 |
MakeReturnPipe(pipe_limits); |
||
601 |
|||
602 |
LruCommand cmd; |
||
603 |
cmd.command_type = kLimits; |
||
604 |
cmd.return_pipe = pipe_limits[1]; |
||
605 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
||
606 |
ReadHalfPipe(pipe_limits[0], limit, sizeof(*limit)); |
||
607 |
ReadPipe(pipe_limits[0], cleanup_threshold, sizeof(*cleanup_threshold)); |
||
608 |
CloseReturnPipe(pipe_limits); |
||
609 |
} |
||
610 |
|||
611 |
|||
612 |
/** |
||
613 |
* Since we only cleanup until cleanup_threshold, we can only add |
||
614 |
* files smaller than limit-cleanup_threshold. |
||
615 |
*/ |
||
616 |
11 |
uint64_t PosixQuotaManager::GetMaxFileSize() { |
|
617 |
11 |
return limit_ - cleanup_threshold_; |
|
618 |
} |
||
619 |
|||
620 |
|||
621 |
1 |
pid_t PosixQuotaManager::GetPid() { |
|
622 |
✗✓✗✗ |
1 |
if (!shared_ || !spawned_) { |
623 |
1 |
return getpid(); |
|
624 |
} |
||
625 |
|||
626 |
pid_t result; |
||
627 |
int pipe_pid[2]; |
||
628 |
MakeReturnPipe(pipe_pid); |
||
629 |
|||
630 |
LruCommand cmd; |
||
631 |
cmd.command_type = kPid; |
||
632 |
cmd.return_pipe = pipe_pid[1]; |
||
633 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
||
634 |
ReadHalfPipe(pipe_pid[0], &result, sizeof(result)); |
||
635 |
CloseReturnPipe(pipe_pid); |
||
636 |
return result; |
||
637 |
} |
||
638 |
|||
639 |
|||
640 |
1 |
uint32_t PosixQuotaManager::GetProtocolRevision() { |
|
641 |
int pipe_revision[2]; |
||
642 |
1 |
MakeReturnPipe(pipe_revision); |
|
643 |
|||
644 |
1 |
LruCommand cmd; |
|
645 |
1 |
cmd.command_type = kGetProtocolRevision; |
|
646 |
1 |
cmd.return_pipe = pipe_revision[1]; |
|
647 |
1 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
648 |
|||
649 |
uint32_t revision; |
||
650 |
1 |
ReadHalfPipe(pipe_revision[0], &revision, sizeof(revision)); |
|
651 |
1 |
CloseReturnPipe(pipe_revision); |
|
652 |
1 |
return revision; |
|
653 |
} |
||
654 |
|||
655 |
|||
656 |
/** |
||
657 |
* Queries the shared local hard disk quota manager. |
||
658 |
*/ |
||
659 |
18 |
void PosixQuotaManager::GetSharedStatus(uint64_t *gauge, uint64_t *pinned) { |
|
660 |
int pipe_status[2]; |
||
661 |
18 |
MakeReturnPipe(pipe_status); |
|
662 |
|||
663 |
18 |
LruCommand cmd; |
|
664 |
18 |
cmd.command_type = kStatus; |
|
665 |
18 |
cmd.return_pipe = pipe_status[1]; |
|
666 |
18 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
667 |
18 |
ReadHalfPipe(pipe_status[0], gauge, sizeof(*gauge)); |
|
668 |
18 |
ReadPipe(pipe_status[0], pinned, sizeof(*pinned)); |
|
669 |
18 |
CloseReturnPipe(pipe_status); |
|
670 |
18 |
} |
|
671 |
|||
672 |
|||
673 |
126 |
uint64_t PosixQuotaManager::GetSize() { |
|
674 |
✓✓ | 126 |
if (!spawned_) return gauge_; |
675 |
uint64_t gauge, size_pinned; |
||
676 |
16 |
GetSharedStatus(&gauge, &size_pinned); |
|
677 |
16 |
return gauge; |
|
678 |
} |
||
679 |
|||
680 |
|||
681 |
2 |
uint64_t PosixQuotaManager::GetSizePinned() { |
|
682 |
✗✓ | 2 |
if (!spawned_) return pinned_; |
683 |
uint64_t gauge, size_pinned; |
||
684 |
2 |
GetSharedStatus(&gauge, &size_pinned); |
|
685 |
2 |
return size_pinned; |
|
686 |
} |
||
687 |
|||
688 |
|||
689 |
4 |
uint64_t PosixQuotaManager::GetCleanupRate(uint64_t period_s) { |
|
690 |
✓✗✗✓ |
4 |
if (!spawned_ || (protocol_revision_ < 2)) return 0; |
691 |
uint64_t cleanup_rate; |
||
692 |
|||
693 |
int pipe_cleanup_rate[2]; |
||
694 |
4 |
MakeReturnPipe(pipe_cleanup_rate); |
|
695 |
4 |
LruCommand cmd; |
|
696 |
4 |
cmd.command_type = kCleanupRate; |
|
697 |
4 |
cmd.size = period_s; |
|
698 |
4 |
cmd.return_pipe = pipe_cleanup_rate[1]; |
|
699 |
4 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
700 |
4 |
ReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate, sizeof(cleanup_rate)); |
|
701 |
4 |
CloseReturnPipe(pipe_cleanup_rate); |
|
702 |
|||
703 |
4 |
return cleanup_rate; |
|
704 |
} |
||
705 |
|||
706 |
|||
707 |
112 |
bool PosixQuotaManager::InitDatabase(const bool rebuild_database) { |
|
708 |
112 |
string sql; |
|
709 |
sqlite3_stmt *stmt; |
||
710 |
|||
711 |
112 |
fd_lock_cachedb_ = LockFile(workspace_dir_ + "/lock_cachedb"); |
|
712 |
✓✓ | 112 |
if (fd_lock_cachedb_ < 0) { |
713 |
1 |
LogCvmfs(kLogQuota, kLogDebug, "failed to create cachedb lock"); |
|
714 |
1 |
return false; |
|
715 |
} |
||
716 |
|||
717 |
111 |
bool retry = false; |
|
718 |
111 |
const string db_file = cache_dir_ + "/cachedb"; |
|
719 |
✓✓ | 111 |
if (rebuild_database) { |
720 |
LogCvmfs(kLogQuota, kLogDebug, "rebuild database, unlinking existing (%s)", |
||
721 |
6 |
db_file.c_str()); |
|
722 |
6 |
unlink(db_file.c_str()); |
|
723 |
6 |
unlink((db_file + "-journal").c_str()); |
|
724 |
} |
||
725 |
|||
726 |
init_recover: |
||
727 |
111 |
int err = sqlite3_open(db_file.c_str(), &database_); |
|
728 |
✗✓ | 111 |
if (err != SQLITE_OK) { |
729 |
LogCvmfs(kLogQuota, kLogDebug, "could not open cache database (%d)", err); |
||
730 |
goto init_database_fail; |
||
731 |
} |
||
732 |
// TODO(reneme): make this a `QuotaDatabase : public sqlite::Database` |
||
733 |
sql = "PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; " |
||
734 |
"PRAGMA auto_vacuum=1; " |
||
735 |
"CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, " |
||
736 |
" acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, " |
||
737 |
"CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); " |
||
738 |
"CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq " |
||
739 |
" ON cache_catalog (acseq); " |
||
740 |
"CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, " |
||
741 |
"CONSTRAINT pk_fscache PRIMARY KEY (sha1)); " |
||
742 |
"CREATE INDEX idx_fscache_actime ON fscache (actime); " |
||
743 |
"CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, " |
||
744 |
111 |
" CONSTRAINT pk_properties PRIMARY KEY(key));"; |
|
745 |
111 |
err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL); |
|
746 |
✗✓ | 111 |
if (err != SQLITE_OK) { |
747 |
if (!retry) { |
||
748 |
retry = true; |
||
749 |
sqlite3_close(database_); |
||
750 |
unlink(db_file.c_str()); |
||
751 |
unlink((db_file + "-journal").c_str()); |
||
752 |
LogCvmfs(kLogQuota, kLogSyslogWarn, |
||
753 |
"LRU database corrupted, re-building"); |
||
754 |
goto init_recover; |
||
755 |
} |
||
756 |
LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)", |
||
757 |
sql.c_str()); |
||
758 |
goto init_database_fail; |
||
759 |
} |
||
760 |
|||
761 |
// If this an old cache catalog, |
||
762 |
// add and initialize new columns to cache_catalog |
||
763 |
sql = "ALTER TABLE cache_catalog ADD type INTEGER; " |
||
764 |
111 |
"ALTER TABLE cache_catalog ADD pinned INTEGER"; |
|
765 |
111 |
err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL); |
|
766 |
✗✓ | 111 |
if (err == SQLITE_OK) { |
767 |
sql = "UPDATE cache_catalog SET type=" + StringifyInt(kFileRegular) + ";"; |
||
768 |
err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL); |
||
769 |
if (err != SQLITE_OK) { |
||
770 |
LogCvmfs(kLogQuota, kLogDebug, |
||
771 |
"could not init cache database (failed: %s)", sql.c_str()); |
||
772 |
goto init_database_fail; |
||
773 |
} |
||
774 |
} |
||
775 |
|||
776 |
// Set pinned back |
||
777 |
111 |
sql = "UPDATE cache_catalog SET pinned=0;"; |
|
778 |
111 |
err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL); |
|
779 |
✗✓ | 111 |
if (err != SQLITE_OK) { |
780 |
LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)", |
||
781 |
sql.c_str()); |
||
782 |
goto init_database_fail; |
||
783 |
} |
||
784 |
|||
785 |
// Set schema version |
||
786 |
sql = "INSERT OR REPLACE INTO properties (key, value) " |
||
787 |
111 |
"VALUES ('schema', '1.0')"; |
|
788 |
111 |
err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL); |
|
789 |
✗✓ | 111 |
if (err != SQLITE_OK) { |
790 |
LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)", |
||
791 |
sql.c_str()); |
||
792 |
goto init_database_fail; |
||
793 |
} |
||
794 |
|||
795 |
// If cache catalog is empty, recreate from file system |
||
796 |
111 |
sql = "SELECT count(*) FROM cache_catalog;"; |
|
797 |
111 |
sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL); |
|
798 |
✓✗ | 111 |
if (sqlite3_step(stmt) == SQLITE_ROW) { |
799 |
✓✓✗✓ ✓✓ |
111 |
if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) { |
800 |
LogCvmfs(kLogCvmfs, kLogDebug, |
||
801 |
107 |
"CernVM-FS: building lru cache database..."); |
|
802 |
✓✓ | 107 |
if (!RebuildDatabase()) { |
803 |
LogCvmfs(kLogQuota, kLogDebug, |
||
804 |
3 |
"could not build cache database from file system"); |
|
805 |
3 |
sqlite3_finalize(stmt); |
|
806 |
3 |
goto init_database_fail; |
|
807 |
} |
||
808 |
} |
||
809 |
108 |
sqlite3_finalize(stmt); |
|
810 |
} else { |
||
811 |
LogCvmfs(kLogQuota, kLogDebug, "could not select on cache catalog"); |
||
812 |
sqlite3_finalize(stmt); |
||
813 |
goto init_database_fail; |
||
814 |
} |
||
815 |
|||
816 |
// How many bytes do we already have in cache? |
||
817 |
108 |
sql = "SELECT sum(size) FROM cache_catalog;"; |
|
818 |
108 |
sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL); |
|
819 |
✓✗ | 108 |
if (sqlite3_step(stmt) == SQLITE_ROW) { |
820 |
108 |
gauge_ = sqlite3_column_int64(stmt, 0); |
|
821 |
} else { |
||
822 |
LogCvmfs(kLogQuota, kLogDebug, "could not determine cache size"); |
||
823 |
sqlite3_finalize(stmt); |
||
824 |
goto init_database_fail; |
||
825 |
} |
||
826 |
108 |
sqlite3_finalize(stmt); |
|
827 |
|||
828 |
// Highest seq-no? |
||
829 |
108 |
sql = "SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;"; |
|
830 |
108 |
sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL); |
|
831 |
✓✗ | 108 |
if (sqlite3_step(stmt) == SQLITE_ROW) { |
832 |
108 |
seq_ = sqlite3_column_int64(stmt, 0)+1; |
|
833 |
} else { |
||
834 |
LogCvmfs(kLogQuota, kLogDebug, "could not determine highest seq-no"); |
||
835 |
sqlite3_finalize(stmt); |
||
836 |
goto init_database_fail; |
||
837 |
} |
||
838 |
108 |
sqlite3_finalize(stmt); |
|
839 |
|||
840 |
// Prepare touch, new, remove statements |
||
841 |
sqlite3_prepare_v2(database_, |
||
842 |
"UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) " |
||
843 |
108 |
"WHERE sha1=:sha1;", -1, &stmt_touch_, NULL); |
|
844 |
sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=0 " |
||
845 |
108 |
"WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL); |
|
846 |
sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=2 " |
||
847 |
108 |
"WHERE sha1=:sha1;", -1, &stmt_block_, NULL); |
|
848 |
sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=1 " |
||
849 |
108 |
"WHERE pinned=2;", -1, &stmt_unblock_, NULL); |
|
850 |
sqlite3_prepare_v2(database_, |
||
851 |
"INSERT OR REPLACE INTO cache_catalog " |
||
852 |
"(sha1, size, acseq, path, type, pinned) " |
||
853 |
"VALUES (:sha1, :s, :seq, :p, :t, :pin);", |
||
854 |
108 |
-1, &stmt_new_, NULL); |
|
855 |
sqlite3_prepare_v2(database_, |
||
856 |
"SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;", |
||
857 |
108 |
-1, &stmt_size_, NULL); |
|
858 |
sqlite3_prepare_v2(database_, "DELETE FROM cache_catalog WHERE sha1=:sha1;", |
||
859 |
108 |
-1, &stmt_rm_, NULL); |
|
860 |
sqlite3_prepare_v2(database_, |
||
861 |
"SELECT sha1, size FROM cache_catalog WHERE " |
||
862 |
"acseq=(SELECT min(acseq) " |
||
863 |
"FROM cache_catalog WHERE pinned<>2);", |
||
864 |
108 |
-1, &stmt_lru_, NULL); |
|
865 |
sqlite3_prepare_v2(database_, |
||
866 |
("SELECT path FROM cache_catalog WHERE type=" + |
||
867 |
StringifyInt(kFileRegular) + |
||
868 |
108 |
";").c_str(), -1, &stmt_list_, NULL); |
|
869 |
sqlite3_prepare_v2(database_, |
||
870 |
"SELECT path FROM cache_catalog WHERE pinned<>0;", |
||
871 |
108 |
-1, &stmt_list_pinned_, NULL); |
|
872 |
sqlite3_prepare_v2(database_, |
||
873 |
"SELECT path FROM cache_catalog WHERE acseq < 0;", |
||
874 |
108 |
-1, &stmt_list_volatile_, NULL); |
|
875 |
sqlite3_prepare_v2(database_, |
||
876 |
("SELECT path FROM cache_catalog WHERE type=" + |
||
877 |
StringifyInt(kFileCatalog) + |
||
878 |
108 |
";").c_str(), -1, &stmt_list_catalogs_, NULL); |
|
879 |
108 |
return true; |
|
880 |
|||
881 |
init_database_fail: |
||
882 |
3 |
sqlite3_close(database_); |
|
883 |
3 |
database_ = NULL; |
|
884 |
3 |
UnlockFile(fd_lock_cachedb_); |
|
885 |
3 |
return false; |
|
886 |
} |
||
887 |
|||
888 |
|||
889 |
/** |
||
890 |
* Inserts a new file into cache catalog. This file gets a new, |
||
891 |
* highest sequence number. Does cache cleanup if necessary. |
||
892 |
*/ |
||
893 |
42 |
void PosixQuotaManager::Insert( |
|
894 |
const shash::Any &any_hash, |
||
895 |
const uint64_t size, |
||
896 |
const string &description) |
||
897 |
{ |
||
898 |
42 |
DoInsert(any_hash, size, description, kInsert); |
|
899 |
42 |
} |
|
900 |
|||
901 |
|||
902 |
/** |
||
903 |
* Inserts a new file into cache catalog. This file is marked as volatile |
||
904 |
* and gets a new highest sequence number with the first bit set. Cache cleanup |
||
905 |
* treats these files with priority. |
||
906 |
*/ |
||
907 |
4 |
void PosixQuotaManager::InsertVolatile( |
|
908 |
const shash::Any &any_hash, |
||
909 |
const uint64_t size, |
||
910 |
const string &description) |
||
911 |
{ |
||
912 |
4 |
DoInsert(any_hash, size, description, kInsertVolatile); |
|
913 |
4 |
} |
|
914 |
|||
915 |
|||
916 |
/** |
||
917 |
* Lists all path names from the cache db. |
||
918 |
*/ |
||
919 |
21 |
vector<string> PosixQuotaManager::List() { |
|
920 |
21 |
return DoList(kList); |
|
921 |
} |
||
922 |
|||
923 |
|||
924 |
/** |
||
925 |
* Lists all pinned files from the cache db. |
||
926 |
*/ |
||
927 |
8 |
vector<string> PosixQuotaManager::ListPinned() { |
|
928 |
8 |
return DoList(kListPinned); |
|
929 |
} |
||
930 |
|||
931 |
|||
932 |
/** |
||
933 |
* Lists all sqlite catalog files from the cache db. |
||
934 |
*/ |
||
935 |
3 |
vector<string> PosixQuotaManager::ListCatalogs() { |
|
936 |
3 |
return DoList(kListCatalogs); |
|
937 |
} |
||
938 |
|||
939 |
|||
940 |
/** |
||
941 |
* Lists only files flagged as volatile (priority removal) |
||
942 |
*/ |
||
943 |
3 |
vector<string> PosixQuotaManager::ListVolatile() { |
|
944 |
3 |
return DoList(kListVolatile); |
|
945 |
} |
||
946 |
|||
947 |
|||
948 |
/** |
||
949 |
* Entry point for the shared cache manager process |
||
950 |
*/ |
||
951 |
int PosixQuotaManager::MainCacheManager(int argc, char **argv) { |
||
952 |
LogCvmfs(kLogQuota, kLogDebug, "starting quota manager"); |
||
953 |
int retval; |
||
954 |
|||
955 |
UniquePtr<Watchdog> watchdog(Watchdog::Create("./stacktrace.cachemgr")); |
||
956 |
assert(watchdog.IsValid()); |
||
957 |
watchdog->Spawn(); |
||
958 |
|||
959 |
PosixQuotaManager shared_manager(0, 0, ""); |
||
960 |
shared_manager.shared_ = true; |
||
961 |
shared_manager.spawned_ = true; |
||
962 |
shared_manager.pinned_ = 0; |
||
963 |
|||
964 |
// Process command line arguments |
||
965 |
ParseDirectories(string(argv[2]), |
||
966 |
&shared_manager.cache_dir_, |
||
967 |
&shared_manager.workspace_dir_); |
||
968 |
int pipe_boot = String2Int64(argv[3]); |
||
969 |
int pipe_handshake = String2Int64(argv[4]); |
||
970 |
shared_manager.limit_ = String2Int64(argv[5]); |
||
971 |
shared_manager.cleanup_threshold_ = String2Int64(argv[6]); |
||
972 |
int foreground = String2Int64(argv[7]); |
||
973 |
int syslog_level = String2Int64(argv[8]); |
||
974 |
int syslog_facility = String2Int64(argv[9]); |
||
975 |
vector<string> logfiles = SplitString(argv[10], ':'); |
||
976 |
|||
977 |
SetLogSyslogLevel(syslog_level); |
||
978 |
SetLogSyslogFacility(syslog_facility); |
||
979 |
if ((logfiles.size() > 0) && (logfiles[0] != "")) |
||
980 |
SetLogDebugFile(logfiles[0] + ".cachemgr"); |
||
981 |
if (logfiles.size() > 1) |
||
982 |
SetLogMicroSyslog(logfiles[1]); |
||
983 |
|||
984 |
if (!foreground) |
||
985 |
Daemonize(); |
||
986 |
|||
987 |
// Initialize pipe, open non-blocking as cvmfs is not yet connected |
||
988 |
const int fd_lockfile_fifo = |
||
989 |
LockFile(shared_manager.workspace_dir_ + "/lock_cachemgr.fifo"); |
||
990 |
if (fd_lockfile_fifo < 0) { |
||
991 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, "could not open lock file " |
||
992 |
"%s (%d)", |
||
993 |
(shared_manager.workspace_dir_ + "/lock_cachemgr.fifo").c_str(), |
||
994 |
errno); |
||
995 |
return 1; |
||
996 |
} |
||
997 |
const string crash_guard = shared_manager.cache_dir_ + "/cachemgr.running"; |
||
998 |
const bool rebuild = FileExists(crash_guard); |
||
999 |
retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600); |
||
1000 |
if (retval < 0) { |
||
1001 |
LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslogErr, |
||
1002 |
"failed to create shared cache manager crash guard"); |
||
1003 |
UnlockFile(fd_lockfile_fifo); |
||
1004 |
return 1; |
||
1005 |
} |
||
1006 |
close(retval); |
||
1007 |
|||
1008 |
// Redirect SQlite temp directory to cache (global variable) |
||
1009 |
const string tmp_dir = shared_manager.workspace_dir_; |
||
1010 |
sqlite3_temp_directory = |
||
1011 |
static_cast<char *>(sqlite3_malloc(tmp_dir.length() + 1)); |
||
1012 |
snprintf(sqlite3_temp_directory, tmp_dir.length() + 1, "%s", tmp_dir.c_str()); |
||
1013 |
|||
1014 |
// Cleanup leftover named pipes |
||
1015 |
shared_manager.CleanupPipes(); |
||
1016 |
|||
1017 |
if (!shared_manager.InitDatabase(rebuild)) { |
||
1018 |
UnlockFile(fd_lockfile_fifo); |
||
1019 |
return 1; |
||
1020 |
} |
||
1021 |
shared_manager.CheckFreeSpace(); |
||
1022 |
|||
1023 |
// Save protocol revision to file. If the file is not found, it indicates |
||
1024 |
// to the client that the cache manager is from times before the protocol |
||
1025 |
// was versioned. |
||
1026 |
const string protocol_revision_path = |
||
1027 |
shared_manager.workspace_dir_ + "/cachemgr.protocol"; |
||
1028 |
retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600); |
||
1029 |
if (retval < 0) { |
||
1030 |
LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslogErr, |
||
1031 |
"failed to open protocol revision file (%d)", errno); |
||
1032 |
UnlockFile(fd_lockfile_fifo); |
||
1033 |
return 1; |
||
1034 |
} |
||
1035 |
const string revision = StringifyInt(kProtocolRevision); |
||
1036 |
int written = write(retval, revision.data(), revision.length()); |
||
1037 |
close(retval); |
||
1038 |
if ((written < 0) || static_cast<unsigned>(written) != revision.length()) { |
||
1039 |
LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslogErr, |
||
1040 |
"failed to write protocol revision (%d)", errno); |
||
1041 |
UnlockFile(fd_lockfile_fifo); |
||
1042 |
return 1; |
||
1043 |
} |
||
1044 |
|||
1045 |
const string fifo_path = shared_manager.workspace_dir_ + "/cachemgr"; |
||
1046 |
shared_manager.pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK); |
||
1047 |
if (shared_manager.pipe_lru_[0] < 0) { |
||
1048 |
LogCvmfs(kLogQuota, kLogDebug, "failed to listen on FIFO %s (%d)", |
||
1049 |
fifo_path.c_str(), errno); |
||
1050 |
UnlockFile(fd_lockfile_fifo); |
||
1051 |
return 1; |
||
1052 |
} |
||
1053 |
Nonblock2Block(shared_manager.pipe_lru_[0]); |
||
1054 |
LogCvmfs(kLogQuota, kLogDebug, "shared cache manager listening"); |
||
1055 |
|||
1056 |
char buf = 'C'; |
||
1057 |
WritePipe(pipe_boot, &buf, 1); |
||
1058 |
close(pipe_boot); |
||
1059 |
|||
1060 |
ReadPipe(pipe_handshake, &buf, 1); |
||
1061 |
close(pipe_handshake); |
||
1062 |
LogCvmfs(kLogQuota, kLogDebug, "shared cache manager handshake done"); |
||
1063 |
|||
1064 |
// Ensure that broken pipes from clients do not kill the cache manager |
||
1065 |
signal(SIGPIPE, SIG_IGN); |
||
1066 |
// Don't let Ctrl-C ungracefully kill interactive session |
||
1067 |
signal(SIGINT, SIG_IGN); |
||
1068 |
|||
1069 |
shared_manager.MainCommandServer(&shared_manager); |
||
1070 |
unlink(fifo_path.c_str()); |
||
1071 |
unlink(protocol_revision_path.c_str()); |
||
1072 |
shared_manager.CloseDatabase(); |
||
1073 |
unlink(crash_guard.c_str()); |
||
1074 |
UnlockFile(fd_lockfile_fifo); |
||
1075 |
|||
1076 |
if (sqlite3_temp_directory) { |
||
1077 |
sqlite3_free(sqlite3_temp_directory); |
||
1078 |
sqlite3_temp_directory = NULL; |
||
1079 |
} |
||
1080 |
|||
1081 |
return 0; |
||
1082 |
} |
||
1083 |
|||
1084 |
|||
1085 |
30 |
void *PosixQuotaManager::MainCommandServer(void *data) { |
|
1086 |
30 |
PosixQuotaManager *quota_mgr = static_cast<PosixQuotaManager *>(data); |
|
1087 |
|||
1088 |
30 |
LogCvmfs(kLogQuota, kLogDebug, "starting quota manager"); |
|
1089 |
30 |
sqlite3_soft_heap_limit(quota_mgr->kSqliteMemPerThread); |
|
1090 |
|||
1091 |
✓✓ | 30 |
LruCommand command_buffer[kCommandBufferSize]; |
1092 |
char description_buffer[kCommandBufferSize*kMaxDescription]; |
||
1093 |
30 |
unsigned num_commands = 0; |
|
1094 |
|||
1095 |
✓✓ | 207 |
while (read(quota_mgr->pipe_lru_[0], &command_buffer[num_commands], |
1096 |
sizeof(command_buffer[0])) == sizeof(command_buffer[0])) |
||
1097 |
{ |
||
1098 |
147 |
const CommandType command_type = command_buffer[num_commands].command_type; |
|
1099 |
147 |
LogCvmfs(kLogQuota, kLogDebug, "received command %d", command_type); |
|
1100 |
147 |
const uint64_t size = command_buffer[num_commands].GetSize(); |
|
1101 |
|||
1102 |
// Inserts and pins come with a description (usually a path) |
||
1103 |
✓✓✓✓ ✓✓✓✓ |
147 |
if ((command_type == kInsert) || (command_type == kInsertVolatile) || |
1104 |
(command_type == kPin) || (command_type == kPinRegular)) |
||
1105 |
{ |
||
1106 |
46 |
const int desc_length = command_buffer[num_commands].desc_length; |
|
1107 |
ReadPipe(quota_mgr->pipe_lru_[0], |
||
1108 |
46 |
&description_buffer[kMaxDescription*num_commands], desc_length); |
|
1109 |
} |
||
1110 |
|||
1111 |
// The protocol revision is returned immediately |
||
1112 |
✓✓ | 147 |
if (command_type == kGetProtocolRevision) { |
1113 |
int return_pipe = |
||
1114 |
1 |
quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe); |
|
1115 |
✗✓ | 1 |
if (return_pipe < 0) |
1116 |
continue; |
||
1117 |
WritePipe(return_pipe, "a_mgr->kProtocolRevision, |
||
1118 |
1 |
sizeof(quota_mgr->kProtocolRevision)); |
|
1119 |
1 |
quota_mgr->UnbindReturnPipe(return_pipe); |
|
1120 |
1 |
continue; |
|
1121 |
} |
||
1122 |
|||
1123 |
// The cleanup rate is returned immediately |
||
1124 |
✓✓ | 146 |
if (command_type == kCleanupRate) { |
1125 |
int return_pipe = |
||
1126 |
4 |
quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe); |
|
1127 |
✗✓ | 4 |
if (return_pipe < 0) |
1128 |
continue; |
||
1129 |
4 |
uint64_t period_s = size; // use the size field to transmit the period |
|
1130 |
4 |
uint64_t rate = quota_mgr->cleanup_recorder_.GetNoTicks(period_s); |
|
1131 |
4 |
WritePipe(return_pipe, &rate, sizeof(rate)); |
|
1132 |
4 |
quota_mgr->UnbindReturnPipe(return_pipe); |
|
1133 |
4 |
continue; |
|
1134 |
} |
||
1135 |
|||
1136 |
// Reservations are handled immediately and "out of band" |
||
1137 |
✓✓ | 142 |
if (command_type == kReserve) { |
1138 |
14 |
bool success = true; |
|
1139 |
int return_pipe = |
||
1140 |
14 |
quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe); |
|
1141 |
✗✓ | 14 |
if (return_pipe < 0) |
1142 |
continue; |
||
1143 |
|||
1144 |
14 |
const shash::Any hash = command_buffer[num_commands].RetrieveHash(); |
|
1145 |
14 |
const string hash_str(hash.ToString()); |
|
1146 |
LogCvmfs(kLogQuota, kLogDebug, "reserve %d bytes for %s", |
||
1147 |
14 |
size, hash_str.c_str()); |
|
1148 |
|||
1149 |
✓✓ | 14 |
if (quota_mgr->pinned_chunks_.find(hash) == |
1150 |
quota_mgr->pinned_chunks_.end()) |
||
1151 |
{ |
||
1152 |
✓✓ | 12 |
if ((quota_mgr->pinned_ + size) > quota_mgr->cleanup_threshold_) { |
1153 |
LogCvmfs(kLogQuota, kLogDebug, |
||
1154 |
1 |
"failed to insert %s (pinned), no space", hash_str.c_str()); |
|
1155 |
1 |
success = false; |
|
1156 |
} else { |
||
1157 |
11 |
quota_mgr->pinned_chunks_[hash] = size; |
|
1158 |
11 |
quota_mgr->pinned_ += size; |
|
1159 |
11 |
quota_mgr->CheckHighPinWatermark(); |
|
1160 |
} |
||
1161 |
} |
||
1162 |
|||
1163 |
14 |
WritePipe(return_pipe, &success, sizeof(success)); |
|
1164 |
14 |
quota_mgr->UnbindReturnPipe(return_pipe); |
|
1165 |
14 |
continue; |
|
1166 |
} |
||
1167 |
|||
1168 |
// Back channels are also handled out of band |
||
1169 |
✓✓ | 128 |
if (command_type == kRegisterBackChannel) { |
1170 |
int return_pipe = |
||
1171 |
4 |
quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe); |
|
1172 |
✗✓ | 4 |
if (return_pipe < 0) |
1173 |
continue; |
||
1174 |
|||
1175 |
4 |
quota_mgr->UnlinkReturnPipe(command_buffer[num_commands].return_pipe); |
|
1176 |
4 |
Block2Nonblock(return_pipe); // back channels are opportunistic |
|
1177 |
4 |
shash::Md5 hash; |
|
1178 |
memcpy(hash.digest, command_buffer[num_commands].digest, |
||
1179 |
4 |
shash::kDigestSizes[shash::kMd5]); |
|
1180 |
|||
1181 |
4 |
quota_mgr->LockBackChannels(); |
|
1182 |
map<shash::Md5, int>::const_iterator iter = |
||
1183 |
4 |
quota_mgr->back_channels_.find(hash); |
|
1184 |
✗✓ | 4 |
if (iter != quota_mgr->back_channels_.end()) { |
1185 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn, |
||
1186 |
"closing left-over back channel %s", hash.ToString().c_str()); |
||
1187 |
close(iter->second); |
||
1188 |
} |
||
1189 |
4 |
quota_mgr->back_channels_[hash] = return_pipe; |
|
1190 |
4 |
quota_mgr->UnlockBackChannels(); |
|
1191 |
|||
1192 |
4 |
char success = 'S'; |
|
1193 |
4 |
WritePipe(return_pipe, &success, sizeof(success)); |
|
1194 |
LogCvmfs(kLogQuota, kLogDebug, "register back channel %s on fd %d", |
||
1195 |
4 |
hash.ToString().c_str(), return_pipe); |
|
1196 |
|||
1197 |
4 |
continue; |
|
1198 |
} |
||
1199 |
|||
1200 |
✓✓ | 124 |
if (command_type == kUnregisterBackChannel) { |
1201 |
2 |
shash::Md5 hash; |
|
1202 |
memcpy(hash.digest, command_buffer[num_commands].digest, |
||
1203 |
2 |
shash::kDigestSizes[shash::kMd5]); |
|
1204 |
|||
1205 |
2 |
quota_mgr->LockBackChannels(); |
|
1206 |
map<shash::Md5, int>::iterator iter = |
||
1207 |
2 |
quota_mgr->back_channels_.find(hash); |
|
1208 |
✓✗ | 2 |
if (iter != quota_mgr->back_channels_.end()) { |
1209 |
LogCvmfs(kLogQuota, kLogDebug, |
||
1210 |
2 |
"closing back channel %s", hash.ToString().c_str()); |
|
1211 |
2 |
close(iter->second); |
|
1212 |
2 |
quota_mgr->back_channels_.erase(iter); |
|
1213 |
} else { |
||
1214 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn, |
||
1215 |
"did not find back channel %s", hash.ToString().c_str()); |
||
1216 |
} |
||
1217 |
2 |
quota_mgr->UnlockBackChannels(); |
|
1218 |
|||
1219 |
2 |
continue; |
|
1220 |
} |
||
1221 |
|||
1222 |
// Unpinnings are also handled immediately with respect to the pinned gauge |
||
1223 |
✓✓ | 122 |
if (command_type == kUnpin) { |
1224 |
2 |
const shash::Any hash = command_buffer[num_commands].RetrieveHash(); |
|
1225 |
2 |
const string hash_str(hash.ToString()); |
|
1226 |
|||
1227 |
map<shash::Any, uint64_t>::iterator iter = |
||
1228 |
2 |
quota_mgr->pinned_chunks_.find(hash); |
|
1229 |
✓✗ | 2 |
if (iter != quota_mgr->pinned_chunks_.end()) { |
1230 |
2 |
quota_mgr->pinned_ -= iter->second; |
|
1231 |
2 |
quota_mgr->pinned_chunks_.erase(iter); |
|
1232 |
// It can happen that files get pinned that were removed from the cache |
||
1233 |
// (see cache.cc). We fix this at this point, where we remove such |
||
1234 |
// entries from the cache database. |
||
1235 |
✓✓ | 2 |
if (!FileExists(quota_mgr->cache_dir_ + "/" + |
1236 |
hash.MakePathWithoutSuffix())) |
||
1237 |
{ |
||
1238 |
LogCvmfs(kLogQuota, kLogDebug, |
||
1239 |
"remove orphaned pinned hash %s from cache database", |
||
1240 |
1 |
hash_str.c_str()); |
|
1241 |
sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0], |
||
1242 |
1 |
hash_str.length(), SQLITE_STATIC); |
|
1243 |
int retval; |
||
1244 |
✓✗ | 1 |
if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) { |
1245 |
1 |
uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0); |
|
1246 |
sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]), |
||
1247 |
1 |
hash_str.length(), SQLITE_STATIC); |
|
1248 |
1 |
retval = sqlite3_step(quota_mgr->stmt_rm_); |
|
1249 |
✗✓✗✗ |
2 |
if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) { |
1250 |
1 |
quota_mgr->gauge_ -= size; |
|
1251 |
} else { |
||
1252 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, |
||
1253 |
"failed to delete %s (%d)", hash_str.c_str(), retval); |
||
1254 |
} |
||
1255 |
1 |
sqlite3_reset(quota_mgr->stmt_rm_); |
|
1256 |
} |
||
1257 |
1 |
sqlite3_reset(quota_mgr->stmt_size_); |
|
1258 |
} |
||
1259 |
} else { |
||
1260 |
LogCvmfs(kLogQuota, kLogDebug, "this chunk was not pinned"); |
||
1261 |
} |
||
1262 |
} |
||
1263 |
|||
1264 |
// Immediate commands trigger flushing of the buffer |
||
1265 |
bool immediate_command = (command_type == kCleanup) || |
||
1266 |
(command_type == kList) || (command_type == kListPinned) || |
||
1267 |
(command_type == kListCatalogs) || (command_type == kListVolatile) || |
||
1268 |
(command_type == kRemove) || (command_type == kStatus) || |
||
1269 |
✓✓✓✓ ✓✓✓✓ ✓✓✓✓ ✓✓✓✗ ✗✓ |
122 |
(command_type == kLimits) || (command_type == kPid); |
1270 |
✓✓ | 122 |
if (!immediate_command) num_commands++; |
1271 |
|||
1272 |
✓✗✓✓ |
122 |
if ((num_commands == kCommandBufferSize) || immediate_command) |
1273 |
{ |
||
1274 |
quota_mgr->ProcessCommandBunch(num_commands, command_buffer, |
||
1275 |
65 |
description_buffer); |
|
1276 |
✗✓ | 65 |
if (!immediate_command) num_commands = 0; |
1277 |
} |
||
1278 |
|||
1279 |
✓✓ | 122 |
if (immediate_command) { |
1280 |
// Process cleanup, listings |
||
1281 |
int return_pipe = |
||
1282 |
65 |
quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe); |
|
1283 |
✗✓ | 65 |
if (return_pipe < 0) { |
1284 |
num_commands = 0; |
||
1285 |
continue; |
||
1286 |
} |
||
1287 |
|||
1288 |
int retval; |
||
1289 |
65 |
sqlite3_stmt *this_stmt_list = NULL; |
|
1290 |
✓✓✓✓ ✓✓✓✗ ✗✗ |
65 |
switch (command_type) { |
1291 |
case kRemove: { |
||
1292 |
3 |
const shash::Any hash = command_buffer[num_commands].RetrieveHash(); |
|
1293 |
3 |
const string hash_str = hash.ToString(); |
|
1294 |
LogCvmfs(kLogQuota, kLogDebug, "manually removing %s", |
||
1295 |
3 |
hash_str.c_str()); |
|
1296 |
3 |
bool success = false; |
|
1297 |
|||
1298 |
sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0], |
||
1299 |
3 |
hash_str.length(), SQLITE_STATIC); |
|
1300 |
int retval; |
||
1301 |
✓✓ | 3 |
if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) { |
1302 |
2 |
uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0); |
|
1303 |
2 |
uint64_t is_pinned = sqlite3_column_int64(quota_mgr->stmt_size_, 1); |
|
1304 |
|||
1305 |
sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]), |
||
1306 |
2 |
hash_str.length(), SQLITE_STATIC); |
|
1307 |
2 |
retval = sqlite3_step(quota_mgr->stmt_rm_); |
|
1308 |
✗✓✗✗ |
4 |
if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) { |
1309 |
2 |
success = true; |
|
1310 |
2 |
quota_mgr->gauge_ -= size; |
|
1311 |
✓✓ | 2 |
if (is_pinned) { |
1312 |
1 |
quota_mgr->pinned_chunks_.erase(hash); |
|
1313 |
1 |
quota_mgr->pinned_ -= size; |
|
1314 |
} |
||
1315 |
} else { |
||
1316 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, |
||
1317 |
"failed to delete %s (%d)", hash_str.c_str(), retval); |
||
1318 |
} |
||
1319 |
2 |
sqlite3_reset(quota_mgr->stmt_rm_); |
|
1320 |
} else { |
||
1321 |
// File does not exist |
||
1322 |
1 |
success = true; |
|
1323 |
} |
||
1324 |
3 |
sqlite3_reset(quota_mgr->stmt_size_); |
|
1325 |
|||
1326 |
3 |
WritePipe(return_pipe, &success, sizeof(success)); |
|
1327 |
break; } |
||
1328 |
case kCleanup: |
||
1329 |
9 |
retval = quota_mgr->DoCleanup(size); |
|
1330 |
9 |
WritePipe(return_pipe, &retval, sizeof(retval)); |
|
1331 |
9 |
break; |
|
1332 |
case kList: |
||
1333 |
✓✗ | 21 |
if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_; |
1334 |
case kListPinned: |
||
1335 |
✓✓ | 29 |
if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_pinned_; |
1336 |
case kListCatalogs: |
||
1337 |
✓✓ | 32 |
if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_catalogs_; |
1338 |
case kListVolatile: |
||
1339 |
✓✓ | 35 |
if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_volatile_; |
1340 |
|||
1341 |
// Pipe back the list, one by one |
||
1342 |
int length; |
||
1343 |
✓✓ | 35 |
while (sqlite3_step(this_stmt_list) == SQLITE_ROW) { |
1344 |
51 |
string path = "(NULL)"; |
|
1345 |
✓✗ | 51 |
if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) { |
1346 |
path = string( |
||
1347 |
reinterpret_cast<const char *>( |
||
1348 |
51 |
sqlite3_column_text(this_stmt_list, 0))); |
|
1349 |
} |
||
1350 |
51 |
length = path.length(); |
|
1351 |
51 |
WritePipe(return_pipe, &length, sizeof(length)); |
|
1352 |
✓✗ | 51 |
if (length > 0) |
1353 |
51 |
WritePipe(return_pipe, &path[0], length); |
|
1354 |
} |
||
1355 |
35 |
length = -1; |
|
1356 |
35 |
WritePipe(return_pipe, &length, sizeof(length)); |
|
1357 |
35 |
sqlite3_reset(this_stmt_list); |
|
1358 |
35 |
break; |
|
1359 |
case kStatus: |
||
1360 |
18 |
WritePipe(return_pipe, "a_mgr->gauge_, sizeof(quota_mgr->gauge_)); |
|
1361 |
WritePipe(return_pipe, "a_mgr->pinned_, |
||
1362 |
18 |
sizeof(quota_mgr->pinned_)); |
|
1363 |
18 |
break; |
|
1364 |
case kLimits: |
||
1365 |
WritePipe(return_pipe, "a_mgr->limit_, sizeof(quota_mgr->limit_)); |
||
1366 |
WritePipe(return_pipe, "a_mgr->cleanup_threshold_, |
||
1367 |
sizeof(quota_mgr->cleanup_threshold_)); |
||
1368 |
break; |
||
1369 |
case kPid: { |
||
1370 |
pid_t pid = getpid(); |
||
1371 |
WritePipe(return_pipe, &pid, sizeof(pid)); |
||
1372 |
break; |
||
1373 |
} |
||
1374 |
default: |
||
1375 |
abort(); // other types are handled by the bunch processor |
||
1376 |
} |
||
1377 |
65 |
quota_mgr->UnbindReturnPipe(return_pipe); |
|
1378 |
65 |
num_commands = 0; |
|
1379 |
} |
||
1380 |
} |
||
1381 |
|||
1382 |
30 |
LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager (%d)", errno); |
|
1383 |
30 |
close(quota_mgr->pipe_lru_[0]); |
|
1384 |
quota_mgr->ProcessCommandBunch(num_commands, command_buffer, |
||
1385 |
30 |
description_buffer); |
|
1386 |
|||
1387 |
// Unpin |
||
1388 |
30 |
command_buffer[0].command_type = kTouch; |
|
1389 |
✓✓ | 40 |
for (map<shash::Any, uint64_t>::const_iterator i = |
1390 |
30 |
quota_mgr->pinned_chunks_.begin(), |
|
1391 |
30 |
iEnd = quota_mgr->pinned_chunks_.end(); i != iEnd; ++i) |
|
1392 |
{ |
||
1393 |
10 |
command_buffer[0].StoreHash(i->first); |
|
1394 |
10 |
quota_mgr->ProcessCommandBunch(1, command_buffer, description_buffer); |
|
1395 |
} |
||
1396 |
|||
1397 |
30 |
return NULL; |
|
1398 |
} |
||
1399 |
|||
1400 |
|||
1401 |
91 |
void PosixQuotaManager::MakeReturnPipe(int pipe[2]) { |
|
1402 |
✓✓ | 91 |
if (!shared_) { |
1403 |
88 |
MakePipe(pipe); |
|
1404 |
88 |
return; |
|
1405 |
} |
||
1406 |
|||
1407 |
// Create FIFO in cache directory, store path name (number) in pipe write end |
||
1408 |
3 |
int i = 0; |
|
1409 |
int retval; |
||
1410 |
✓✓✓✗ ✓✓ |
4 |
do { |
1411 |
4 |
retval = mkfifo((workspace_dir_ + "/pipe" + StringifyInt(i)).c_str(), 0600); |
|
1412 |
4 |
pipe[1] = i; |
|
1413 |
4 |
i++; |
|
1414 |
} while ((retval == -1) && (errno == EEXIST)); |
||
1415 |
✗✓ | 3 |
assert(retval == 0); |
1416 |
|||
1417 |
// Connect reader's end |
||
1418 |
3 |
pipe[0] = open((workspace_dir_ + "/pipe" + StringifyInt(pipe[1])).c_str(), |
|
1419 |
3 |
O_RDONLY | O_NONBLOCK); |
|
1420 |
✗✓ | 3 |
assert(pipe[0] >= 0); |
1421 |
3 |
Nonblock2Block(pipe[0]); |
|
1422 |
} |
||
1423 |
|||
1424 |
|||
1425 |
115 |
void PosixQuotaManager::ParseDirectories( |
|
1426 |
const std::string cache_workspace, |
||
1427 |
std::string *cache_dir, |
||
1428 |
std::string *workspace_dir) |
||
1429 |
{ |
||
1430 |
115 |
vector<string> dir_tokens(SplitString(cache_workspace, ':')); |
|
1431 |
✓✓✗ | 115 |
switch (dir_tokens.size()) { |
1432 |
case 1: |
||
1433 |
113 |
*cache_dir = *workspace_dir = dir_tokens[0]; |
|
1434 |
113 |
break; |
|
1435 |
case 2: |
||
1436 |
2 |
*cache_dir = dir_tokens[0]; |
|
1437 |
2 |
*workspace_dir = dir_tokens[1]; |
|
1438 |
2 |
break; |
|
1439 |
default: |
||
1440 |
abort(); |
||
1441 |
115 |
} |
|
1442 |
115 |
} |
|
1443 |
|||
1444 |
|||
1445 |
/** |
||
1446 |
* Immediately inserts a new pinned catalog. Does cache cleanup if necessary. |
||
1447 |
* |
||
1448 |
* \return True on success, false otherwise |
||
1449 |
*/ |
||
1450 |
54 |
bool PosixQuotaManager::Pin( |
|
1451 |
const shash::Any &hash, |
||
1452 |
const uint64_t size, |
||
1453 |
const string &description, |
||
1454 |
const bool is_catalog) |
||
1455 |
{ |
||
1456 |
✓✓✗✓ |
54 |
assert((size > 0) || !is_catalog); |
1457 |
|||
1458 |
54 |
const string hash_str = hash.ToString(); |
|
1459 |
LogCvmfs(kLogQuota, kLogDebug, "pin into lru %s, path %s", |
||
1460 |
54 |
hash_str.c_str(), description.c_str()); |
|
1461 |
|||
1462 |
// Has to run when not yet spawned (cvmfs initialization) |
||
1463 |
✓✓ | 54 |
if (!spawned_) { |
1464 |
// Code duplication here |
||
1465 |
✓✓ | 40 |
if (pinned_chunks_.find(hash) == pinned_chunks_.end()) { |
1466 |
✓✓ | 24 |
if (pinned_ + size > cleanup_threshold_) { |
1467 |
LogCvmfs(kLogQuota, kLogDebug, "failed to insert %s (pinned), no space", |
||
1468 |
2 |
hash_str.c_str()); |
|
1469 |
2 |
return false; |
|
1470 |
} else { |
||
1471 |
22 |
pinned_chunks_[hash] = size; |
|
1472 |
22 |
pinned_ += size; |
|
1473 |
22 |
CheckHighPinWatermark(); |
|
1474 |
} |
||
1475 |
} |
||
1476 |
38 |
bool exists = Contains(hash_str); |
|
1477 |
✓✓✓✓ |
38 |
if (!exists && (gauge_ + size > limit_)) { |
1478 |
LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu", |
||
1479 |
1 |
gauge_, size); |
|
1480 |
1 |
int retval = DoCleanup(cleanup_threshold_); |
|
1481 |
✗✓ | 1 |
assert(retval != 0); |
1482 |
} |
||
1483 |
sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(), |
||
1484 |
38 |
SQLITE_STATIC); |
|
1485 |
38 |
sqlite3_bind_int64(stmt_new_, 2, size); |
|
1486 |
38 |
sqlite3_bind_int64(stmt_new_, 3, seq_++); |
|
1487 |
sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(), |
||
1488 |
38 |
SQLITE_STATIC); |
|
1489 |
✓✓ | 38 |
sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular); |
1490 |
38 |
sqlite3_bind_int64(stmt_new_, 6, 1); |
|
1491 |
38 |
int retval = sqlite3_step(stmt_new_); |
|
1492 |
✗✓✗✗ |
38 |
assert((retval == SQLITE_DONE) || (retval == SQLITE_OK)); |
1493 |
38 |
sqlite3_reset(stmt_new_); |
|
1494 |
✓✓ | 38 |
if (!exists) gauge_ += size; |
1495 |
38 |
return true; |
|
1496 |
} |
||
1497 |
|||
1498 |
int pipe_reserve[2]; |
||
1499 |
14 |
MakeReturnPipe(pipe_reserve); |
|
1500 |
|||
1501 |
14 |
LruCommand cmd; |
|
1502 |
14 |
cmd.command_type = kReserve; |
|
1503 |
14 |
cmd.SetSize(size); |
|
1504 |
14 |
cmd.StoreHash(hash); |
|
1505 |
14 |
cmd.return_pipe = pipe_reserve[1]; |
|
1506 |
14 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
1507 |
bool result; |
||
1508 |
14 |
ReadHalfPipe(pipe_reserve[0], &result, sizeof(result)); |
|
1509 |
14 |
CloseReturnPipe(pipe_reserve); |
|
1510 |
|||
1511 |
✓✓ | 14 |
if (!result) return false; |
1512 |
✓✓ | 13 |
DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular); |
1513 |
|||
1514 |
13 |
return true; |
|
1515 |
} |
||
1516 |
|||
1517 |
|||
1518 |
111 |
PosixQuotaManager::PosixQuotaManager( |
|
1519 |
const uint64_t limit, |
||
1520 |
const uint64_t cleanup_threshold, |
||
1521 |
const string &cache_workspace) |
||
1522 |
: shared_(false) |
||
1523 |
, spawned_(false) |
||
1524 |
, limit_(limit) |
||
1525 |
, cleanup_threshold_(cleanup_threshold) |
||
1526 |
, gauge_(0) |
||
1527 |
, pinned_(0) |
||
1528 |
, seq_(0) |
||
1529 |
, cache_dir_() // initialized in body |
||
1530 |
, workspace_dir_() // initialized in body |
||
1531 |
, fd_lock_cachedb_(-1) |
||
1532 |
, async_delete_(true) |
||
1533 |
, database_(NULL) |
||
1534 |
, stmt_touch_(NULL) |
||
1535 |
, stmt_unpin_(NULL) |
||
1536 |
, stmt_block_(NULL) |
||
1537 |
, stmt_unblock_(NULL) |
||
1538 |
, stmt_new_(NULL) |
||
1539 |
, stmt_lru_(NULL) |
||
1540 |
, stmt_size_(NULL) |
||
1541 |
, stmt_rm_(NULL) |
||
1542 |
, stmt_list_(NULL) |
||
1543 |
, stmt_list_pinned_(NULL) |
||
1544 |
, stmt_list_catalogs_(NULL) |
||
1545 |
, stmt_list_volatile_(NULL) |
||
1546 |
111 |
, initialized_(false) |
|
1547 |
{ |
||
1548 |
111 |
ParseDirectories(cache_workspace, &cache_dir_, &workspace_dir_); |
|
1549 |
111 |
pipe_lru_[0] = pipe_lru_[1] = -1; |
|
1550 |
111 |
cleanup_recorder_.AddRecorder(1, 90); // last 1.5 min with second resolution |
|
1551 |
// last 1.5 h with minute resolution |
||
1552 |
111 |
cleanup_recorder_.AddRecorder(60, 90*60); |
|
1553 |
// last 18 hours with 20 min resolution |
||
1554 |
111 |
cleanup_recorder_.AddRecorder(20*60, 60*60*18); |
|
1555 |
// last 4 days with hour resolution |
||
1556 |
111 |
cleanup_recorder_.AddRecorder(60*60, 60*60*24*4); |
|
1557 |
} |
||
1558 |
|||
1559 |
|||
1560 |
218 |
PosixQuotaManager::~PosixQuotaManager() { |
|
1561 |
✓✓ | 109 |
if (!initialized_) return; |
1562 |
|||
1563 |
✗✓ | 104 |
if (shared_) { |
1564 |
// Most of cleanup is done elsewhen by shared cache manager |
||
1565 |
close(pipe_lru_[1]); |
||
1566 |
return; |
||
1567 |
} |
||
1568 |
|||
1569 |
✓✓ | 104 |
if (spawned_) { |
1570 |
30 |
char fin = 0; |
|
1571 |
30 |
WritePipe(pipe_lru_[1], &fin, 1); |
|
1572 |
30 |
close(pipe_lru_[1]); |
|
1573 |
30 |
pthread_join(thread_lru_, NULL); |
|
1574 |
} else { |
||
1575 |
74 |
ClosePipe(pipe_lru_); |
|
1576 |
} |
||
1577 |
|||
1578 |
104 |
CloseDatabase(); |
|
1579 |
✓✓✓✓ ✓✓✓✓ ✗✓ |
109 |
} |
1580 |
|||
1581 |
|||
1582 |
105 |
void PosixQuotaManager::ProcessCommandBunch( |
|
1583 |
const unsigned num, |
||
1584 |
const LruCommand *commands, |
||
1585 |
const char *descriptions) |
||
1586 |
{ |
||
1587 |
105 |
int retval = sqlite3_exec(database_, "BEGIN", NULL, NULL, NULL); |
|
1588 |
✗✓ | 105 |
assert(retval == SQLITE_OK); |
1589 |
|||
1590 |
✓✓ | 172 |
for (unsigned i = 0; i < num; ++i) { |
1591 |
67 |
const shash::Any hash = commands[i].RetrieveHash(); |
|
1592 |
67 |
const string hash_str = hash.ToString(); |
|
1593 |
67 |
const unsigned size = commands[i].GetSize(); |
|
1594 |
LogCvmfs(kLogQuota, kLogDebug, "processing %s (%d)", |
||
1595 |
67 |
hash_str.c_str(), commands[i].command_type); |
|
1596 |
|||
1597 |
bool exists; |
||
1598 |
✓✓✓✗ |
67 |
switch (commands[i].command_type) { |
1599 |
case kTouch: |
||
1600 |
19 |
sqlite3_bind_int64(stmt_touch_, 1, seq_++); |
|
1601 |
sqlite3_bind_text(stmt_touch_, 2, &hash_str[0], hash_str.length(), |
||
1602 |
19 |
SQLITE_STATIC); |
|
1603 |
19 |
retval = sqlite3_step(stmt_touch_); |
|
1604 |
LogCvmfs(kLogQuota, kLogDebug, "touching %s (%ld): %d", |
||
1605 |
19 |
hash_str.c_str(), seq_-1, retval); |
|
1606 |
✗✓✗✗ |
19 |
if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) { |
1607 |
LogCvmfs(kLogQuota, kLogSyslogErr, |
||
1608 |
"failed to update %s in cachedb, error %d", |
||
1609 |
hash_str.c_str(), retval); |
||
1610 |
abort(); |
||
1611 |
} |
||
1612 |
19 |
sqlite3_reset(stmt_touch_); |
|
1613 |
19 |
break; |
|
1614 |
case kUnpin: |
||
1615 |
sqlite3_bind_text(stmt_unpin_, 1, &hash_str[0], hash_str.length(), |
||
1616 |
2 |
SQLITE_STATIC); |
|
1617 |
2 |
retval = sqlite3_step(stmt_unpin_); |
|
1618 |
LogCvmfs(kLogQuota, kLogDebug, "unpinning %s: %d", |
||
1619 |
2 |
hash_str.c_str(), retval); |
|
1620 |
✗✓✗✗ |
2 |
if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) { |
1621 |
LogCvmfs(kLogQuota, kLogSyslogErr, |
||
1622 |
"failed to unpin %s in cachedb, error %d", |
||
1623 |
hash_str.c_str(), retval); |
||
1624 |
abort(); |
||
1625 |
} |
||
1626 |
2 |
sqlite3_reset(stmt_unpin_); |
|
1627 |
2 |
break; |
|
1628 |
case kPin: |
||
1629 |
case kPinRegular: |
||
1630 |
case kInsert: |
||
1631 |
case kInsertVolatile: |
||
1632 |
// It could already be in, check |
||
1633 |
46 |
exists = Contains(hash_str); |
|
1634 |
|||
1635 |
// Cleanup, move to trash and unlink |
||
1636 |
✓✓✗✓ |
46 |
if (!exists && (gauge_ + size > limit_)) { |
1637 |
LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu", |
||
1638 |
gauge_, size); |
||
1639 |
retval = DoCleanup(cleanup_threshold_); |
||
1640 |
assert(retval != 0); |
||
1641 |
} |
||
1642 |
|||
1643 |
// Insert or replace |
||
1644 |
sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(), |
||
1645 |
46 |
SQLITE_STATIC); |
|
1646 |
46 |
sqlite3_bind_int64(stmt_new_, 2, size); |
|
1647 |
✓✓ | 46 |
if (commands[i].command_type == kInsertVolatile) { |
1648 |
4 |
sqlite3_bind_int64(stmt_new_, 3, (seq_++) | kVolatileFlag); |
|
1649 |
} else { |
||
1650 |
42 |
sqlite3_bind_int64(stmt_new_, 3, seq_++); |
|
1651 |
} |
||
1652 |
sqlite3_bind_text(stmt_new_, 4, &descriptions[i*kMaxDescription], |
||
1653 |
46 |
commands[i].desc_length, SQLITE_STATIC); |
|
1654 |
46 |
sqlite3_bind_int64(stmt_new_, 5, (commands[i].command_type == kPin) ? |
|
1655 |
✓✓ | 46 |
kFileCatalog : kFileRegular); |
1656 |
sqlite3_bind_int64(stmt_new_, 6, |
||
1657 |
46 |
((commands[i].command_type == kPin) || |
|
1658 |
✓✓✓✓ |
46 |
(commands[i].command_type == kPinRegular)) ? 1 : 0); |
1659 |
46 |
retval = sqlite3_step(stmt_new_); |
|
1660 |
LogCvmfs(kLogQuota, kLogDebug, "insert or replace %s, method %d: %d", |
||
1661 |
46 |
hash_str.c_str(), commands[i].command_type, retval); |
|
1662 |
✗✓✗✗ |
46 |
if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) { |
1663 |
LogCvmfs(kLogQuota, kLogSyslogErr, |
||
1664 |
"failed to insert %s in cachedb, error %d", |
||
1665 |
hash_str.c_str(), retval); |
||
1666 |
abort(); |
||
1667 |
} |
||
1668 |
46 |
sqlite3_reset(stmt_new_); |
|
1669 |
|||
1670 |
✓✓ | 46 |
if (!exists) gauge_ += size; |
1671 |
46 |
break; |
|
1672 |
default: |
||
1673 |
abort(); // other types should have been taken care of by event loop |
||
1674 |
} |
||
1675 |
} |
||
1676 |
|||
1677 |
105 |
retval = sqlite3_exec(database_, "COMMIT", NULL, NULL, NULL); |
|
1678 |
✗✓ | 105 |
if (retval != SQLITE_OK) { |
1679 |
LogCvmfs(kLogQuota, kLogSyslogErr, |
||
1680 |
"failed to commit to cachedb, error %d", retval); |
||
1681 |
abort(); |
||
1682 |
} |
||
1683 |
105 |
} |
|
1684 |
|||
1685 |
|||
1686 |
107 |
bool PosixQuotaManager::RebuildDatabase() { |
|
1687 |
107 |
bool result = false; |
|
1688 |
107 |
string sql; |
|
1689 |
107 |
sqlite3_stmt *stmt_select = NULL; |
|
1690 |
107 |
sqlite3_stmt *stmt_insert = NULL; |
|
1691 |
int sqlerr; |
||
1692 |
107 |
int seq = 0; |
|
1693 |
char hex[4]; |
||
1694 |
struct stat info; |
||
1695 |
platform_dirent64 *d; |
||
1696 |
107 |
DIR *dirp = NULL; |
|
1697 |
107 |
string path; |
|
1698 |
|||
1699 |
107 |
LogCvmfs(kLogQuota, kLogSyslog | kLogDebug, "re-building cache database"); |
|
1700 |
|||
1701 |
// Empty cache catalog and fscache |
||
1702 |
107 |
sql = "DELETE FROM cache_catalog; DELETE FROM fscache;"; |
|
1703 |
107 |
sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL); |
|
1704 |
✗✓ | 107 |
if (sqlerr != SQLITE_OK) { |
1705 |
LogCvmfs(kLogQuota, kLogDebug, "could not clear cache database"); |
||
1706 |
goto build_return; |
||
1707 |
} |
||
1708 |
|||
1709 |
107 |
gauge_ = 0; |
|
1710 |
|||
1711 |
// Insert files from cache sub-directories 00 - ff |
||
1712 |
// TODO(jblomer): fs_traversal |
||
1713 |
sqlite3_prepare_v2(database_, "INSERT INTO fscache (sha1, size, actime) " |
||
1714 |
107 |
"VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL); |
|
1715 |
|||
1716 |
✓✓ | 26731 |
for (int i = 0; i <= 0xff; i++) { |
1717 |
26627 |
snprintf(hex, sizeof(hex), "%02x", i); |
|
1718 |
26627 |
path = cache_dir_ + "/" + string(hex); |
|
1719 |
✓✓ | 26627 |
if ((dirp = opendir(path.c_str())) == NULL) { |
1720 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, |
||
1721 |
"failed to open directory %s (tmpwatch interfering?)", |
||
1722 |
3 |
path.c_str()); |
|
1723 |
3 |
goto build_return; |
|
1724 |
} |
||
1725 |
✓✗✓✓ |
26624 |
while ((d = platform_readdir(dirp)) != NULL) { |
1726 |
53250 |
string file_path = path + "/" + string(d->d_name); |
|
1727 |
✓✗ | 53250 |
if (stat(file_path.c_str(), &info) == 0) { |
1728 |
✓✓ | 53250 |
if (!S_ISREG(info.st_mode)) |
1729 |
53248 |
continue; |
|
1730 |
✓✓ | 2 |
if (info.st_size == 0) { |
1731 |
LogCvmfs(kLogQuota, kLogSyslog | kLogDebug, |
||
1732 |
"removing empty file %s during automatic cache db rebuild", |
||
1733 |
1 |
file_path.c_str()); |
|
1734 |
1 |
unlink(file_path.c_str()); |
|
1735 |
1 |
continue; |
|
1736 |
} |
||
1737 |
|||
1738 |
1 |
string hash = string(hex) + string(d->d_name); |
|
1739 |
sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(), |
||
1740 |
1 |
SQLITE_STATIC); |
|
1741 |
1 |
sqlite3_bind_int64(stmt_insert, 2, info.st_size); |
|
1742 |
1 |
sqlite3_bind_int64(stmt_insert, 3, info.st_atime); |
|
1743 |
✗✓ | 1 |
if (sqlite3_step(stmt_insert) != SQLITE_DONE) { |
1744 |
LogCvmfs(kLogQuota, kLogDebug, "could not insert into temp table"); |
||
1745 |
goto build_return; |
||
1746 |
} |
||
1747 |
1 |
sqlite3_reset(stmt_insert); |
|
1748 |
|||
1749 |
✗✓ | 1 |
gauge_ += info.st_size; |
1750 |
} else { |
||
1751 |
LogCvmfs(kLogQuota, kLogDebug, "could not stat %s", file_path.c_str()); |
||
1752 |
} |
||
1753 |
} |
||
1754 |
26624 |
closedir(dirp); |
|
1755 |
26624 |
dirp = NULL; |
|
1756 |
} |
||
1757 |
104 |
sqlite3_finalize(stmt_insert); |
|
1758 |
104 |
stmt_insert = NULL; |
|
1759 |
|||
1760 |
// Transfer from temp table in cache catalog |
||
1761 |
sqlite3_prepare_v2(database_, |
||
1762 |
"SELECT sha1, size FROM fscache ORDER BY actime;", |
||
1763 |
104 |
-1, &stmt_select, NULL); |
|
1764 |
sqlite3_prepare_v2(database_, |
||
1765 |
"INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) " |
||
1766 |
"VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);", |
||
1767 |
104 |
-1, &stmt_insert, NULL); |
|
1768 |
✓✗✓✓ |
104 |
while (sqlite3_step(stmt_select) == SQLITE_ROW) { |
1769 |
const string hash = string( |
||
1770 |
1 |
reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0))); |
|
1771 |
1 |
sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC); |
|
1772 |
1 |
sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1)); |
|
1773 |
1 |
sqlite3_bind_int64(stmt_insert, 3, seq++); |
|
1774 |
// Might also be a catalog (information is lost) |
||
1775 |
1 |
sqlite3_bind_int64(stmt_insert, 4, kFileRegular); |
|
1776 |
|||
1777 |
1 |
int retval = sqlite3_step(stmt_insert); |
|
1778 |
✗✓ | 1 |
if (retval != SQLITE_DONE) { |
1779 |
// If the file system hosting the cache is full, we'll likely notice here |
||
1780 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, |
||
1781 |
"could not insert into cache catalog (%d - %s)", |
||
1782 |
retval, sqlite3_errstr(retval)); |
||
1783 |
goto build_return; |
||
1784 |
} |
||
1785 |
1 |
sqlite3_reset(stmt_insert); |
|
1786 |
} |
||
1787 |
|||
1788 |
// Delete temporary table |
||
1789 |
104 |
sql = "DELETE FROM fscache;"; |
|
1790 |
104 |
sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL); |
|
1791 |
✗✓ | 104 |
if (sqlerr != SQLITE_OK) { |
1792 |
LogCvmfs(kLogQuota, kLogDebug, "could not clear temporary table (%d)", |
||
1793 |
sqlerr); |
||
1794 |
goto build_return; |
||
1795 |
} |
||
1796 |
|||
1797 |
104 |
seq_ = seq; |
|
1798 |
104 |
result = true; |
|
1799 |
LogCvmfs(kLogQuota, kLogDebug, |
||
1800 |
"rebuilding finished, seqence %" PRIu64 ", gauge %" PRIu64, |
||
1801 |
104 |
seq_, gauge_); |
|
1802 |
|||
1803 |
build_return: |
||
1804 |
✓✗ | 107 |
if (stmt_insert) sqlite3_finalize(stmt_insert); |
1805 |
✓✓ | 107 |
if (stmt_select) sqlite3_finalize(stmt_select); |
1806 |
✗✓ | 107 |
if (dirp) closedir(dirp); |
1807 |
107 |
return result; |
|
1808 |
} |
||
1809 |
|||
1810 |
|||
1811 |
/** |
||
1812 |
* Register a channel that allows the cache manager to trigger action to its |
||
1813 |
* clients. Currently used for releasing pinned catalogs. |
||
1814 |
*/ |
||
1815 |
4 |
void PosixQuotaManager::RegisterBackChannel( |
|
1816 |
int back_channel[2], |
||
1817 |
const string &channel_id) |
||
1818 |
{ |
||
1819 |
✓✗ | 4 |
if (protocol_revision_ >= 1) { |
1820 |
4 |
shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id)); |
|
1821 |
4 |
MakeReturnPipe(back_channel); |
|
1822 |
|||
1823 |
4 |
LruCommand cmd; |
|
1824 |
4 |
cmd.command_type = kRegisterBackChannel; |
|
1825 |
4 |
cmd.return_pipe = back_channel[1]; |
|
1826 |
// Not StoreHash(). This is an MD5 hash. |
||
1827 |
4 |
memcpy(cmd.digest, hash.digest, hash.GetDigestSize()); |
|
1828 |
4 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
1829 |
|||
1830 |
char success; |
||
1831 |
4 |
ReadHalfPipe(back_channel[0], &success, sizeof(success)); |
|
1832 |
// At this point, the named FIFO is unlinked, so don't use CloseReturnPipe |
||
1833 |
✗✓ | 4 |
if (success != 'S') { |
1834 |
LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, |
||
1835 |
"failed to register quota back channel (%c)", success); |
||
1836 |
abort(); |
||
1837 |
} |
||
1838 |
} else { |
||
1839 |
// Dummy pipe to return valid file descriptors |
||
1840 |
MakePipe(back_channel); |
||
1841 |
} |
||
1842 |
4 |
} |
|
1843 |
|||
1844 |
|||
1845 |
/** |
||
1846 |
* Removes a chunk from cache, if it exists. |
||
1847 |
*/ |
||
1848 |
3 |
void PosixQuotaManager::Remove(const shash::Any &hash) { |
|
1849 |
3 |
string hash_str = hash.ToString(); |
|
1850 |
|||
1851 |
int pipe_remove[2]; |
||
1852 |
3 |
MakeReturnPipe(pipe_remove); |
|
1853 |
|||
1854 |
3 |
LruCommand cmd; |
|
1855 |
3 |
cmd.command_type = kRemove; |
|
1856 |
3 |
cmd.return_pipe = pipe_remove[1]; |
|
1857 |
3 |
cmd.StoreHash(hash); |
|
1858 |
3 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
1859 |
|||
1860 |
bool success; |
||
1861 |
3 |
ReadHalfPipe(pipe_remove[0], &success, sizeof(success)); |
|
1862 |
3 |
CloseReturnPipe(pipe_remove); |
|
1863 |
|||
1864 |
3 |
unlink((cache_dir_ + "/" + hash.MakePathWithoutSuffix()).c_str()); |
|
1865 |
3 |
} |
|
1866 |
|||
1867 |
|||
1868 |
32 |
void PosixQuotaManager::Spawn() { |
|
1869 |
✓✓ | 32 |
if (spawned_) |
1870 |
2 |
return; |
|
1871 |
|||
1872 |
✗✓ | 30 |
if (pthread_create(&thread_lru_, NULL, MainCommandServer, |
1873 |
static_cast<void *>(this)) != 0) |
||
1874 |
{ |
||
1875 |
LogCvmfs(kLogQuota, kLogDebug, "could not create lru thread"); |
||
1876 |
abort(); |
||
1877 |
} |
||
1878 |
|||
1879 |
30 |
spawned_ = true; |
|
1880 |
} |
||
1881 |
|||
1882 |
|||
1883 |
/** |
||
1884 |
* Updates the sequence number of the file specified by the hash. |
||
1885 |
*/ |
||
1886 |
30 |
void PosixQuotaManager::Touch(const shash::Any &hash) { |
|
1887 |
30 |
LruCommand cmd; |
|
1888 |
30 |
cmd.command_type = kTouch; |
|
1889 |
30 |
cmd.StoreHash(hash); |
|
1890 |
30 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
1891 |
30 |
} |
|
1892 |
|||
1893 |
|||
1894 |
85 |
void PosixQuotaManager::UnbindReturnPipe(int pipe_wronly) { |
|
1895 |
✓✓ | 85 |
if (shared_) |
1896 |
1 |
close(pipe_wronly); |
|
1897 |
85 |
} |
|
1898 |
|||
1899 |
|||
1900 |
7 |
void PosixQuotaManager::UnlinkReturnPipe(int pipe_wronly) { |
|
1901 |
✓✓ | 7 |
if (shared_) |
1902 |
3 |
unlink((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str()); |
|
1903 |
7 |
} |
|
1904 |
|||
1905 |
|||
1906 |
37 |
void PosixQuotaManager::Unpin(const shash::Any &hash) { |
|
1907 |
37 |
LogCvmfs(kLogQuota, kLogDebug, "Unpin %s", hash.ToString().c_str()); |
|
1908 |
|||
1909 |
37 |
LruCommand cmd; |
|
1910 |
37 |
cmd.command_type = kUnpin; |
|
1911 |
37 |
cmd.StoreHash(hash); |
|
1912 |
37 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
1913 |
37 |
} |
|
1914 |
|||
1915 |
|||
1916 |
2 |
void PosixQuotaManager::UnregisterBackChannel( |
|
1917 |
int back_channel[2], |
||
1918 |
const string &channel_id) |
||
1919 |
{ |
||
1920 |
✓✗ | 2 |
if (protocol_revision_ >= 1) { |
1921 |
2 |
shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id)); |
|
1922 |
|||
1923 |
2 |
LruCommand cmd; |
|
1924 |
2 |
cmd.command_type = kUnregisterBackChannel; |
|
1925 |
// Not StoreHash(). This is an MD5 hash. |
||
1926 |
2 |
memcpy(cmd.digest, hash.digest, hash.GetDigestSize()); |
|
1927 |
2 |
WritePipe(pipe_lru_[1], &cmd, sizeof(cmd)); |
|
1928 |
|||
1929 |
// Writer's end will be closed by cache manager, FIFO is already unlinked |
||
1930 |
2 |
close(back_channel[0]); |
|
1931 |
} else { |
||
1932 |
ClosePipe(back_channel); |
||
1933 |
} |
||
1934 |
2 |
} |
Generated by: GCOVR (Version 4.1) |