15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
18 #include "cvmfs_config.h"
31 #include <sys/statfs.h>
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
70 open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe_wronly)).c_str(),
71 O_WRONLY | O_NONBLOCK);
76 "failed to bind return pipe (%d)", errno);
83 const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
84 if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86 "high watermark of pinned files (%" PRIu64
"M > %" PRIu64
"M)",
87 pinned_/(1024*1024), watermark/(1024*1024));
88 BroadcastBackchannels(
"R");
94 DIR *dirp = opendir(workspace_dir_.c_str());
98 bool found_leftovers =
false;
100 const string name = dent->d_name;
101 const string path = workspace_dir_ +
"/" + name;
106 if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) ==
"pipe")) {
107 if (!found_leftovers) {
109 "removing left-over FIFOs from cache directory");
111 found_leftovers =
true;
112 unlink(path.c_str());
127 return DoCleanup(leave_size);
131 MakeReturnPipe(pipe_cleanup);
135 cmd.
size = leave_size;
138 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
140 CloseReturnPipe(pipe_cleanup);
147 if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
148 if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
149 if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
150 if (stmt_list_) sqlite3_finalize(stmt_list_);
151 if (stmt_lru_) sqlite3_finalize(stmt_lru_);
152 if (stmt_rm_) sqlite3_finalize(stmt_rm_);
153 if (stmt_size_) sqlite3_finalize(stmt_size_);
154 if (stmt_touch_) sqlite3_finalize(stmt_touch_);
155 if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
156 if (stmt_block_) sqlite3_finalize(stmt_block_);
157 if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
158 if (stmt_new_) sqlite3_finalize(stmt_new_);
159 if (database_) sqlite3_close(database_);
162 stmt_list_catalogs_ = NULL;
163 stmt_list_pinned_ = NULL;
164 stmt_list_volatile_ = NULL;
171 stmt_unblock_ = NULL;
175 pinned_chunks_.clear();
182 UnlinkReturnPipe(pipe[1]);
192 sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
194 if (sqlite3_step(stmt_size_) == SQLITE_ROW)
196 sqlite3_reset(stmt_size_);
198 hash_str.c_str(), result);
205 if ((limit_ == 0) || (gauge_ >= limit_))
208 struct statvfs vfs_info;
209 int retval = statvfs((cache_dir_ +
"/cachedb").c_str(), &vfs_info);
212 "failed to query %s for free space (%d)",
213 cache_dir_.c_str(), errno);
216 int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
218 free_space_byte / (1024 * 1024));
220 int64_t required_byte = limit_ - gauge_;
221 if (free_space_byte < required_byte) {
223 "too little free space on the file system hosting the cache,"
224 " %" PRId64
" MB available",
225 free_space_byte / (1024 * 1024));
231 const string &cache_workspace,
232 const uint64_t limit,
233 const uint64_t cleanup_threshold,
234 const bool rebuild_database)
236 if (cleanup_threshold >= limit) {
238 "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
247 delete quota_manager;
255 return quota_manager;
263 const std::string &exe_path,
264 const std::string &cache_workspace,
265 const uint64_t limit,
266 const uint64_t cleanup_threshold,
270 string workspace_dir;
271 ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
274 const int fd_lockfile =
LockFile(workspace_dir +
"/lock_cachemgr");
275 if (fd_lockfile < 0) {
277 (workspace_dir +
"/lock_cachemgr").c_str(), errno);
287 const string fifo_path = workspace_dir +
"/cachemgr";
289 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
297 "received limit %" PRIu64
", threshold %" PRIu64,
299 if (
FileExists(workspace_dir +
"/cachemgr.protocol")) {
308 const int connect_error = errno;
311 const int fd_lockfile_fifo =
LockFile(workspace_dir +
"/lock_cachemgr.fifo");
312 if (fd_lockfile_fifo < 0) {
314 (workspace_dir +
"/lock_cachemgr.fifo").c_str(), errno);
321 if (connect_error == ENXIO) {
323 unlink(fifo_path.c_str());
327 int retval = mkfifo(fifo_path.c_str(), 0600);
338 int pipe_handshake[2];
342 vector<string> command_line;
343 command_line.push_back(exe_path);
344 command_line.push_back(
"__cachemgr__");
345 command_line.push_back(cache_workspace);
347 command_line.push_back(
StringifyInt(pipe_handshake[0]));
349 command_line.push_back(
StringifyInt(cleanup_threshold));
355 set<int> preserve_filedes;
356 preserve_filedes.insert(0);
357 preserve_filedes.insert(1);
358 preserve_filedes.insert(2);
359 preserve_filedes.insert(pipe_boot[1]);
360 preserve_filedes.insert(pipe_handshake[0]);
362 retval =
ManagedExec(command_line, preserve_filedes, map<int, int>(),
false);
374 close(pipe_handshake[0]);
376 if (read(pipe_boot[0], &buf, 1) != 1) {
379 close(pipe_handshake[1]);
382 "cache manager did not start");
388 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
391 "failed to connect to newly created FIFO (%d)", errno);
392 close(pipe_handshake[1]);
400 if (write(pipe_handshake[1], &buf, 1) != 1) {
402 close(pipe_handshake[1]);
407 close(pipe_handshake[1]);
418 "threshold %" PRIu64,
425 if (gauge_ <= leave_size)
430 "clean up cache until at most %lu KB is used", leave_size/1024);
432 cleanup_recorder_.Tick();
436 vector<string> trash;
439 sqlite3_reset(stmt_lru_);
440 if (sqlite3_step(stmt_lru_) != SQLITE_ROW) {
445 hash_str = string(reinterpret_cast<const char *>(
446 sqlite3_column_text(stmt_lru_, 0)));
453 if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
454 trash.push_back(cache_dir_ +
"/" + hash.MakePathWithoutSuffix());
455 gauge_ -= sqlite3_column_int64(stmt_lru_, 1);
457 hash_str.c_str(), gauge_);
459 sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(),
461 result = (sqlite3_step(stmt_rm_) == SQLITE_DONE);
462 sqlite3_reset(stmt_rm_);
466 "failed to find %s in cache database (%d). "
467 "Cache database is out of sync. "
468 "Restart cvmfs with clean cache.", hash_str.c_str(), result);
472 sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
474 result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
475 sqlite3_reset(stmt_block_);
478 }
while (gauge_ > leave_size);
480 result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
481 sqlite3_reset(stmt_unblock_);
486 if (!trash.empty()) {
490 if ((pid = fork()) == 0) {
495 int max_fd = sysconf(_SC_OPEN_MAX);
496 for (
int i = 0; i < max_fd; ++i)
500 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
502 unlink(trash[i].c_str());
509 waitpid(pid, &statloc, 0);
514 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
516 unlink(trash[i].c_str());
521 if (gauge_ > leave_size) {
523 "request to clean until %" PRIu64
", "
524 "but effective gauge is %" PRIu64, leave_size, gauge_);
534 const string &description,
537 const string hash_str = hash.
ToString();
539 hash_str.c_str(), description.c_str(), command_type);
540 const unsigned desc_length = (description.length() > kMaxDescription) ?
541 kMaxDescription : description.length();
550 memcpy(reinterpret_cast<char *>(cmd)+
sizeof(
LruCommand),
551 &description[0], desc_length);
557 vector<string> result;
560 MakeReturnPipe(pipe_list);
561 char description_buffer[kMaxDescription];
566 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
572 ReadPipe(pipe_list[0], description_buffer, length);
573 result.push_back(
string(description_buffer, length));
575 }
while (length >= 0);
577 CloseReturnPipe(pipe_list);
583 if (limit_ != (uint64_t)(-1))
588 if (statfs(
".", &info) == 0) {
589 return info.f_bavail * info.f_bsize;
592 "failed to query file system info of cache (%d)", errno);
601 MakeReturnPipe(pipe_limits);
606 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
608 ReadPipe(pipe_limits[0], cleanup_threshold,
sizeof(*cleanup_threshold));
609 CloseReturnPipe(pipe_limits);
618 return limit_ - cleanup_threshold_;
629 MakeReturnPipe(pipe_pid);
634 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
636 CloseReturnPipe(pipe_pid);
642 int pipe_revision[2];
643 MakeReturnPipe(pipe_revision);
648 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
651 ReadHalfPipe(pipe_revision[0], &revision,
sizeof(revision));
652 CloseReturnPipe(pipe_revision);
662 MakeReturnPipe(pipe_status);
667 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
669 ReadPipe(pipe_status[0], pinned,
sizeof(*pinned));
670 CloseReturnPipe(pipe_status);
676 uint64_t gauge, size_pinned;
677 GetSharedStatus(&gauge, &size_pinned);
684 uint64_t gauge, size_pinned;
685 GetSharedStatus(&gauge, &size_pinned);
691 if (!
spawned_ || (protocol_revision_ < 2))
return 0;
692 uint64_t cleanup_rate;
694 int pipe_cleanup_rate[2];
695 MakeReturnPipe(pipe_cleanup_rate);
700 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
701 ReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate,
sizeof(cleanup_rate));
702 CloseReturnPipe(pipe_cleanup_rate);
712 fd_lock_cachedb_ =
LockFile(workspace_dir_ +
"/lock_cachedb");
713 if (fd_lock_cachedb_ < 0) {
719 const string db_file = cache_dir_ +
"/cachedb";
720 if (rebuild_database) {
723 unlink(db_file.c_str());
724 unlink((db_file +
"-journal").c_str());
728 int err = sqlite3_open(db_file.c_str(), &database_);
729 if (err != SQLITE_OK) {
731 goto init_database_fail;
734 sql =
"PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
735 "PRAGMA auto_vacuum=1; "
736 "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
737 " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
738 "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
739 "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
740 " ON cache_catalog (acseq); "
741 "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
742 "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
743 "CREATE INDEX idx_fscache_actime ON fscache (actime); "
744 "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
745 " CONSTRAINT pk_properties PRIMARY KEY(key));";
746 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
747 if (err != SQLITE_OK) {
750 sqlite3_close(database_);
751 unlink(db_file.c_str());
752 unlink((db_file +
"-journal").c_str());
754 "LRU database corrupted, re-building");
759 goto init_database_fail;
764 sql =
"ALTER TABLE cache_catalog ADD type INTEGER; "
765 "ALTER TABLE cache_catalog ADD pinned INTEGER";
766 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
767 if (err == SQLITE_OK) {
768 sql =
"UPDATE cache_catalog SET type=" +
StringifyInt(kFileRegular) +
";";
769 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
770 if (err != SQLITE_OK) {
772 "could not init cache database (failed: %s)", sql.c_str());
773 goto init_database_fail;
778 sql =
"UPDATE cache_catalog SET pinned=0;";
779 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
780 if (err != SQLITE_OK) {
783 goto init_database_fail;
787 sql =
"INSERT OR REPLACE INTO properties (key, value) "
788 "VALUES ('schema', '1.0')";
789 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
790 if (err != SQLITE_OK) {
793 goto init_database_fail;
797 sql =
"SELECT count(*) FROM cache_catalog;";
798 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
799 if (sqlite3_step(stmt) == SQLITE_ROW) {
800 if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
802 "CernVM-FS: building lru cache database...");
803 if (!RebuildDatabase()) {
805 "could not build cache database from file system");
806 sqlite3_finalize(stmt);
807 goto init_database_fail;
810 sqlite3_finalize(stmt);
813 sqlite3_finalize(stmt);
814 goto init_database_fail;
818 sql =
"SELECT sum(size) FROM cache_catalog;";
819 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
820 if (sqlite3_step(stmt) == SQLITE_ROW) {
821 gauge_ = sqlite3_column_int64(stmt, 0);
824 sqlite3_finalize(stmt);
825 goto init_database_fail;
827 sqlite3_finalize(stmt);
830 sql =
"SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
831 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
832 if (sqlite3_step(stmt) == SQLITE_ROW) {
833 seq_ = sqlite3_column_int64(stmt, 0)+1;
836 sqlite3_finalize(stmt);
837 goto init_database_fail;
839 sqlite3_finalize(stmt);
842 sqlite3_prepare_v2(database_,
843 "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
844 "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
845 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=0 "
846 "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
847 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=2 "
848 "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
849 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=1 "
850 "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
851 sqlite3_prepare_v2(database_,
852 "INSERT OR REPLACE INTO cache_catalog "
853 "(sha1, size, acseq, path, type, pinned) "
854 "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
855 -1, &stmt_new_, NULL);
856 sqlite3_prepare_v2(database_,
857 "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
858 -1, &stmt_size_, NULL);
859 sqlite3_prepare_v2(database_,
"DELETE FROM cache_catalog WHERE sha1=:sha1;",
860 -1, &stmt_rm_, NULL);
861 sqlite3_prepare_v2(database_,
862 "SELECT sha1, size FROM cache_catalog WHERE "
863 "acseq=(SELECT min(acseq) "
864 "FROM cache_catalog WHERE pinned<>2);",
865 -1, &stmt_lru_, NULL);
866 sqlite3_prepare_v2(database_,
867 (
"SELECT path FROM cache_catalog WHERE type=" +
869 ";").c_str(), -1, &stmt_list_, NULL);
870 sqlite3_prepare_v2(database_,
871 "SELECT path FROM cache_catalog WHERE pinned<>0;",
872 -1, &stmt_list_pinned_, NULL);
873 sqlite3_prepare_v2(database_,
874 "SELECT path FROM cache_catalog WHERE acseq < 0;",
875 -1, &stmt_list_volatile_, NULL);
876 sqlite3_prepare_v2(database_,
877 (
"SELECT path FROM cache_catalog WHERE type=" +
879 ";").c_str(), -1, &stmt_list_catalogs_, NULL);
883 sqlite3_close(database_);
897 const string &description)
899 DoInsert(any_hash, size, description, kInsert);
911 const string &description)
913 DoInsert(any_hash, size, description, kInsertVolatile);
921 return DoList(kList);
929 return DoList(kListPinned);
937 return DoList(kListCatalogs);
945 return DoList(kListVolatile);
962 ParseDirectories(
string(argv[2]),
972 vector<string> logfiles =
SplitString(argv[10],
':');
976 if ((logfiles.size() > 0) && (logfiles[0] !=
""))
978 if (logfiles.size() > 1)
989 const int fd_lockfile_fifo =
991 if (fd_lockfile_fifo < 0) {
998 const string crash_guard = shared_manager.
cache_dir_ +
"/cachemgr.running";
1000 retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1003 "failed to create shared cache manager crash guard");
1011 sqlite3_temp_directory =
1012 static_cast<char *
>(sqlite3_malloc(tmp_dir.length() + 1));
1013 snprintf(sqlite3_temp_directory, tmp_dir.length() + 1,
"%s", tmp_dir.c_str());
1027 const string protocol_revision_path =
1029 retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1032 "failed to open protocol revision file (%d)", errno);
1036 const string revision =
StringifyInt(kProtocolRevision);
1037 int written = write(retval, revision.data(), revision.length());
1039 if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1041 "failed to write protocol revision (%d)", errno);
1046 const string fifo_path = shared_manager.
workspace_dir_ +
"/cachemgr";
1047 shared_manager.
pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1050 fifo_path.c_str(), errno);
1062 close(pipe_handshake);
1066 signal(SIGPIPE, SIG_IGN);
1068 signal(SIGINT, SIG_IGN);
1071 unlink(fifo_path.c_str());
1072 unlink(protocol_revision_path.c_str());
1074 unlink(crash_guard.c_str());
1077 if (sqlite3_temp_directory) {
1078 sqlite3_free(sqlite3_temp_directory);
1079 sqlite3_temp_directory = NULL;
1092 LruCommand command_buffer[kCommandBufferSize];
1093 char description_buffer[kCommandBufferSize*kMaxDescription];
1094 unsigned num_commands = 0;
1096 while (read(quota_mgr->
pipe_lru_[0], &command_buffer[num_commands],
1097 sizeof(command_buffer[0])) ==
sizeof(command_buffer[0]))
1101 const uint64_t
size = command_buffer[num_commands].
GetSize();
1104 if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1105 (command_type == kPin) || (command_type == kPinRegular))
1107 const int desc_length = command_buffer[num_commands].
desc_length;
1109 &description_buffer[kMaxDescription*num_commands], desc_length);
1113 if (command_type == kGetProtocolRevision) {
1115 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1116 if (return_pipe < 0)
1125 if (command_type == kCleanupRate) {
1127 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1128 if (return_pipe < 0)
1130 uint64_t period_s =
size;
1132 WritePipe(return_pipe, &rate,
sizeof(rate));
1138 if (command_type == kReserve) {
1139 bool success =
true;
1141 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1142 if (return_pipe < 0)
1146 const string hash_str(hash.
ToString());
1148 size, hash_str.c_str());
1155 "failed to insert %s (pinned), no space", hash_str.c_str());
1164 WritePipe(return_pipe, &success,
sizeof(success));
1170 if (command_type == kRegisterBackChannel) {
1172 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1173 if (return_pipe < 0)
1179 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1183 map<shash::Md5, int>::const_iterator iter =
1187 "closing left-over back channel %s", hash.
ToString().c_str());
1188 close(iter->second);
1194 WritePipe(return_pipe, &success,
sizeof(success));
1196 hash.
ToString().c_str(), return_pipe);
1201 if (command_type == kUnregisterBackChannel) {
1203 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1207 map<shash::Md5, int>::iterator iter =
1211 "closing back channel %s", hash.
ToString().c_str());
1212 close(iter->second);
1216 "did not find back channel %s", hash.
ToString().c_str());
1224 if (command_type == kUnpin) {
1226 const string hash_str(hash.
ToString());
1228 map<shash::Any, uint64_t>::iterator iter =
1231 quota_mgr->
pinned_ -= iter->second;
1240 "remove orphaned pinned hash %s from cache database",
1242 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1243 hash_str.length(), SQLITE_STATIC);
1245 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1246 uint64_t size = sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1247 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1248 hash_str.length(), SQLITE_STATIC);
1249 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1250 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1254 "failed to delete %s (%d)", hash_str.c_str(), retval);
1256 sqlite3_reset(quota_mgr->
stmt_rm_);
1266 bool immediate_command = (command_type == kCleanup) ||
1267 (command_type == kList) || (command_type == kListPinned) ||
1268 (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1269 (command_type == kRemove) || (command_type == kStatus) ||
1270 (command_type == kLimits) || (command_type == kPid);
1271 if (!immediate_command) num_commands++;
1273 if ((num_commands == kCommandBufferSize) || immediate_command)
1276 description_buffer);
1277 if (!immediate_command) num_commands = 0;
1280 if (immediate_command) {
1283 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1284 if (return_pipe < 0) {
1290 sqlite3_stmt *this_stmt_list = NULL;
1291 switch (command_type) {
1294 const string hash_str = hash.
ToString();
1297 bool success =
false;
1299 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1300 hash_str.length(), SQLITE_STATIC);
1302 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1303 uint64_t size = sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1304 uint64_t is_pinned = sqlite3_column_int64(quota_mgr->
stmt_size_, 1);
1306 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1307 hash_str.length(), SQLITE_STATIC);
1308 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1309 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1318 "failed to delete %s (%d)", hash_str.c_str(), retval);
1320 sqlite3_reset(quota_mgr->
stmt_rm_);
1327 WritePipe(return_pipe, &success,
sizeof(success));
1331 WritePipe(return_pipe, &retval,
sizeof(retval));
1334 if (!this_stmt_list) this_stmt_list = quota_mgr->
stmt_list_;
1344 while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1345 string path =
"(NULL)";
1346 if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1348 reinterpret_cast<const char *>(
1349 sqlite3_column_text(this_stmt_list, 0)));
1351 length = path.length();
1352 WritePipe(return_pipe, &length,
sizeof(length));
1354 WritePipe(return_pipe, &path[0], length);
1357 WritePipe(return_pipe, &length,
sizeof(length));
1358 sqlite3_reset(this_stmt_list);
1371 pid_t pid = getpid();
1372 WritePipe(return_pipe, &pid,
sizeof(pid));
1386 description_buffer);
1390 for (map<shash::Any, uint64_t>::const_iterator i =
1412 retval = mkfifo((workspace_dir_ +
"/pipe" +
StringifyInt(i)).c_str(), 0600);
1415 }
while ((retval == -1) && (errno == EEXIST));
1419 pipe[0] = open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe[1])).c_str(),
1420 O_RDONLY | O_NONBLOCK);
1427 const std::string cache_workspace,
1428 std::string *cache_dir,
1429 std::string *workspace_dir)
1431 vector<string> dir_tokens(
SplitString(cache_workspace,
':'));
1432 switch (dir_tokens.size()) {
1434 *cache_dir = *workspace_dir = dir_tokens[0];
1437 *cache_dir = dir_tokens[0];
1438 *workspace_dir = dir_tokens[1];
1453 const uint64_t
size,
1454 const string &description,
1455 const bool is_catalog)
1457 assert((size > 0) || !is_catalog);
1459 const string hash_str = hash.
ToString();
1461 hash_str.c_str(), description.c_str());
1466 if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1467 if (pinned_ + size > cleanup_threshold_) {
1472 pinned_chunks_[hash] =
size;
1474 CheckHighPinWatermark();
1477 bool exists = Contains(hash_str);
1478 if (!exists && (gauge_ + size > limit_)) {
1481 int retval = DoCleanup(cleanup_threshold_);
1484 sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1486 sqlite3_bind_int64(stmt_new_, 2, size);
1487 sqlite3_bind_int64(stmt_new_, 3, seq_++);
1488 sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1490 sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1491 sqlite3_bind_int64(stmt_new_, 6, 1);
1492 int retval = sqlite3_step(stmt_new_);
1493 assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1494 sqlite3_reset(stmt_new_);
1495 if (!exists) gauge_ +=
size;
1499 int pipe_reserve[2];
1500 MakeReturnPipe(pipe_reserve);
1507 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
1509 ReadHalfPipe(pipe_reserve[0], &result,
sizeof(result));
1510 CloseReturnPipe(pipe_reserve);
1512 if (!result)
return false;
1513 DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1520 const uint64_t limit,
1521 const uint64_t cleanup_threshold,
1522 const string &cache_workspace)
1526 , cleanup_threshold_(cleanup_threshold)
1532 , fd_lock_cachedb_(-1)
1533 , async_delete_(true)
1538 , stmt_unblock_(NULL)
1544 , stmt_list_pinned_(NULL)
1545 , stmt_list_catalogs_(NULL)
1546 , stmt_list_volatile_(NULL)
1547 , initialized_(false)
1586 const char *descriptions)
1588 int retval = sqlite3_exec(
database_,
"BEGIN", NULL, NULL, NULL);
1589 assert(retval == SQLITE_OK);
1591 for (
unsigned i = 0; i < num; ++i) {
1593 const string hash_str = hash.
ToString();
1599 switch (commands[i].command_type) {
1602 sqlite3_bind_text(
stmt_touch_, 2, &hash_str[0], hash_str.length(),
1606 hash_str.c_str(),
seq_-1, retval);
1607 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1609 hash_str.c_str(), retval);
1614 sqlite3_bind_text(
stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1618 hash_str.c_str(), retval);
1619 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1621 hash_str.c_str(), retval);
1641 sqlite3_bind_text(
stmt_new_, 1, &hash_str[0], hash_str.length(),
1650 commands[i].desc_length, SQLITE_STATIC);
1651 sqlite3_bind_int64(
stmt_new_, 5, (commands[i].command_type ==
kPin) ?
1654 ((commands[i].command_type ==
kPin) ||
1655 (commands[i].command_type ==
kPinRegular)) ? 1 : 0);
1659 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1661 hash_str.c_str(), retval);
1673 retval = sqlite3_exec(
database_,
"COMMIT", NULL, NULL, NULL);
1674 if (retval != SQLITE_OK) {
1681 bool result =
false;
1683 sqlite3_stmt *stmt_select = NULL;
1684 sqlite3_stmt *stmt_insert = NULL;
1696 sql =
"DELETE FROM cache_catalog; DELETE FROM fscache;";
1697 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1698 if (sqlerr != SQLITE_OK) {
1707 sqlite3_prepare_v2(
database_,
"INSERT INTO fscache (sha1, size, actime) "
1708 "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1710 for (
int i = 0; i <= 0xff; i++) {
1711 snprintf(hex,
sizeof(hex),
"%02x", i);
1713 if ((dirp = opendir(path.c_str())) == NULL) {
1715 "failed to open directory %s (tmpwatch interfering?)",
1720 string file_path = path +
"/" + string(d->d_name);
1721 if (stat(file_path.c_str(), &info) == 0) {
1722 if (!S_ISREG(info.st_mode))
1724 if (info.st_size == 0) {
1726 "removing empty file %s during automatic cache db rebuild",
1728 unlink(file_path.c_str());
1732 string hash = string(hex) + string(d->d_name);
1733 sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1735 sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1736 sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1737 if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1741 sqlite3_reset(stmt_insert);
1751 sqlite3_finalize(stmt_insert);
1756 "SELECT sha1, size FROM fscache ORDER BY actime;",
1757 -1, &stmt_select, NULL);
1759 "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1760 "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1761 -1, &stmt_insert, NULL);
1762 while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1763 const string hash = string(
1764 reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1765 sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1766 sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1767 sqlite3_bind_int64(stmt_insert, 3, seq++);
1771 int retval = sqlite3_step(stmt_insert);
1772 if (retval != SQLITE_DONE) {
1775 "could not insert into cache catalog (%d - %s)",
1776 retval, sqlite3_errstr(retval));
1779 sqlite3_reset(stmt_insert);
1783 sql =
"DELETE FROM fscache;";
1784 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1785 if (sqlerr != SQLITE_OK) {
1794 "rebuilding finished, seqence %" PRIu64
", gauge %" PRIu64,
1798 if (stmt_insert) sqlite3_finalize(stmt_insert);
1799 if (stmt_select) sqlite3_finalize(stmt_select);
1800 if (dirp) closedir(dirp);
1810 int back_channel[2],
1811 const string &channel_id)
1825 ReadHalfPipe(back_channel[0], &success,
sizeof(success));
1827 if (success !=
'S') {
1829 "failed to register quota back channel (%c)", success);
1854 ReadHalfPipe(pipe_remove[0], &success,
sizeof(success));
1866 static_cast<void *>(
this)) != 0)
1903 cmd.StoreHash(hash);
1909 int back_channel[2],
1910 const string &channel_id)
1922 close(back_channel[0]);
virtual uint32_t GetProtocolRevision()
#define LogCvmfs(source, mask,...)
void SetLogSyslogFacility(const int local_facility)
sqlite3_stmt * stmt_list_pinned_
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
std::vector< std::string > DoList(const CommandType list_command)
virtual uint64_t GetCleanupRate(uint64_t period_s)
sqlite3_stmt * stmt_list_
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
static Watchdog * Create(const std::string &crash_dump_path)
virtual uint64_t GetSize()
virtual bool Cleanup(const uint64_t leave_size)
static const unsigned kSqliteMemPerThread
virtual uint64_t GetMaxFileSize()
std::string ToString(const bool with_suffix=false) const
uint64_t cleanup_threshold_
static const uint32_t kProtocolRevision
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
bool DoCleanup(const uint64_t leave_size)
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
int BindReturnPipe(int pipe_wronly)
bool Contains(const std::string &hash_str)
perf::MultiRecorder cleanup_recorder_
void SetLogMicroSyslog(const std::string &filename)
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
unsigned char digest[digest_size_]
virtual std::vector< std::string > ListCatalogs()
void CheckHighPinWatermark()
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
bool FileExists(const std::string &path)
int64_t String2Int64(const string &value)
unsigned GetDigestSize() const
std::string workspace_dir_
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
#define GetLogDebugFile()
void ReadHalfPipe(int fd, void *buf, size_t nbyte)
vector< string > SplitString(const string &str, char delim)
void Nonblock2Block(int filedes)
virtual uint64_t GetSizePinned()
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
virtual uint64_t GetCapacity()
void SetSize(const uint64_t new_size)
shash::Any RetrieveHash() const
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
sqlite3_stmt * stmt_touch_
string StringifyInt(const int64_t value)
std::string GetLogMicroSyslog()
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
sqlite3_stmt * stmt_list_volatile_
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
static int MainCacheManager(int argc, char **argv)
std::map< shash::Md5, int > back_channels_
static const unsigned kMaxDescription
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
unsigned char digest[shash::kMaxDigestSize]
std::string MakePathWithoutSuffix() const
sqlite3_stmt * stmt_list_catalogs_
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
uint64_t GetNoTicks(uint32_t retrospect_s) const
Any MkFromHexPtr(const HexPtr hex, const char suffix)
virtual ~PosixQuotaManager()
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
std::map< shash::Any, uint64_t > pinned_chunks_
sqlite3_stmt * stmt_unpin_
void CloseReturnPipe(int pipe[2])
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
const unsigned kDigestSizes[]
void UnlockBackChannels()
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
static const uint64_t kVolatileFlag
void UnlockFile(const int filedes)
sqlite3_stmt * stmt_size_
int GetLogSyslogFacility()