15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
31 #include <sys/statfs.h>
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
70 open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe_wronly)).c_str(),
71 O_WRONLY | O_NONBLOCK);
76 "failed to bind return pipe (%d)", errno);
83 const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
84 if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86 "high watermark of pinned files (%" PRIu64
"M > %" PRIu64
"M)",
87 pinned_/(1024*1024), watermark/(1024*1024));
88 BroadcastBackchannels(
"R");
94 DIR *dirp = opendir(workspace_dir_.c_str());
98 bool found_leftovers =
false;
100 const string name = dent->d_name;
101 const string path = workspace_dir_ +
"/" + name;
106 if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) ==
"pipe")) {
107 if (!found_leftovers) {
109 "removing left-over FIFOs from cache directory");
111 found_leftovers =
true;
112 unlink(path.c_str());
127 return DoCleanup(leave_size);
131 MakeReturnPipe(pipe_cleanup);
135 cmd.
size = leave_size;
138 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
139 ManagedReadHalfPipe(pipe_cleanup[0], &result,
sizeof(result));
140 CloseReturnPipe(pipe_cleanup);
147 if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
148 if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
149 if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
150 if (stmt_list_) sqlite3_finalize(stmt_list_);
151 if (stmt_lru_) sqlite3_finalize(stmt_lru_);
152 if (stmt_rm_) sqlite3_finalize(stmt_rm_);
153 if (stmt_rm_batch_) sqlite3_finalize(stmt_rm_batch_);
154 if (stmt_size_) sqlite3_finalize(stmt_size_);
155 if (stmt_touch_) sqlite3_finalize(stmt_touch_);
156 if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
157 if (stmt_block_) sqlite3_finalize(stmt_block_);
158 if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
159 if (stmt_new_) sqlite3_finalize(stmt_new_);
160 if (database_) sqlite3_close(database_);
163 stmt_list_catalogs_ = NULL;
164 stmt_list_pinned_ = NULL;
165 stmt_list_volatile_ = NULL;
168 stmt_rm_batch_ = NULL;
173 stmt_unblock_ = NULL;
177 pinned_chunks_.clear();
184 UnlinkReturnPipe(pipe[1]);
194 sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
196 if (sqlite3_step(stmt_size_) == SQLITE_ROW)
198 sqlite3_reset(stmt_size_);
200 hash_str.c_str(), result);
207 if ((limit_ == 0) || (gauge_ >= limit_))
210 struct statvfs vfs_info;
211 int retval = statvfs((cache_dir_ +
"/cachedb").c_str(), &vfs_info);
214 "failed to query %s for free space (%d)",
215 cache_dir_.c_str(), errno);
218 int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
220 free_space_byte / (1024 * 1024));
222 int64_t required_byte = limit_ - gauge_;
223 if (free_space_byte < required_byte) {
225 "too little free space on the file system hosting the cache,"
226 " %" PRId64
" MB available",
227 free_space_byte / (1024 * 1024));
233 const string &cache_workspace,
234 const uint64_t limit,
235 const uint64_t cleanup_threshold,
236 const bool rebuild_database)
238 if (cleanup_threshold >= limit) {
240 "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
249 delete quota_manager;
257 return quota_manager;
265 const std::string &exe_path,
266 const std::string &cache_workspace,
267 const uint64_t limit,
268 const uint64_t cleanup_threshold,
272 string workspace_dir;
273 ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
275 pid_t new_cachemgr_pid;
278 const int fd_lockfile =
LockFile(workspace_dir +
"/lock_cachemgr");
279 if (fd_lockfile < 0) {
281 (workspace_dir +
"/lock_cachemgr").c_str(), errno);
291 const string fifo_path = workspace_dir +
"/cachemgr";
293 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
295 const int fd_lockfile_rw = open((workspace_dir +
"/lock_cachemgr").c_str(), O_RDWR, 0600);
296 unsigned lockfile_magicnumber = 0;
297 const ssize_t result_mn =
SafeRead(fd_lockfile_rw, &lockfile_magicnumber,
sizeof(lockfile_magicnumber));
298 const ssize_t result =
SafeRead(fd_lockfile_rw, &new_cachemgr_pid,
sizeof(new_cachemgr_pid));
299 close(fd_lockfile_rw);
301 if ((lockfile_magicnumber != kLockFileMagicNumber) || (result < 0) || (result_mn < 0)
302 || (static_cast<size_t>(result) <
sizeof(new_cachemgr_pid))) {
305 "could not read cache manager pid from lockfile");
325 "received limit %" PRIu64
", threshold %" PRIu64,
327 if (
FileExists(workspace_dir +
"/cachemgr.protocol")) {
336 const int connect_error = errno;
339 const int fd_lockfile_fifo =
LockFile(workspace_dir +
"/lock_cachemgr.fifo");
340 if (fd_lockfile_fifo < 0) {
342 (workspace_dir +
"/lock_cachemgr.fifo").c_str(), errno);
349 if (connect_error == ENXIO) {
351 unlink(fifo_path.c_str());
355 int retval = mkfifo(fifo_path.c_str(), 0600);
366 int pipe_handshake[2];
370 vector<string> command_line;
371 command_line.push_back(exe_path);
372 command_line.push_back(
"__cachemgr__");
373 command_line.push_back(cache_workspace);
375 command_line.push_back(
StringifyInt(pipe_handshake[0]));
377 command_line.push_back(
StringifyInt(cleanup_threshold));
385 set<int> preserve_filedes;
386 preserve_filedes.insert(0);
387 preserve_filedes.insert(1);
388 preserve_filedes.insert(2);
389 preserve_filedes.insert(pipe_boot[1]);
390 preserve_filedes.insert(pipe_handshake[0]);
393 retval =
ManagedExec(command_line, preserve_filedes, map<int, int>(),
411 const int fd_lockfile_rw = open((workspace_dir +
"/lock_cachemgr").c_str(), O_RDWR | O_TRUNC, 0600);
413 const bool result_mn =
SafeWrite(fd_lockfile_rw, &magic_number,
sizeof(magic_number));
414 const bool result =
SafeWrite(fd_lockfile_rw, &new_cachemgr_pid,
sizeof(new_cachemgr_pid));
415 if (!result || !result_mn) {
419 close(fd_lockfile_rw);
422 close(pipe_handshake[0]);
424 if (read(pipe_boot[0], &buf, 1) != 1) {
427 close(pipe_handshake[1]);
430 "cache manager did not start");
436 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
439 "failed to connect to newly created FIFO (%d)", errno);
440 close(pipe_handshake[1]);
448 if (write(pipe_handshake[1], &buf, 1) != 1) {
450 close(pipe_handshake[1]);
455 close(pipe_handshake[1]);
466 "threshold %" PRIu64,
473 if (gauge_ <= leave_size)
478 "clean up cache until at most %lu KB is used", leave_size/1024);
480 cleanup_recorder_.Tick();
483 vector<string> trash;
488 int64_t max_acseq = -1;
490 sqlite3_reset(stmt_lru_);
491 sqlite3_bind_int64(stmt_lru_, 1, (max_acseq == -1) ?
492 std::numeric_limits<int64_t>::min() : (max_acseq + 1));
494 std::vector<EvictCandidate> candidates;
495 candidates.reserve(kEvictBatchSize);
498 while (sqlite3_step(stmt_lru_) == SQLITE_ROW) {
499 hash_str =
reinterpret_cast<const char *
>(
500 sqlite3_column_text(stmt_lru_, 0));
505 sqlite3_column_int64(stmt_lru_, 1),
506 sqlite3_column_int64(stmt_lru_, 2)));
509 if (candidates.empty()) {
514 const unsigned N = candidates.size();
515 for (i = 0; i < N; ++i) {
519 if (pinned_chunks_.find(candidates[i].hash) != pinned_chunks_.end()) {
520 hash_str = candidates[i].hash.ToString();
523 sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
525 result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
526 sqlite3_reset(stmt_block_);
531 trash.push_back(cache_dir_ +
"/" +
532 candidates[i].hash.MakePathWithoutSuffix());
533 gauge_ -= candidates[i].size;
534 max_acseq = candidates[i].acseq;
536 candidates[i].hash.ToString().c_str(), gauge_);
538 if (gauge_ <= leave_size)
541 }
while (gauge_ > leave_size);
543 if (max_acseq != -1) {
544 sqlite3_bind_int64(stmt_rm_batch_, 1, max_acseq);
545 result = (sqlite3_step(stmt_rm_batch_) == SQLITE_DONE);
547 sqlite3_reset(stmt_rm_batch_);
549 result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
550 sqlite3_reset(stmt_unblock_);
554 if (!EmptyTrash(trash))
557 if (gauge_ > leave_size) {
559 "request to clean until %" PRIu64
", "
560 "but effective gauge is %" PRIu64, leave_size, gauge_);
575 if ((pid = fork()) == 0) {
583 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
585 unlink(trash[i].c_str());
592 waitpid(pid, &statloc, 0);
597 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
599 unlink(trash[i].c_str());
609 const string &description,
612 const string hash_str = hash.
ToString();
614 hash_str.c_str(), description.c_str(), command_type);
615 const unsigned desc_length = (description.length() > kMaxDescription) ?
616 kMaxDescription : description.length();
625 memcpy(reinterpret_cast<char *>(cmd)+
sizeof(
LruCommand),
626 &description[0], desc_length);
632 vector<string> result;
635 MakeReturnPipe(pipe_list);
636 char description_buffer[kMaxDescription];
641 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
645 ManagedReadHalfPipe(pipe_list[0], &length,
sizeof(length));
647 ReadPipe(pipe_list[0], description_buffer, length);
648 result.push_back(
string(description_buffer, length));
650 }
while (length >= 0);
652 CloseReturnPipe(pipe_list);
658 if (limit_ != (uint64_t)(-1))
663 if (statfs(
".", &info) == 0) {
664 return info.f_bavail * info.f_bsize;
667 "failed to query file system info of cache (%d)", errno);
676 MakeReturnPipe(pipe_limits);
681 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
682 ManagedReadHalfPipe(pipe_limits[0], limit,
sizeof(*limit));
683 ReadPipe(pipe_limits[0], cleanup_threshold,
sizeof(*cleanup_threshold));
684 CloseReturnPipe(pipe_limits);
693 return limit_ - cleanup_threshold_;
702 return cachemgr_pid_;
707 MakeReturnPipe(pipe_pid);
712 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
714 CloseReturnPipe(pipe_pid);
720 int pipe_revision[2];
721 MakeReturnPipe(pipe_revision);
726 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
729 ManagedReadHalfPipe(pipe_revision[0], &revision,
sizeof(revision));
730 CloseReturnPipe(pipe_revision);
740 MakeReturnPipe(pipe_status);
745 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
746 ManagedReadHalfPipe(pipe_status[0], gauge,
sizeof(*gauge));
747 ReadPipe(pipe_status[0], pinned,
sizeof(*pinned));
748 CloseReturnPipe(pipe_status);
752 int pipe_set_limit[2];
754 MakeReturnPipe(pipe_set_limit);
760 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
761 ReadHalfPipe(pipe_set_limit[0], &result,
sizeof(result));
762 CloseReturnPipe(pipe_set_limit);
770 cleanup_threshold_ = size/2;
774 return SetSharedLimit(size);
779 uint64_t gauge, size_pinned;
780 GetSharedStatus(&gauge, &size_pinned);
787 uint64_t gauge, size_pinned;
788 GetSharedStatus(&gauge, &size_pinned);
794 if (!
spawned_ || (protocol_revision_ < 2))
return 0;
795 uint64_t cleanup_rate;
797 int pipe_cleanup_rate[2];
798 MakeReturnPipe(pipe_cleanup_rate);
803 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
804 ManagedReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate,
sizeof(cleanup_rate));
805 CloseReturnPipe(pipe_cleanup_rate);
815 fd_lock_cachedb_ =
LockFile(workspace_dir_ +
"/lock_cachedb");
816 if (fd_lock_cachedb_ < 0) {
822 const string db_file = cache_dir_ +
"/cachedb";
823 if (rebuild_database) {
826 unlink(db_file.c_str());
827 unlink((db_file +
"-journal").c_str());
831 int err = sqlite3_open(db_file.c_str(), &database_);
832 if (err != SQLITE_OK) {
834 goto init_database_fail;
837 sql =
"PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
838 "PRAGMA auto_vacuum=1; "
839 "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
840 " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
841 "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
842 "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
843 " ON cache_catalog (acseq); "
844 "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
845 "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
846 "CREATE INDEX idx_fscache_actime ON fscache (actime); "
847 "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
848 " CONSTRAINT pk_properties PRIMARY KEY(key));";
849 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
850 if (err != SQLITE_OK) {
853 sqlite3_close(database_);
854 unlink(db_file.c_str());
855 unlink((db_file +
"-journal").c_str());
857 "LRU database corrupted, re-building");
862 goto init_database_fail;
867 sql =
"ALTER TABLE cache_catalog ADD type INTEGER; "
868 "ALTER TABLE cache_catalog ADD pinned INTEGER";
869 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
870 if (err == SQLITE_OK) {
871 sql =
"UPDATE cache_catalog SET type=" +
StringifyInt(kFileRegular) +
";";
872 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
873 if (err != SQLITE_OK) {
875 "could not init cache database (failed: %s)", sql.c_str());
876 goto init_database_fail;
881 sql =
"UPDATE cache_catalog SET pinned=0;";
882 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
883 if (err != SQLITE_OK) {
886 goto init_database_fail;
890 sql =
"INSERT OR REPLACE INTO properties (key, value) "
891 "VALUES ('schema', '1.0')";
892 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
893 if (err != SQLITE_OK) {
896 goto init_database_fail;
900 sql =
"SELECT count(*) FROM cache_catalog;";
901 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
902 if (sqlite3_step(stmt) == SQLITE_ROW) {
903 if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
905 "CernVM-FS: building lru cache database...");
906 if (!RebuildDatabase()) {
908 "could not build cache database from file system");
909 sqlite3_finalize(stmt);
910 goto init_database_fail;
913 sqlite3_finalize(stmt);
916 sqlite3_finalize(stmt);
917 goto init_database_fail;
921 sql =
"SELECT sum(size) FROM cache_catalog;";
922 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
923 if (sqlite3_step(stmt) == SQLITE_ROW) {
924 gauge_ = sqlite3_column_int64(stmt, 0);
927 sqlite3_finalize(stmt);
928 goto init_database_fail;
930 sqlite3_finalize(stmt);
933 sql =
"SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
934 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
935 if (sqlite3_step(stmt) == SQLITE_ROW) {
936 seq_ = sqlite3_column_int64(stmt, 0)+1;
939 sqlite3_finalize(stmt);
940 goto init_database_fail;
942 sqlite3_finalize(stmt);
945 sqlite3_prepare_v2(database_,
946 "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
947 "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
948 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=0 "
949 "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
950 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=2 "
951 "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
952 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=1 "
953 "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
954 sqlite3_prepare_v2(database_,
955 "INSERT OR REPLACE INTO cache_catalog "
956 "(sha1, size, acseq, path, type, pinned) "
957 "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
958 -1, &stmt_new_, NULL);
959 sqlite3_prepare_v2(database_,
960 "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
961 -1, &stmt_size_, NULL);
962 sqlite3_prepare_v2(database_,
"DELETE FROM cache_catalog WHERE sha1=:sha1;",
963 -1, &stmt_rm_, NULL);
964 sqlite3_prepare_v2(database_,
965 "DELETE FROM cache_catalog WHERE acseq<=:a AND pinned<>2;",
966 -1, &stmt_rm_batch_, NULL);
967 sqlite3_prepare_v2(database_, (std::string(
968 "SELECT sha1, size, acseq FROM cache_catalog "
969 "WHERE pinned<>2 AND acseq>=:a "
970 "ORDER BY acseq ASC "
971 "LIMIT ") +
StringifyInt(kEvictBatchSize) +
";").c_str(),
972 -1, &stmt_lru_, NULL);
973 sqlite3_prepare_v2(database_,
974 (
"SELECT path FROM cache_catalog WHERE type=" +
976 ";").c_str(), -1, &stmt_list_, NULL);
977 sqlite3_prepare_v2(database_,
978 "SELECT path FROM cache_catalog WHERE pinned<>0;",
979 -1, &stmt_list_pinned_, NULL);
980 sqlite3_prepare_v2(database_,
981 "SELECT path FROM cache_catalog WHERE acseq < 0;",
982 -1, &stmt_list_volatile_, NULL);
983 sqlite3_prepare_v2(database_,
984 (
"SELECT path FROM cache_catalog WHERE type=" +
986 ";").c_str(), -1, &stmt_list_catalogs_, NULL);
990 sqlite3_close(database_);
1003 const uint64_t
size,
1004 const string &description)
1006 DoInsert(any_hash, size, description, kInsert);
1017 const uint64_t
size,
1018 const string &description)
1020 DoInsert(any_hash, size, description, kInsertVolatile);
1028 return DoList(kList);
1036 return DoList(kListPinned);
1044 return DoList(kListCatalogs);
1052 return DoList(kListVolatile);
1065 shared_manager.
shared_ =
true;
1070 ParseDirectories(
string(argv[2]),
1080 vector<string> logfiles =
SplitString(argv[10],
':');
1084 if ((logfiles.size() > 0) && (logfiles[0] !=
""))
1086 if (logfiles.size() > 1)
1094 watchdog->
Spawn(
"./stacktrace.cachemgr");
1097 const int fd_lockfile_fifo =
1099 if (fd_lockfile_fifo < 0) {
1106 const string crash_guard = shared_manager.
cache_dir_ +
"/cachemgr.running";
1107 const bool rebuild =
FileExists(crash_guard);
1108 retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1111 "failed to create shared cache manager crash guard");
1119 sqlite3_temp_directory =
1120 static_cast<char *
>(sqlite3_malloc(tmp_dir.length() + 1));
1121 snprintf(sqlite3_temp_directory, tmp_dir.length() + 1,
"%s", tmp_dir.c_str());
1135 const string protocol_revision_path =
1137 retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1140 "failed to open protocol revision file (%d)", errno);
1144 const string revision =
StringifyInt(kProtocolRevision);
1145 int written = write(retval, revision.data(), revision.length());
1147 if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1149 "failed to write protocol revision (%d)", errno);
1154 const string fifo_path = shared_manager.
workspace_dir_ +
"/cachemgr";
1155 shared_manager.
pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1158 fifo_path.c_str(), errno);
1170 close(pipe_handshake);
1174 signal(SIGPIPE, SIG_IGN);
1176 signal(SIGINT, SIG_IGN);
1179 unlink(fifo_path.c_str());
1180 unlink(protocol_revision_path.c_str());
1182 unlink(crash_guard.c_str());
1185 if (sqlite3_temp_directory) {
1186 sqlite3_free(sqlite3_temp_directory);
1187 sqlite3_temp_directory = NULL;
1200 LruCommand command_buffer[kCommandBufferSize];
1201 char description_buffer[kCommandBufferSize*kMaxDescription];
1202 unsigned num_commands = 0;
1204 while (read(quota_mgr->
pipe_lru_[0], &command_buffer[num_commands],
1205 sizeof(command_buffer[0])) ==
sizeof(command_buffer[0]))
1209 const uint64_t
size = command_buffer[num_commands].
GetSize();
1212 if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1213 (command_type == kPin) || (command_type == kPinRegular))
1215 const int desc_length = command_buffer[num_commands].
desc_length;
1217 &description_buffer[kMaxDescription*num_commands], desc_length);
1221 if (command_type == kGetProtocolRevision) {
1223 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1224 if (return_pipe < 0)
1233 if (command_type == kCleanupRate) {
1235 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1236 if (return_pipe < 0)
1238 uint64_t period_s =
size;
1240 WritePipe(return_pipe, &rate,
sizeof(rate));
1245 if (command_type == kSetLimit) {
1246 const int return_pipe =
1247 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1248 if (return_pipe < 0)
1254 WritePipe(return_pipe, &ret,
sizeof(ret));
1260 if (command_type == kReserve) {
1261 bool success =
true;
1263 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1264 if (return_pipe < 0)
1268 const string hash_str(hash.
ToString());
1270 size, hash_str.c_str());
1277 "failed to insert %s (pinned), no space", hash_str.c_str());
1286 WritePipe(return_pipe, &success,
sizeof(success));
1292 if (command_type == kRegisterBackChannel) {
1294 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1295 if (return_pipe < 0)
1301 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1305 map<shash::Md5, int>::const_iterator iter =
1309 "closing left-over back channel %s", hash.
ToString().c_str());
1310 close(iter->second);
1316 WritePipe(return_pipe, &success,
sizeof(success));
1318 hash.
ToString().c_str(), return_pipe);
1323 if (command_type == kUnregisterBackChannel) {
1325 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1329 map<shash::Md5, int>::iterator iter =
1333 "closing back channel %s", hash.
ToString().c_str());
1334 close(iter->second);
1338 "did not find back channel %s", hash.
ToString().c_str());
1346 if (command_type == kUnpin) {
1348 const string hash_str(hash.
ToString());
1350 map<shash::Any, uint64_t>::iterator iter =
1353 quota_mgr->
pinned_ -= iter->second;
1362 "remove orphaned pinned hash %s from cache database",
1364 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1365 hash_str.length(), SQLITE_STATIC);
1367 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1368 uint64_t size = sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1369 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1370 hash_str.length(), SQLITE_STATIC);
1371 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1372 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1376 "failed to delete %s (%d)", hash_str.c_str(), retval);
1378 sqlite3_reset(quota_mgr->
stmt_rm_);
1388 bool immediate_command = (command_type == kCleanup) ||
1389 (command_type == kList) || (command_type == kListPinned) ||
1390 (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1391 (command_type == kRemove) || (command_type == kStatus) ||
1392 (command_type == kLimits) || (command_type == kPid);
1393 if (!immediate_command) num_commands++;
1395 if ((num_commands == kCommandBufferSize) || immediate_command)
1398 description_buffer);
1399 if (!immediate_command) num_commands = 0;
1402 if (immediate_command) {
1405 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1406 if (return_pipe < 0) {
1412 sqlite3_stmt *this_stmt_list = NULL;
1413 switch (command_type) {
1416 const string hash_str = hash.
ToString();
1419 bool success =
false;
1421 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1422 hash_str.length(), SQLITE_STATIC);
1424 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1425 uint64_t size = sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1426 uint64_t is_pinned = sqlite3_column_int64(quota_mgr->
stmt_size_, 1);
1428 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1429 hash_str.length(), SQLITE_STATIC);
1430 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1431 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1440 "failed to delete %s (%d)", hash_str.c_str(), retval);
1442 sqlite3_reset(quota_mgr->
stmt_rm_);
1449 WritePipe(return_pipe, &success,
sizeof(success));
1453 WritePipe(return_pipe, &retval,
sizeof(retval));
1456 if (!this_stmt_list) this_stmt_list = quota_mgr->
stmt_list_;
1466 while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1467 string path =
"(NULL)";
1468 if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1470 reinterpret_cast<const char *>(
1471 sqlite3_column_text(this_stmt_list, 0)));
1473 length = path.length();
1474 WritePipe(return_pipe, &length,
sizeof(length));
1476 WritePipe(return_pipe, &path[0], length);
1479 WritePipe(return_pipe, &length,
sizeof(length));
1480 sqlite3_reset(this_stmt_list);
1493 pid_t pid = getpid();
1494 WritePipe(return_pipe, &pid,
sizeof(pid));
1508 description_buffer);
1512 for (map<shash::Any, uint64_t>::const_iterator i =
1534 retval = mkfifo((workspace_dir_ +
"/pipe" +
StringifyInt(i)).c_str(), 0600);
1537 }
while ((retval == -1) && (errno == EEXIST));
1541 pipe[0] = open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe[1])).c_str(),
1542 O_RDONLY | O_NONBLOCK);
1549 const std::string cache_workspace,
1550 std::string *cache_dir,
1551 std::string *workspace_dir)
1553 vector<string> dir_tokens(
SplitString(cache_workspace,
':'));
1554 switch (dir_tokens.size()) {
1556 *cache_dir = *workspace_dir = dir_tokens[0];
1559 *cache_dir = dir_tokens[0];
1560 *workspace_dir = dir_tokens[1];
1575 const uint64_t
size,
1576 const string &description,
1577 const bool is_catalog)
1579 assert((size > 0) || !is_catalog);
1581 const string hash_str = hash.
ToString();
1583 hash_str.c_str(), description.c_str());
1588 if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1589 if (pinned_ + size > cleanup_threshold_) {
1594 pinned_chunks_[hash] =
size;
1596 CheckHighPinWatermark();
1599 bool exists = Contains(hash_str);
1600 if (!exists && (gauge_ + size > limit_)) {
1603 int retval = DoCleanup(cleanup_threshold_);
1606 sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1608 sqlite3_bind_int64(stmt_new_, 2, size);
1609 sqlite3_bind_int64(stmt_new_, 3, seq_++);
1610 sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1612 sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1613 sqlite3_bind_int64(stmt_new_, 6, 1);
1614 int retval = sqlite3_step(stmt_new_);
1615 assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1616 sqlite3_reset(stmt_new_);
1617 if (!exists) gauge_ +=
size;
1621 int pipe_reserve[2];
1622 MakeReturnPipe(pipe_reserve);
1629 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
1631 ManagedReadHalfPipe(pipe_reserve[0], &result,
sizeof(result));
1632 CloseReturnPipe(pipe_reserve);
1634 if (!result)
return false;
1635 DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1642 const uint64_t limit,
1643 const uint64_t cleanup_threshold,
1644 const string &cache_workspace)
1648 , cleanup_threshold_(cleanup_threshold)
1654 , fd_lock_cachedb_(-1)
1655 , async_delete_(true)
1661 , stmt_unblock_(NULL)
1666 , stmt_rm_batch_(NULL)
1668 , stmt_list_pinned_(NULL)
1669 , stmt_list_catalogs_(NULL)
1670 , stmt_list_volatile_(NULL)
1671 , initialized_(false)
1710 const char *descriptions)
1712 int retval = sqlite3_exec(
database_,
"BEGIN", NULL, NULL, NULL);
1713 assert(retval == SQLITE_OK);
1715 for (
unsigned i = 0; i < num; ++i) {
1717 const string hash_str = hash.
ToString();
1723 switch (commands[i].command_type) {
1726 sqlite3_bind_text(
stmt_touch_, 2, &hash_str[0], hash_str.length(),
1730 hash_str.c_str(),
seq_-1, retval);
1731 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1733 hash_str.c_str(), retval);
1738 sqlite3_bind_text(
stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1742 hash_str.c_str(), retval);
1743 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1745 hash_str.c_str(), retval);
1765 sqlite3_bind_text(
stmt_new_, 1, &hash_str[0], hash_str.length(),
1774 commands[i].desc_length, SQLITE_STATIC);
1775 sqlite3_bind_int64(
stmt_new_, 5, (commands[i].command_type ==
kPin) ?
1778 ((commands[i].command_type ==
kPin) ||
1779 (commands[i].command_type ==
kPinRegular)) ? 1 : 0);
1783 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1785 hash_str.c_str(), retval);
1797 retval = sqlite3_exec(
database_,
"COMMIT", NULL, NULL, NULL);
1798 if (retval != SQLITE_OK) {
1805 bool result =
false;
1807 sqlite3_stmt *stmt_select = NULL;
1808 sqlite3_stmt *stmt_insert = NULL;
1820 sql =
"DELETE FROM cache_catalog; DELETE FROM fscache;";
1821 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1822 if (sqlerr != SQLITE_OK) {
1831 sqlite3_prepare_v2(
database_,
"INSERT INTO fscache (sha1, size, actime) "
1832 "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1834 for (
int i = 0; i <= 0xff; i++) {
1835 snprintf(hex,
sizeof(hex),
"%02x", i);
1837 if ((dirp = opendir(path.c_str())) == NULL) {
1839 "failed to open directory %s (tmpwatch interfering?)",
1844 string file_path = path +
"/" + string(d->d_name);
1845 if (stat(file_path.c_str(), &info) == 0) {
1846 if (!S_ISREG(info.st_mode))
1848 if (info.st_size == 0) {
1850 "removing empty file %s during automatic cache db rebuild",
1852 unlink(file_path.c_str());
1856 string hash = string(hex) + string(d->d_name);
1857 sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1859 sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1860 sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1861 if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1865 sqlite3_reset(stmt_insert);
1875 sqlite3_finalize(stmt_insert);
1880 "SELECT sha1, size FROM fscache ORDER BY actime;",
1881 -1, &stmt_select, NULL);
1883 "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1884 "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1885 -1, &stmt_insert, NULL);
1886 while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1887 const string hash = string(
1888 reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1889 sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1890 sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1891 sqlite3_bind_int64(stmt_insert, 3, seq++);
1895 int retval = sqlite3_step(stmt_insert);
1896 if (retval != SQLITE_DONE) {
1899 "could not insert into cache catalog (%d - %s)",
1900 retval, sqlite3_errstr(retval));
1903 sqlite3_reset(stmt_insert);
1907 sql =
"DELETE FROM fscache;";
1908 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1909 if (sqlerr != SQLITE_OK) {
1918 "rebuilding finished, sequence %" PRIu64
", gauge %" PRIu64,
1922 if (stmt_insert) sqlite3_finalize(stmt_insert);
1923 if (stmt_select) sqlite3_finalize(stmt_select);
1924 if (dirp) closedir(dirp);
1934 int back_channel[2],
1935 const string &channel_id)
1951 if (success !=
'S') {
1953 "failed to register quota back channel (%c)", success);
1990 static_cast<void *>(
this)) != 0)
2027 cmd.StoreHash(hash);
2033 int back_channel[2],
2034 const string &channel_id)
2046 close(back_channel[0]);
2054 bool result =
false;
2060 PANIC(
kLogStderr,
"Error: quota manager could not read from cachemanager pipe");
virtual uint32_t GetProtocolRevision()
void SetLogSyslogFacility(const int local_facility)
sqlite3_stmt * stmt_list_pinned_
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
std::vector< std::string > DoList(const CommandType list_command)
virtual uint64_t GetCleanupRate(uint64_t period_s)
sqlite3_stmt * stmt_list_
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
virtual uint64_t GetSize()
virtual bool Cleanup(const uint64_t leave_size)
static const unsigned kSqliteMemPerThread
virtual uint64_t GetMaxFileSize()
std::string ToString(const bool with_suffix=false) const
uint64_t cleanup_threshold_
static const uint32_t kProtocolRevision
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
bool DoCleanup(const uint64_t leave_size)
bool SafeWrite(int fd, const void *buf, size_t nbyte)
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
int BindReturnPipe(int pipe_wronly)
bool Contains(const std::string &hash_str)
perf::MultiRecorder cleanup_recorder_
void SetLogMicroSyslog(const std::string &filename)
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
unsigned char digest[digest_size_]
virtual bool SetLimit(uint64_t limit)
virtual std::vector< std::string > ListCatalogs()
void CheckHighPinWatermark()
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
bool FileExists(const std::string &path)
static Watchdog * Create(FnOnCrash on_crash)
int64_t String2Int64(const string &value)
unsigned GetDigestSize() const
std::string workspace_dir_
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
bool ReadHalfPipe(int fd, void *buf, size_t nbyte, unsigned timeout_ms)
#define GetLogDebugFile()
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
vector< string > SplitString(const string &str, char delim)
void Nonblock2Block(int filedes)
virtual uint64_t GetSizePinned()
void ManagedReadHalfPipe(int fd, void *buf, size_t nbyte)
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
virtual uint64_t GetCapacity()
void SetSize(const uint64_t new_size)
shash::Any RetrieveHash() const
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
sqlite3_stmt * stmt_touch_
string StringifyInt(const int64_t value)
std::string GetLogMicroSyslog()
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
sqlite3_stmt * stmt_list_volatile_
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
static int MainCacheManager(int argc, char **argv)
bool EmptyTrash(const std::vector< std::string > &trash)
std::map< shash::Md5, int > back_channels_
static const unsigned kMaxDescription
bool SetSharedLimit(uint64_t limit)
bool CloseAllFildes(const std::set< int > &preserve_fildes)
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
static const unsigned kLockFileMagicNumber
unsigned char digest[shash::kMaxDigestSize]
std::string MakePathWithoutSuffix() const
sqlite3_stmt * stmt_list_catalogs_
bool ExecAsDaemon(const std::vector< std::string > &command_line, pid_t *child_pid)
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
uint64_t GetNoTicks(uint32_t retrospect_s) const
Any MkFromHexPtr(const HexPtr hex, const char suffix)
void SetCacheMgrPid(pid_t pid_)
virtual ~PosixQuotaManager()
void Spawn(const std::string &crash_dump_path)
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
std::map< shash::Any, uint64_t > pinned_chunks_
sqlite3_stmt * stmt_unpin_
void CloseReturnPipe(int pipe[2])
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
const unsigned kDigestSizes[]
void UnlockBackChannels()
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
static const uint64_t kVolatileFlag
void UnlockFile(const int filedes)
sqlite3_stmt * stmt_size_
int GetLogSyslogFacility()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)