15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
31 #include <sys/statfs.h>
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
70 open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe_wronly)).c_str(),
71 O_WRONLY | O_NONBLOCK);
76 "failed to bind return pipe (%d)", errno);
83 const uint64_t watermark = kHighPinWatermark * cleanup_threshold_ / 100;
84 if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86 "high watermark of pinned files (%" PRIu64
"M > %" PRIu64
"M)",
87 pinned_ / (1024 * 1024), watermark / (1024 * 1024));
88 BroadcastBackchannels(
"R");
94 DIR *dirp = opendir(workspace_dir_.c_str());
98 bool found_leftovers =
false;
100 const string name = dent->d_name;
101 const string path = workspace_dir_ +
"/" + name;
106 if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) ==
"pipe")) {
107 if (!found_leftovers) {
109 "removing left-over FIFOs from cache directory");
111 found_leftovers =
true;
112 unlink(path.c_str());
127 return DoCleanup(leave_size);
131 MakeReturnPipe(pipe_cleanup);
135 cmd.
size = leave_size;
138 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
139 ManagedReadHalfPipe(pipe_cleanup[0], &result,
sizeof(result));
140 CloseReturnPipe(pipe_cleanup);
147 if (stmt_list_catalogs_)
148 sqlite3_finalize(stmt_list_catalogs_);
149 if (stmt_list_pinned_)
150 sqlite3_finalize(stmt_list_pinned_);
151 if (stmt_list_volatile_)
152 sqlite3_finalize(stmt_list_volatile_);
154 sqlite3_finalize(stmt_list_);
156 sqlite3_finalize(stmt_lru_);
158 sqlite3_finalize(stmt_rm_);
160 sqlite3_finalize(stmt_rm_batch_);
162 sqlite3_finalize(stmt_size_);
164 sqlite3_finalize(stmt_touch_);
166 sqlite3_finalize(stmt_unpin_);
168 sqlite3_finalize(stmt_block_);
170 sqlite3_finalize(stmt_unblock_);
172 sqlite3_finalize(stmt_new_);
174 sqlite3_close(database_);
177 stmt_list_catalogs_ = NULL;
178 stmt_list_pinned_ = NULL;
179 stmt_list_volatile_ = NULL;
182 stmt_rm_batch_ = NULL;
187 stmt_unblock_ = NULL;
191 pinned_chunks_.clear();
198 UnlinkReturnPipe(pipe[1]);
208 sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
210 if (sqlite3_step(stmt_size_) == SQLITE_ROW)
212 sqlite3_reset(stmt_size_);
221 if ((limit_ == 0) || (gauge_ >= limit_))
224 struct statvfs vfs_info;
225 const int retval = statvfs((cache_dir_ +
"/cachedb").c_str(), &vfs_info);
228 "failed to query %s for free space (%d)", cache_dir_.c_str(),
232 const int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
234 free_space_byte / (1024 * 1024));
236 const int64_t required_byte = limit_ - gauge_;
237 if (free_space_byte < required_byte) {
239 "too little free space on the file system hosting the cache,"
240 " %" PRId64
" MB available",
241 free_space_byte / (1024 * 1024));
247 const uint64_t limit,
248 const uint64_t cleanup_threshold,
249 const bool rebuild_database) {
250 if (cleanup_threshold >= limit) {
252 "invalid parameters: limit %" PRIu64
", "
253 "cleanup_threshold %" PRIu64,
254 limit, cleanup_threshold);
259 limit, cleanup_threshold, cache_workspace);
263 delete quota_manager;
271 return quota_manager;
279 const std::string &exe_path,
280 const std::string &cache_workspace,
281 const uint64_t limit,
282 const uint64_t cleanup_threshold,
285 string workspace_dir;
286 ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
288 pid_t new_cachemgr_pid;
291 const int fd_lockfile =
LockFile(workspace_dir +
"/lock_cachemgr");
292 if (fd_lockfile < 0) {
294 (workspace_dir +
"/lock_cachemgr").c_str(), errno);
304 const string fifo_path = workspace_dir +
"/cachemgr";
306 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
308 const int fd_lockfile_rw = open((workspace_dir +
"/lock_cachemgr").c_str(),
310 unsigned lockfile_magicnumber = 0;
311 const ssize_t result_mn =
SafeRead(fd_lockfile_rw, &lockfile_magicnumber,
312 sizeof(lockfile_magicnumber));
313 const ssize_t result =
SafeRead(fd_lockfile_rw, &new_cachemgr_pid,
314 sizeof(new_cachemgr_pid));
315 close(fd_lockfile_rw);
317 if ((lockfile_magicnumber != kLockFileMagicNumber) || (result < 0)
319 || (static_cast<size_t>(result) <
sizeof(new_cachemgr_pid))) {
322 "could not read cache manager pid from lockfile");
342 "received limit %" PRIu64
", threshold %" PRIu64,
344 if (
FileExists(workspace_dir +
"/cachemgr.protocol")) {
353 const int connect_error = errno;
356 const int fd_lockfile_fifo =
LockFile(workspace_dir +
"/lock_cachemgr.fifo");
357 if (fd_lockfile_fifo < 0) {
359 (workspace_dir +
"/lock_cachemgr.fifo").c_str(), errno);
366 if (connect_error == ENXIO) {
368 unlink(fifo_path.c_str());
372 int retval = mkfifo(fifo_path.c_str(), 0600);
383 int pipe_handshake[2];
387 vector<string> command_line;
388 command_line.push_back(exe_path);
389 command_line.push_back(
"__cachemgr__");
390 command_line.push_back(cache_workspace);
392 command_line.push_back(
StringifyInt(pipe_handshake[0]));
394 command_line.push_back(
StringifyInt(cleanup_threshold));
402 set<int> preserve_filedes;
403 preserve_filedes.insert(0);
404 preserve_filedes.insert(1);
405 preserve_filedes.insert(2);
406 preserve_filedes.insert(pipe_boot[1]);
407 preserve_filedes.insert(pipe_handshake[0]);
410 retval =
ManagedExec(command_line, preserve_filedes, map<int, int>(),
413 true, &new_cachemgr_pid);
427 const int fd_lockfile_rw = open((workspace_dir +
"/lock_cachemgr").c_str(),
428 O_RDWR | O_TRUNC, 0600);
430 const bool result_mn =
SafeWrite(fd_lockfile_rw, &magic_number,
431 sizeof(magic_number));
432 const bool result =
SafeWrite(fd_lockfile_rw, &new_cachemgr_pid,
433 sizeof(new_cachemgr_pid));
434 if (!result || !result_mn) {
438 close(fd_lockfile_rw);
441 close(pipe_handshake[0]);
443 if (read(pipe_boot[0], &buf, 1) != 1) {
446 close(pipe_handshake[1]);
449 "cache manager did not start");
455 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
458 "failed to connect to newly created FIFO (%d)", errno);
459 close(pipe_handshake[1]);
467 if (write(pipe_handshake[1], &buf, 1) != 1) {
469 close(pipe_handshake[1]);
474 close(pipe_handshake[1]);
485 "received limit %" PRIu64
", "
486 "threshold %" PRIu64,
493 if (gauge_ <= leave_size)
498 "clean up cache until at most %lu KB is used", leave_size / 1024);
500 cleanup_recorder_.Tick();
503 vector<string> trash;
508 int64_t max_acseq = -1;
510 sqlite3_reset(stmt_lru_);
511 sqlite3_bind_int64(stmt_lru_, 1,
512 (max_acseq == -1) ? std::numeric_limits<int64_t>::min()
515 std::vector<EvictCandidate> candidates;
516 candidates.reserve(kEvictBatchSize);
519 while (sqlite3_step(stmt_lru_) == SQLITE_ROW) {
520 hash_str =
reinterpret_cast<const char *
>(
521 sqlite3_column_text(stmt_lru_, 0));
524 candidates.push_back(
526 sqlite3_column_int64(stmt_lru_, 1),
527 sqlite3_column_int64(stmt_lru_, 2)));
530 if (candidates.empty()) {
535 const unsigned N = candidates.size();
536 for (i = 0; i < N; ++i) {
540 if (pinned_chunks_.find(candidates[i].hash) != pinned_chunks_.end()) {
541 hash_str = candidates[i].hash.ToString();
544 sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
546 result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
547 sqlite3_reset(stmt_block_);
552 trash.push_back(cache_dir_ +
"/"
553 + candidates[i].hash.MakePathWithoutSuffix());
554 gauge_ -= candidates[i].size;
555 max_acseq = candidates[i].acseq;
557 candidates[i].hash.ToString().c_str(), gauge_);
559 if (gauge_ <= leave_size)
562 }
while (gauge_ > leave_size);
564 if (max_acseq != -1) {
565 sqlite3_bind_int64(stmt_rm_batch_, 1, max_acseq);
566 result = (sqlite3_step(stmt_rm_batch_) == SQLITE_DONE);
568 sqlite3_reset(stmt_rm_batch_);
570 result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
571 sqlite3_reset(stmt_unblock_);
575 if (!EmptyTrash(trash))
578 if (gauge_ > leave_size) {
580 "request to clean until %" PRIu64
", "
581 "but effective gauge is %" PRIu64,
597 if ((pid = fork()) == 0) {
605 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
607 unlink(trash[i].c_str());
614 waitpid(pid, &statloc, 0);
619 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
621 unlink(trash[i].c_str());
630 const string &description,
632 const string hash_str = hash.
ToString();
634 hash_str.c_str(), description.c_str(), command_type);
635 const unsigned desc_length = (description.length() > kMaxDescription)
637 : description.length();
646 memcpy(reinterpret_cast<char *>(cmd) +
sizeof(
LruCommand), &description[0],
653 vector<string> result;
656 MakeReturnPipe(pipe_list);
657 char description_buffer[kMaxDescription];
662 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
666 ManagedReadHalfPipe(pipe_list[0], &length,
sizeof(length));
668 ReadPipe(pipe_list[0], description_buffer, length);
669 result.push_back(
string(description_buffer, length));
671 }
while (length >= 0);
673 CloseReturnPipe(pipe_list);
679 if (limit_ != (uint64_t)(-1))
684 if (statfs(
".", &info) == 0) {
685 return info.f_bavail * info.f_bsize;
688 "failed to query file system info of cache (%d)", errno);
695 uint64_t *cleanup_threshold) {
697 MakeReturnPipe(pipe_limits);
702 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
703 ManagedReadHalfPipe(pipe_limits[0], limit,
sizeof(*limit));
704 ReadPipe(pipe_limits[0], cleanup_threshold,
sizeof(*cleanup_threshold));
705 CloseReturnPipe(pipe_limits);
714 return limit_ - cleanup_threshold_;
723 return cachemgr_pid_;
728 MakeReturnPipe(pipe_pid);
733 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
735 CloseReturnPipe(pipe_pid);
741 int pipe_revision[2];
742 MakeReturnPipe(pipe_revision);
747 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
750 ManagedReadHalfPipe(pipe_revision[0], &revision,
sizeof(revision));
751 CloseReturnPipe(pipe_revision);
761 MakeReturnPipe(pipe_status);
766 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
767 ManagedReadHalfPipe(pipe_status[0], gauge,
sizeof(*gauge));
768 ReadPipe(pipe_status[0], pinned,
sizeof(*pinned));
769 CloseReturnPipe(pipe_status);
773 int pipe_set_limit[2];
775 MakeReturnPipe(pipe_set_limit);
781 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
782 ReadHalfPipe(pipe_set_limit[0], &result,
sizeof(result));
783 CloseReturnPipe(pipe_set_limit);
791 cleanup_threshold_ = size / 2;
793 "Quota limit set to %lu / threshold %lu", limit_,
797 return SetSharedLimit(size);
803 uint64_t gauge, size_pinned;
804 GetSharedStatus(&gauge, &size_pinned);
812 uint64_t gauge, size_pinned;
813 GetSharedStatus(&gauge, &size_pinned);
819 if (!
spawned_ || (protocol_revision_ < 2))
821 uint64_t cleanup_rate;
823 int pipe_cleanup_rate[2];
824 MakeReturnPipe(pipe_cleanup_rate);
829 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
830 ManagedReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate,
831 sizeof(cleanup_rate));
832 CloseReturnPipe(pipe_cleanup_rate);
842 fd_lock_cachedb_ =
LockFile(workspace_dir_ +
"/lock_cachedb");
843 if (fd_lock_cachedb_ < 0) {
849 const string db_file = cache_dir_ +
"/cachedb";
850 if (rebuild_database) {
853 unlink(db_file.c_str());
854 unlink((db_file +
"-journal").c_str());
858 int err = sqlite3_open(db_file.c_str(), &database_);
859 if (err != SQLITE_OK) {
861 goto init_database_fail;
864 sql =
"PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
865 "PRAGMA auto_vacuum=1; "
866 "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
867 " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
868 "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
869 "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
870 " ON cache_catalog (acseq); "
871 "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
872 "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
873 "CREATE INDEX idx_fscache_actime ON fscache (actime); "
874 "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
875 " CONSTRAINT pk_properties PRIMARY KEY(key));";
876 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
877 if (err != SQLITE_OK) {
880 sqlite3_close(database_);
881 unlink(db_file.c_str());
882 unlink((db_file +
"-journal").c_str());
884 "LRU database corrupted, re-building");
889 goto init_database_fail;
894 sql =
"ALTER TABLE cache_catalog ADD type INTEGER; "
895 "ALTER TABLE cache_catalog ADD pinned INTEGER";
896 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
897 if (err == SQLITE_OK) {
898 sql =
"UPDATE cache_catalog SET type=" +
StringifyInt(kFileRegular) +
";";
899 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
900 if (err != SQLITE_OK) {
902 "could not init cache database (failed: %s)", sql.c_str());
903 goto init_database_fail;
908 sql =
"UPDATE cache_catalog SET pinned=0;";
909 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
910 if (err != SQLITE_OK) {
913 goto init_database_fail;
917 sql =
"INSERT OR REPLACE INTO properties (key, value) "
918 "VALUES ('schema', '1.0')";
919 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
920 if (err != SQLITE_OK) {
923 goto init_database_fail;
927 sql =
"SELECT count(*) FROM cache_catalog;";
928 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
929 if (sqlite3_step(stmt) == SQLITE_ROW) {
930 if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
932 "CernVM-FS: building lru cache database...");
933 if (!RebuildDatabase()) {
935 "could not build cache database from file system");
936 sqlite3_finalize(stmt);
937 goto init_database_fail;
940 sqlite3_finalize(stmt);
943 sqlite3_finalize(stmt);
944 goto init_database_fail;
948 sql =
"SELECT sum(size) FROM cache_catalog;";
949 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
950 if (sqlite3_step(stmt) == SQLITE_ROW) {
951 gauge_ = sqlite3_column_int64(stmt, 0);
954 sqlite3_finalize(stmt);
955 goto init_database_fail;
957 sqlite3_finalize(stmt);
960 sql =
"SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
961 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
962 if (sqlite3_step(stmt) == SQLITE_ROW) {
963 seq_ = sqlite3_column_int64(stmt, 0) + 1;
966 sqlite3_finalize(stmt);
967 goto init_database_fail;
969 sqlite3_finalize(stmt);
972 sqlite3_prepare_v2(database_,
973 "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
975 -1, &stmt_touch_, NULL);
976 sqlite3_prepare_v2(database_,
977 "UPDATE cache_catalog SET pinned=0 "
979 -1, &stmt_unpin_, NULL);
980 sqlite3_prepare_v2(database_,
981 "UPDATE cache_catalog SET pinned=2 "
983 -1, &stmt_block_, NULL);
984 sqlite3_prepare_v2(database_,
985 "UPDATE cache_catalog SET pinned=1 "
987 -1, &stmt_unblock_, NULL);
988 sqlite3_prepare_v2(database_,
989 "INSERT OR REPLACE INTO cache_catalog "
990 "(sha1, size, acseq, path, type, pinned) "
991 "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
992 -1, &stmt_new_, NULL);
993 sqlite3_prepare_v2(database_,
994 "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
995 -1, &stmt_size_, NULL);
996 sqlite3_prepare_v2(database_,
"DELETE FROM cache_catalog WHERE sha1=:sha1;",
997 -1, &stmt_rm_, NULL);
998 sqlite3_prepare_v2(database_,
999 "DELETE FROM cache_catalog WHERE acseq<=:a AND pinned<>2;",
1000 -1, &stmt_rm_batch_, NULL);
1001 sqlite3_prepare_v2(database_,
1002 (std::string(
"SELECT sha1, size, acseq FROM cache_catalog "
1003 "WHERE pinned<>2 AND acseq>=:a "
1004 "ORDER BY acseq ASC "
1008 -1, &stmt_lru_, NULL);
1009 sqlite3_prepare_v2(database_,
1010 (
"SELECT path FROM cache_catalog WHERE type="
1013 -1, &stmt_list_, NULL);
1014 sqlite3_prepare_v2(database_,
1015 "SELECT path FROM cache_catalog WHERE pinned<>0;", -1,
1016 &stmt_list_pinned_, NULL);
1017 sqlite3_prepare_v2(database_,
1018 "SELECT path FROM cache_catalog WHERE acseq < 0;", -1,
1019 &stmt_list_volatile_, NULL);
1020 sqlite3_prepare_v2(database_,
1021 (
"SELECT path FROM cache_catalog WHERE type="
1024 -1, &stmt_list_catalogs_, NULL);
1028 sqlite3_close(database_);
1040 const uint64_t
size,
1041 const string &description) {
1042 DoInsert(any_hash, size, description, kInsert);
1052 const uint64_t
size,
1053 const string &description) {
1054 DoInsert(any_hash, size, description, kInsertVolatile);
1074 return DoList(kListCatalogs);
1082 return DoList(kListVolatile);
1094 shared_manager.
shared_ =
true;
1099 ParseDirectories(
string(argv[2]),
1109 vector<string> logfiles =
SplitString(argv[10],
':');
1113 if ((logfiles.size() > 0) && (logfiles[0] !=
""))
1115 if (logfiles.size() > 1)
1123 watchdog->
Spawn(
"./stacktrace.cachemgr");
1127 +
"/lock_cachemgr.fifo");
1128 if (fd_lockfile_fifo < 0) {
1130 "could not open lock file "
1136 const string crash_guard = shared_manager.
cache_dir_ +
"/cachemgr.running";
1137 const bool rebuild =
FileExists(crash_guard);
1138 retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1141 "failed to create shared cache manager crash guard");
1149 sqlite3_temp_directory =
static_cast<char *
>(
1150 sqlite3_malloc(tmp_dir.length() + 1));
1151 snprintf(sqlite3_temp_directory, tmp_dir.length() + 1,
"%s", tmp_dir.c_str());
1165 const string protocol_revision_path = shared_manager.
workspace_dir_
1166 +
"/cachemgr.protocol";
1167 retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1170 "failed to open protocol revision file (%d)", errno);
1174 const string revision =
StringifyInt(kProtocolRevision);
1175 const int written = write(retval, revision.data(), revision.length());
1177 if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1179 "failed to write protocol revision (%d)", errno);
1184 const string fifo_path = shared_manager.
workspace_dir_ +
"/cachemgr";
1185 shared_manager.
pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1188 fifo_path.c_str(), errno);
1200 close(pipe_handshake);
1204 signal(SIGPIPE, SIG_IGN);
1206 signal(SIGINT, SIG_IGN);
1209 unlink(fifo_path.c_str());
1210 unlink(protocol_revision_path.c_str());
1212 unlink(crash_guard.c_str());
1215 if (sqlite3_temp_directory) {
1216 sqlite3_free(sqlite3_temp_directory);
1217 sqlite3_temp_directory = NULL;
1230 LruCommand command_buffer[kCommandBufferSize];
1231 char description_buffer[kCommandBufferSize * kMaxDescription];
1232 unsigned num_commands = 0;
1234 while (read(quota_mgr->
pipe_lru_[0], &command_buffer[num_commands],
1235 sizeof(command_buffer[0]))
1236 ==
sizeof(command_buffer[0])) {
1239 const uint64_t
size = command_buffer[num_commands].
GetSize();
1242 if ((command_type == kInsert) || (command_type == kInsertVolatile)
1243 || (command_type == kPin) || (command_type == kPinRegular)) {
1244 const int desc_length = command_buffer[num_commands].
desc_length;
1246 &description_buffer[kMaxDescription * num_commands],
1251 if (command_type == kGetProtocolRevision) {
1252 const int return_pipe =
1253 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1254 if (return_pipe < 0)
1263 if (command_type == kCleanupRate) {
1264 const int return_pipe =
1265 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1266 if (return_pipe < 0)
1268 const uint64_t period_s =
1271 WritePipe(return_pipe, &rate,
sizeof(rate));
1276 if (command_type == kSetLimit) {
1278 command_buffer[num_commands].return_pipe);
1279 if (return_pipe < 0)
1284 "Quota limit set to %lu / threshold %lu", quota_mgr->
limit_,
1287 WritePipe(return_pipe, &ret,
sizeof(ret));
1293 if (command_type == kReserve) {
1294 bool success =
true;
1295 const int return_pipe =
1296 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1297 if (return_pipe < 0)
1301 const string hash_str(hash.
ToString());
1309 "failed to insert %s (pinned), no space", hash_str.c_str());
1318 WritePipe(return_pipe, &success,
sizeof(success));
1324 if (command_type == kRegisterBackChannel) {
1325 const int return_pipe =
1326 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1327 if (return_pipe < 0)
1333 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1337 const map<shash::Md5, int>::const_iterator iter =
1341 "closing left-over back channel %s", hash.
ToString().c_str());
1342 close(iter->second);
1348 WritePipe(return_pipe, &success,
sizeof(success));
1350 hash.
ToString().c_str(), return_pipe);
1355 if (command_type == kUnregisterBackChannel) {
1357 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1361 const map<shash::Md5, int>::iterator iter =
1366 close(iter->second);
1370 "did not find back channel %s", hash.
ToString().c_str());
1378 if (command_type == kUnpin) {
1380 const string hash_str(hash.
ToString());
1382 const map<shash::Any, uint64_t>::iterator iter =
1385 quota_mgr->
pinned_ -= iter->second;
1393 "remove orphaned pinned hash %s from cache database",
1395 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1396 hash_str.length(), SQLITE_STATIC);
1398 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1399 const uint64_t size =
1400 sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1401 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1402 hash_str.length(), SQLITE_STATIC);
1403 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1404 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1408 "failed to delete %s (%d)", hash_str.c_str(), retval);
1410 sqlite3_reset(quota_mgr->
stmt_rm_);
1420 const bool immediate_command =
1421 (command_type == kCleanup) || (command_type == kList) ||
1422 (command_type == kListPinned) || (command_type == kListCatalogs) ||
1423 (command_type == kListVolatile) || (command_type == kRemove) ||
1424 (command_type == kStatus) || (command_type == kLimits) ||
1425 (command_type == kPid);
1426 if (!immediate_command)
1429 if ((num_commands == kCommandBufferSize) || immediate_command) {
1431 description_buffer);
1432 if (!immediate_command)
1436 if (immediate_command) {
1438 const int return_pipe =
1439 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1440 if (return_pipe < 0) {
1446 sqlite3_stmt *this_stmt_list = NULL;
1447 switch (command_type) {
1450 const string hash_str = hash.
ToString();
1453 bool success =
false;
1455 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1456 hash_str.length(), SQLITE_STATIC);
1458 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1459 const uint64_t size =
1460 sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1461 const uint64_t is_pinned =
1462 sqlite3_column_int64(quota_mgr->
stmt_size_, 1);
1464 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1465 hash_str.length(), SQLITE_STATIC);
1466 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1467 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1476 "failed to delete %s (%d)", hash_str.c_str(), retval);
1478 sqlite3_reset(quota_mgr->
stmt_rm_);
1485 WritePipe(return_pipe, &success,
sizeof(success));
1490 WritePipe(return_pipe, &retval,
sizeof(retval));
1493 if (!this_stmt_list)
1496 if (!this_stmt_list)
1499 if (!this_stmt_list)
1502 if (!this_stmt_list)
1507 while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1508 string path =
"(NULL)";
1509 if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1510 path = string(reinterpret_cast<const char *>(
1511 sqlite3_column_text(this_stmt_list, 0)));
1513 length = path.length();
1514 WritePipe(return_pipe, &length,
sizeof(length));
1516 WritePipe(return_pipe, &path[0], length);
1519 WritePipe(return_pipe, &length,
sizeof(length));
1520 sqlite3_reset(this_stmt_list);
1533 pid_t pid = getpid();
1534 WritePipe(return_pipe, &pid,
sizeof(pid));
1548 description_buffer);
1552 for (map<shash::Any, uint64_t>::const_iterator
1575 retval = mkfifo((workspace_dir_ +
"/pipe" +
StringifyInt(i)).c_str(), 0600);
1578 }
while ((retval == -1) && (errno == EEXIST));
1582 pipe[0] = open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe[1])).c_str(),
1583 O_RDONLY | O_NONBLOCK);
1590 std::string *cache_dir,
1591 std::string *workspace_dir) {
1592 vector<string> dir_tokens(
SplitString(cache_workspace,
':'));
1593 switch (dir_tokens.size()) {
1595 *cache_dir = *workspace_dir = dir_tokens[0];
1598 *cache_dir = dir_tokens[0];
1599 *workspace_dir = dir_tokens[1];
1613 const uint64_t
size,
1614 const string &description,
1615 const bool is_catalog) {
1616 assert((size > 0) || !is_catalog);
1618 const string hash_str = hash.
ToString();
1620 description.c_str());
1625 if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1626 if (pinned_ + size > cleanup_threshold_) {
1631 pinned_chunks_[hash] =
size;
1633 CheckHighPinWatermark();
1636 const bool exists = Contains(hash_str);
1637 if (!exists && (gauge_ + size > limit_)) {
1640 const int retval = DoCleanup(cleanup_threshold_);
1643 sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1645 sqlite3_bind_int64(stmt_new_, 2, size);
1646 sqlite3_bind_int64(stmt_new_, 3, seq_++);
1647 sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1649 sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1650 sqlite3_bind_int64(stmt_new_, 6, 1);
1651 const int retval = sqlite3_step(stmt_new_);
1652 assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1653 sqlite3_reset(stmt_new_);
1659 int pipe_reserve[2];
1660 MakeReturnPipe(pipe_reserve);
1667 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
1669 ManagedReadHalfPipe(pipe_reserve[0], &result,
sizeof(result));
1670 CloseReturnPipe(pipe_reserve);
1674 DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1681 const uint64_t cleanup_threshold,
1682 const string &cache_workspace)
1686 , cleanup_threshold_(cleanup_threshold)
1692 , fd_lock_cachedb_(-1)
1693 , async_delete_(true)
1699 , stmt_unblock_(NULL)
1704 , stmt_rm_batch_(NULL)
1706 , stmt_list_pinned_(NULL)
1707 , stmt_list_catalogs_(NULL)
1708 , stmt_list_volatile_(NULL)
1709 , initialized_(false) {
1747 const char *descriptions) {
1748 int retval = sqlite3_exec(
database_,
"BEGIN", NULL, NULL, NULL);
1749 assert(retval == SQLITE_OK);
1751 for (
unsigned i = 0; i < num; ++i) {
1753 const string hash_str = hash.
ToString();
1759 switch (commands[i].command_type) {
1762 sqlite3_bind_text(
stmt_touch_, 2, &hash_str[0], hash_str.length(),
1766 hash_str.c_str(),
seq_ - 1, retval);
1767 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1769 hash_str.c_str(), retval);
1774 sqlite3_bind_text(
stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1779 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1781 hash_str.c_str(), retval);
1801 sqlite3_bind_text(
stmt_new_, 1, &hash_str[0], hash_str.length(),
1810 commands[i].desc_length, SQLITE_STATIC);
1815 ((commands[i].command_type ==
kPin)
1822 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1824 hash_str.c_str(), retval);
1837 retval = sqlite3_exec(
database_,
"COMMIT", NULL, NULL, NULL);
1838 if (retval != SQLITE_OK) {
1845 bool result =
false;
1847 sqlite3_stmt *stmt_select = NULL;
1848 sqlite3_stmt *stmt_insert = NULL;
1860 sql =
"DELETE FROM cache_catalog; DELETE FROM fscache;";
1861 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1862 if (sqlerr != SQLITE_OK) {
1872 "INSERT INTO fscache (sha1, size, actime) "
1873 "VALUES (:sha1, :s, :t);",
1874 -1, &stmt_insert, NULL);
1876 for (
int i = 0; i <= 0xff; i++) {
1877 snprintf(hex,
sizeof(hex),
"%02x", i);
1879 if ((dirp = opendir(path.c_str())) == NULL) {
1881 "failed to open directory %s (tmpwatch interfering?)",
1886 const string file_path = path +
"/" + string(d->d_name);
1887 if (stat(file_path.c_str(), &info) == 0) {
1888 if (!S_ISREG(info.st_mode))
1890 if (info.st_size == 0) {
1892 "removing empty file %s during automatic cache db rebuild",
1894 unlink(file_path.c_str());
1898 string hash = string(hex) + string(d->d_name);
1899 sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1901 sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1902 sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1903 if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1907 sqlite3_reset(stmt_insert);
1917 sqlite3_finalize(stmt_insert);
1922 "SELECT sha1, size FROM fscache ORDER BY actime;", -1,
1923 &stmt_select, NULL);
1926 "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1927 "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1928 -1, &stmt_insert, NULL);
1929 while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1930 const string hash = string(
1931 reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1932 sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1933 sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1934 sqlite3_bind_int64(stmt_insert, 3, seq++);
1938 const int retval = sqlite3_step(stmt_insert);
1939 if (retval != SQLITE_DONE) {
1942 "could not insert into cache catalog (%d - %s)", retval,
1943 sqlite3_errstr(retval));
1946 sqlite3_reset(stmt_insert);
1950 sql =
"DELETE FROM fscache;";
1951 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1952 if (sqlerr != SQLITE_OK) {
1961 "rebuilding finished, sequence %" PRIu64
", gauge %" PRIu64,
seq_,
1966 sqlite3_finalize(stmt_insert);
1968 sqlite3_finalize(stmt_select);
1980 const string &channel_id) {
1995 if (success !=
'S') {
1997 "failed to register quota back channel (%c)", success);
2010 const string hash_str = hash.
ToString();
2034 static_cast<void *>(
this))
2071 cmd.StoreHash(hash);
2077 const string &channel_id) {
2088 close(back_channel[0]);
2096 bool result =
false;
2103 "Error: quota manager could not read from cachemanager pipe");
virtual uint32_t GetProtocolRevision()
void SetLogSyslogFacility(const int local_facility)
sqlite3_stmt * stmt_list_pinned_
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
std::vector< std::string > DoList(const CommandType list_command)
virtual uint64_t GetCleanupRate(uint64_t period_s)
sqlite3_stmt * stmt_list_
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
virtual uint64_t GetSize()
virtual bool Cleanup(const uint64_t leave_size)
static const unsigned kSqliteMemPerThread
virtual uint64_t GetMaxFileSize()
std::string ToString(const bool with_suffix=false) const
uint64_t cleanup_threshold_
static const uint32_t kProtocolRevision
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
bool DoCleanup(const uint64_t leave_size)
bool SafeWrite(int fd, const void *buf, size_t nbyte)
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
int BindReturnPipe(int pipe_wronly)
bool Contains(const std::string &hash_str)
perf::MultiRecorder cleanup_recorder_
void SetLogMicroSyslog(const std::string &filename)
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
unsigned char digest[digest_size_]
virtual bool SetLimit(uint64_t limit)
virtual std::vector< std::string > ListCatalogs()
void CheckHighPinWatermark()
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
bool FileExists(const std::string &path)
static Watchdog * Create(FnOnCrash on_crash)
int64_t String2Int64(const string &value)
unsigned GetDigestSize() const
std::string workspace_dir_
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
bool ReadHalfPipe(int fd, void *buf, size_t nbyte, unsigned timeout_ms)
#define GetLogDebugFile()
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
vector< string > SplitString(const string &str, char delim)
void Nonblock2Block(int filedes)
virtual uint64_t GetSizePinned()
void ManagedReadHalfPipe(int fd, void *buf, size_t nbyte)
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
virtual uint64_t GetCapacity()
void SetSize(const uint64_t new_size)
shash::Any RetrieveHash() const
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
sqlite3_stmt * stmt_touch_
string StringifyInt(const int64_t value)
std::string GetLogMicroSyslog()
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
sqlite3_stmt * stmt_list_volatile_
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
static int MainCacheManager(int argc, char **argv)
bool EmptyTrash(const std::vector< std::string > &trash)
std::map< shash::Md5, int > back_channels_
static const unsigned kMaxDescription
bool SetSharedLimit(uint64_t limit)
bool CloseAllFildes(const std::set< int > &preserve_fildes)
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
static const unsigned kLockFileMagicNumber
unsigned char digest[shash::kMaxDigestSize]
std::string MakePathWithoutSuffix() const
sqlite3_stmt * stmt_list_catalogs_
bool ExecAsDaemon(const std::vector< std::string > &command_line, pid_t *child_pid)
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
uint64_t GetNoTicks(uint32_t retrospect_s) const
Any MkFromHexPtr(const HexPtr hex, const char suffix)
void SetCacheMgrPid(pid_t pid_)
virtual ~PosixQuotaManager()
void Spawn(const std::string &crash_dump_path)
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
std::map< shash::Any, uint64_t > pinned_chunks_
sqlite3_stmt * stmt_unpin_
void CloseReturnPipe(int pipe[2])
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
const unsigned kDigestSizes[]
void UnlockBackChannels()
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
static const uint64_t kVolatileFlag
void UnlockFile(const int filedes)
sqlite3_stmt * stmt_size_
int GetLogSyslogFacility()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)