15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
18 #include "cvmfs_config.h"
31 #include <sys/statfs.h>
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
70 open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe_wronly)).c_str(),
71 O_WRONLY | O_NONBLOCK);
76 "failed to bind return pipe (%d)", errno);
83 const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
84 if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86 "high watermark of pinned files (%" PRIu64
"M > %" PRIu64
"M)",
87 pinned_/(1024*1024), watermark/(1024*1024));
88 BroadcastBackchannels(
"R");
94 DIR *dirp = opendir(workspace_dir_.c_str());
98 bool found_leftovers =
false;
100 const string name = dent->d_name;
101 const string path = workspace_dir_ +
"/" + name;
106 if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) ==
"pipe")) {
107 if (!found_leftovers) {
109 "removing left-over FIFOs from cache directory");
111 found_leftovers =
true;
112 unlink(path.c_str());
127 return DoCleanup(leave_size);
131 MakeReturnPipe(pipe_cleanup);
135 cmd.
size = leave_size;
138 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
140 CloseReturnPipe(pipe_cleanup);
147 if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
148 if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
149 if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
150 if (stmt_list_) sqlite3_finalize(stmt_list_);
151 if (stmt_lru_) sqlite3_finalize(stmt_lru_);
152 if (stmt_rm_) sqlite3_finalize(stmt_rm_);
153 if (stmt_size_) sqlite3_finalize(stmt_size_);
154 if (stmt_touch_) sqlite3_finalize(stmt_touch_);
155 if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
156 if (stmt_block_) sqlite3_finalize(stmt_block_);
157 if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
158 if (stmt_new_) sqlite3_finalize(stmt_new_);
159 if (database_) sqlite3_close(database_);
162 stmt_list_catalogs_ = NULL;
163 stmt_list_pinned_ = NULL;
164 stmt_list_volatile_ = NULL;
171 stmt_unblock_ = NULL;
175 pinned_chunks_.clear();
182 UnlinkReturnPipe(pipe[1]);
192 sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
194 if (sqlite3_step(stmt_size_) == SQLITE_ROW)
196 sqlite3_reset(stmt_size_);
198 hash_str.c_str(), result);
205 if ((limit_ == 0) || (gauge_ >= limit_))
208 struct statvfs vfs_info;
209 int retval = statvfs((cache_dir_ +
"/cachedb").c_str(), &vfs_info);
212 "failed to query %s for free space (%d)",
213 cache_dir_.c_str(), errno);
216 int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
218 free_space_byte / (1024 * 1024));
220 int64_t required_byte = limit_ - gauge_;
221 if (free_space_byte < required_byte) {
223 "too little free space on the file system hosting the cache,"
224 " %" PRId64
" MB available",
225 free_space_byte / (1024 * 1024));
231 const string &cache_workspace,
232 const uint64_t limit,
233 const uint64_t cleanup_threshold,
234 const bool rebuild_database)
236 if (cleanup_threshold >= limit) {
238 "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
247 delete quota_manager;
255 return quota_manager;
263 const std::string &exe_path,
264 const std::string &cache_workspace,
265 const uint64_t limit,
266 const uint64_t cleanup_threshold,
270 string workspace_dir;
271 ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
274 const int fd_lockfile =
LockFile(workspace_dir +
"/lock_cachemgr");
275 if (fd_lockfile < 0) {
277 (workspace_dir +
"/lock_cachemgr").c_str(), errno);
287 const string fifo_path = workspace_dir +
"/cachemgr";
289 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
297 "received limit %" PRIu64
", threshold %" PRIu64,
299 if (
FileExists(workspace_dir +
"/cachemgr.protocol")) {
308 const int connect_error = errno;
311 const int fd_lockfile_fifo =
LockFile(workspace_dir +
"/lock_cachemgr.fifo");
312 if (fd_lockfile_fifo < 0) {
314 (workspace_dir +
"/lock_cachemgr.fifo").c_str(), errno);
321 if (connect_error == ENXIO) {
323 unlink(fifo_path.c_str());
327 int retval = mkfifo(fifo_path.c_str(), 0600);
338 int pipe_handshake[2];
342 vector<string> command_line;
343 command_line.push_back(exe_path);
344 command_line.push_back(
"__cachemgr__");
345 command_line.push_back(cache_workspace);
347 command_line.push_back(
StringifyInt(pipe_handshake[0]));
349 command_line.push_back(
StringifyInt(cleanup_threshold));
355 set<int> preserve_filedes;
356 preserve_filedes.insert(0);
357 preserve_filedes.insert(1);
358 preserve_filedes.insert(2);
359 preserve_filedes.insert(pipe_boot[1]);
360 preserve_filedes.insert(pipe_handshake[0]);
362 retval =
ManagedExec(command_line, preserve_filedes, map<int, int>(),
false);
374 close(pipe_handshake[0]);
376 if (read(pipe_boot[0], &buf, 1) != 1) {
379 close(pipe_handshake[1]);
382 "cache manager did not start");
388 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
391 "failed to connect to newly created FIFO (%d)", errno);
392 close(pipe_handshake[1]);
400 if (write(pipe_handshake[1], &buf, 1) != 1) {
402 close(pipe_handshake[1]);
407 close(pipe_handshake[1]);
418 "threshold %" PRIu64,
425 if (gauge_ <= leave_size)
430 "clean up cache until at most %lu KB is used", leave_size/1024);
432 cleanup_recorder_.Tick();
436 vector<string> trash;
439 sqlite3_reset(stmt_lru_);
440 if (sqlite3_step(stmt_lru_) != SQLITE_ROW) {
445 hash_str = string(reinterpret_cast<const char *>(
446 sqlite3_column_text(stmt_lru_, 0)));
453 if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
454 trash.push_back(cache_dir_ +
"/" + hash.MakePathWithoutSuffix());
455 gauge_ -= sqlite3_column_int64(stmt_lru_, 1);
457 hash_str.c_str(), gauge_);
459 sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(),
461 result = (sqlite3_step(stmt_rm_) == SQLITE_DONE);
462 sqlite3_reset(stmt_rm_);
466 "failed to find %s in cache database (%d). "
467 "Cache database is out of sync. "
468 "Restart cvmfs with clean cache.", hash_str.c_str(), result);
472 sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
474 result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
475 sqlite3_reset(stmt_block_);
478 }
while (gauge_ > leave_size);
480 result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
481 sqlite3_reset(stmt_unblock_);
486 if (!trash.empty()) {
490 if ((pid = fork()) == 0) {
498 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
500 unlink(trash[i].c_str());
507 waitpid(pid, &statloc, 0);
512 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
514 unlink(trash[i].c_str());
519 if (gauge_ > leave_size) {
521 "request to clean until %" PRIu64
", "
522 "but effective gauge is %" PRIu64, leave_size, gauge_);
532 const string &description,
535 const string hash_str = hash.
ToString();
537 hash_str.c_str(), description.c_str(), command_type);
538 const unsigned desc_length = (description.length() > kMaxDescription) ?
539 kMaxDescription : description.length();
548 memcpy(reinterpret_cast<char *>(cmd)+
sizeof(
LruCommand),
549 &description[0], desc_length);
555 vector<string> result;
558 MakeReturnPipe(pipe_list);
559 char description_buffer[kMaxDescription];
564 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
570 ReadPipe(pipe_list[0], description_buffer, length);
571 result.push_back(
string(description_buffer, length));
573 }
while (length >= 0);
575 CloseReturnPipe(pipe_list);
581 if (limit_ != (uint64_t)(-1))
586 if (statfs(
".", &info) == 0) {
587 return info.f_bavail * info.f_bsize;
590 "failed to query file system info of cache (%d)", errno);
599 MakeReturnPipe(pipe_limits);
604 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
606 ReadPipe(pipe_limits[0], cleanup_threshold,
sizeof(*cleanup_threshold));
607 CloseReturnPipe(pipe_limits);
616 return limit_ - cleanup_threshold_;
627 MakeReturnPipe(pipe_pid);
632 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
634 CloseReturnPipe(pipe_pid);
640 int pipe_revision[2];
641 MakeReturnPipe(pipe_revision);
646 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
649 ReadHalfPipe(pipe_revision[0], &revision,
sizeof(revision));
650 CloseReturnPipe(pipe_revision);
660 MakeReturnPipe(pipe_status);
665 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
667 ReadPipe(pipe_status[0], pinned,
sizeof(*pinned));
668 CloseReturnPipe(pipe_status);
674 uint64_t gauge, size_pinned;
675 GetSharedStatus(&gauge, &size_pinned);
682 uint64_t gauge, size_pinned;
683 GetSharedStatus(&gauge, &size_pinned);
689 if (!
spawned_ || (protocol_revision_ < 2))
return 0;
690 uint64_t cleanup_rate;
692 int pipe_cleanup_rate[2];
693 MakeReturnPipe(pipe_cleanup_rate);
698 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
699 ReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate,
sizeof(cleanup_rate));
700 CloseReturnPipe(pipe_cleanup_rate);
710 fd_lock_cachedb_ =
LockFile(workspace_dir_ +
"/lock_cachedb");
711 if (fd_lock_cachedb_ < 0) {
717 const string db_file = cache_dir_ +
"/cachedb";
718 if (rebuild_database) {
721 unlink(db_file.c_str());
722 unlink((db_file +
"-journal").c_str());
726 int err = sqlite3_open(db_file.c_str(), &database_);
727 if (err != SQLITE_OK) {
729 goto init_database_fail;
732 sql =
"PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
733 "PRAGMA auto_vacuum=1; "
734 "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
735 " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
736 "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
737 "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
738 " ON cache_catalog (acseq); "
739 "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
740 "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
741 "CREATE INDEX idx_fscache_actime ON fscache (actime); "
742 "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
743 " CONSTRAINT pk_properties PRIMARY KEY(key));";
744 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
745 if (err != SQLITE_OK) {
748 sqlite3_close(database_);
749 unlink(db_file.c_str());
750 unlink((db_file +
"-journal").c_str());
752 "LRU database corrupted, re-building");
757 goto init_database_fail;
762 sql =
"ALTER TABLE cache_catalog ADD type INTEGER; "
763 "ALTER TABLE cache_catalog ADD pinned INTEGER";
764 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
765 if (err == SQLITE_OK) {
766 sql =
"UPDATE cache_catalog SET type=" +
StringifyInt(kFileRegular) +
";";
767 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
768 if (err != SQLITE_OK) {
770 "could not init cache database (failed: %s)", sql.c_str());
771 goto init_database_fail;
776 sql =
"UPDATE cache_catalog SET pinned=0;";
777 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
778 if (err != SQLITE_OK) {
781 goto init_database_fail;
785 sql =
"INSERT OR REPLACE INTO properties (key, value) "
786 "VALUES ('schema', '1.0')";
787 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
788 if (err != SQLITE_OK) {
791 goto init_database_fail;
795 sql =
"SELECT count(*) FROM cache_catalog;";
796 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
797 if (sqlite3_step(stmt) == SQLITE_ROW) {
798 if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
800 "CernVM-FS: building lru cache database...");
801 if (!RebuildDatabase()) {
803 "could not build cache database from file system");
804 sqlite3_finalize(stmt);
805 goto init_database_fail;
808 sqlite3_finalize(stmt);
811 sqlite3_finalize(stmt);
812 goto init_database_fail;
816 sql =
"SELECT sum(size) FROM cache_catalog;";
817 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
818 if (sqlite3_step(stmt) == SQLITE_ROW) {
819 gauge_ = sqlite3_column_int64(stmt, 0);
822 sqlite3_finalize(stmt);
823 goto init_database_fail;
825 sqlite3_finalize(stmt);
828 sql =
"SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
829 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
830 if (sqlite3_step(stmt) == SQLITE_ROW) {
831 seq_ = sqlite3_column_int64(stmt, 0)+1;
834 sqlite3_finalize(stmt);
835 goto init_database_fail;
837 sqlite3_finalize(stmt);
840 sqlite3_prepare_v2(database_,
841 "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
842 "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
843 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=0 "
844 "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
845 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=2 "
846 "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
847 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=1 "
848 "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
849 sqlite3_prepare_v2(database_,
850 "INSERT OR REPLACE INTO cache_catalog "
851 "(sha1, size, acseq, path, type, pinned) "
852 "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
853 -1, &stmt_new_, NULL);
854 sqlite3_prepare_v2(database_,
855 "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
856 -1, &stmt_size_, NULL);
857 sqlite3_prepare_v2(database_,
"DELETE FROM cache_catalog WHERE sha1=:sha1;",
858 -1, &stmt_rm_, NULL);
859 sqlite3_prepare_v2(database_,
860 "SELECT sha1, size FROM cache_catalog WHERE "
861 "acseq=(SELECT min(acseq) "
862 "FROM cache_catalog WHERE pinned<>2);",
863 -1, &stmt_lru_, NULL);
864 sqlite3_prepare_v2(database_,
865 (
"SELECT path FROM cache_catalog WHERE type=" +
867 ";").c_str(), -1, &stmt_list_, NULL);
868 sqlite3_prepare_v2(database_,
869 "SELECT path FROM cache_catalog WHERE pinned<>0;",
870 -1, &stmt_list_pinned_, NULL);
871 sqlite3_prepare_v2(database_,
872 "SELECT path FROM cache_catalog WHERE acseq < 0;",
873 -1, &stmt_list_volatile_, NULL);
874 sqlite3_prepare_v2(database_,
875 (
"SELECT path FROM cache_catalog WHERE type=" +
877 ";").c_str(), -1, &stmt_list_catalogs_, NULL);
881 sqlite3_close(database_);
895 const string &description)
897 DoInsert(any_hash, size, description, kInsert);
909 const string &description)
911 DoInsert(any_hash, size, description, kInsertVolatile);
919 return DoList(kList);
927 return DoList(kListPinned);
935 return DoList(kListCatalogs);
943 return DoList(kListVolatile);
960 ParseDirectories(
string(argv[2]),
970 vector<string> logfiles =
SplitString(argv[10],
':');
974 if ((logfiles.size() > 0) && (logfiles[0] !=
""))
976 if (logfiles.size() > 1)
984 watchdog->
Spawn(
"./stacktrace.cachemgr");
987 const int fd_lockfile_fifo =
989 if (fd_lockfile_fifo < 0) {
996 const string crash_guard = shared_manager.
cache_dir_ +
"/cachemgr.running";
998 retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1001 "failed to create shared cache manager crash guard");
1009 sqlite3_temp_directory =
1010 static_cast<char *
>(sqlite3_malloc(tmp_dir.length() + 1));
1011 snprintf(sqlite3_temp_directory, tmp_dir.length() + 1,
"%s", tmp_dir.c_str());
1025 const string protocol_revision_path =
1027 retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1030 "failed to open protocol revision file (%d)", errno);
1034 const string revision =
StringifyInt(kProtocolRevision);
1035 int written = write(retval, revision.data(), revision.length());
1037 if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1039 "failed to write protocol revision (%d)", errno);
1044 const string fifo_path = shared_manager.
workspace_dir_ +
"/cachemgr";
1045 shared_manager.
pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1048 fifo_path.c_str(), errno);
1060 close(pipe_handshake);
1064 signal(SIGPIPE, SIG_IGN);
1066 signal(SIGINT, SIG_IGN);
1069 unlink(fifo_path.c_str());
1070 unlink(protocol_revision_path.c_str());
1072 unlink(crash_guard.c_str());
1075 if (sqlite3_temp_directory) {
1076 sqlite3_free(sqlite3_temp_directory);
1077 sqlite3_temp_directory = NULL;
1090 LruCommand command_buffer[kCommandBufferSize];
1091 char description_buffer[kCommandBufferSize*kMaxDescription];
1092 unsigned num_commands = 0;
1094 while (read(quota_mgr->
pipe_lru_[0], &command_buffer[num_commands],
1095 sizeof(command_buffer[0])) ==
sizeof(command_buffer[0]))
1099 const uint64_t
size = command_buffer[num_commands].
GetSize();
1102 if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1103 (command_type == kPin) || (command_type == kPinRegular))
1105 const int desc_length = command_buffer[num_commands].
desc_length;
1107 &description_buffer[kMaxDescription*num_commands], desc_length);
1111 if (command_type == kGetProtocolRevision) {
1113 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1114 if (return_pipe < 0)
1123 if (command_type == kCleanupRate) {
1125 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1126 if (return_pipe < 0)
1128 uint64_t period_s =
size;
1130 WritePipe(return_pipe, &rate,
sizeof(rate));
1136 if (command_type == kReserve) {
1137 bool success =
true;
1139 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1140 if (return_pipe < 0)
1144 const string hash_str(hash.
ToString());
1146 size, hash_str.c_str());
1153 "failed to insert %s (pinned), no space", hash_str.c_str());
1162 WritePipe(return_pipe, &success,
sizeof(success));
1168 if (command_type == kRegisterBackChannel) {
1170 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1171 if (return_pipe < 0)
1177 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1181 map<shash::Md5, int>::const_iterator iter =
1185 "closing left-over back channel %s", hash.
ToString().c_str());
1186 close(iter->second);
1192 WritePipe(return_pipe, &success,
sizeof(success));
1194 hash.
ToString().c_str(), return_pipe);
1199 if (command_type == kUnregisterBackChannel) {
1201 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1205 map<shash::Md5, int>::iterator iter =
1209 "closing back channel %s", hash.
ToString().c_str());
1210 close(iter->second);
1214 "did not find back channel %s", hash.
ToString().c_str());
1222 if (command_type == kUnpin) {
1224 const string hash_str(hash.
ToString());
1226 map<shash::Any, uint64_t>::iterator iter =
1229 quota_mgr->
pinned_ -= iter->second;
1238 "remove orphaned pinned hash %s from cache database",
1240 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1241 hash_str.length(), SQLITE_STATIC);
1243 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1244 uint64_t size = sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1245 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1246 hash_str.length(), SQLITE_STATIC);
1247 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1248 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1252 "failed to delete %s (%d)", hash_str.c_str(), retval);
1254 sqlite3_reset(quota_mgr->
stmt_rm_);
1264 bool immediate_command = (command_type == kCleanup) ||
1265 (command_type == kList) || (command_type == kListPinned) ||
1266 (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1267 (command_type == kRemove) || (command_type == kStatus) ||
1268 (command_type == kLimits) || (command_type == kPid);
1269 if (!immediate_command) num_commands++;
1271 if ((num_commands == kCommandBufferSize) || immediate_command)
1274 description_buffer);
1275 if (!immediate_command) num_commands = 0;
1278 if (immediate_command) {
1281 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1282 if (return_pipe < 0) {
1288 sqlite3_stmt *this_stmt_list = NULL;
1289 switch (command_type) {
1292 const string hash_str = hash.
ToString();
1295 bool success =
false;
1297 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1298 hash_str.length(), SQLITE_STATIC);
1300 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1301 uint64_t size = sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1302 uint64_t is_pinned = sqlite3_column_int64(quota_mgr->
stmt_size_, 1);
1304 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1305 hash_str.length(), SQLITE_STATIC);
1306 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1307 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1316 "failed to delete %s (%d)", hash_str.c_str(), retval);
1318 sqlite3_reset(quota_mgr->
stmt_rm_);
1325 WritePipe(return_pipe, &success,
sizeof(success));
1329 WritePipe(return_pipe, &retval,
sizeof(retval));
1332 if (!this_stmt_list) this_stmt_list = quota_mgr->
stmt_list_;
1342 while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1343 string path =
"(NULL)";
1344 if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1346 reinterpret_cast<const char *>(
1347 sqlite3_column_text(this_stmt_list, 0)));
1349 length = path.length();
1350 WritePipe(return_pipe, &length,
sizeof(length));
1352 WritePipe(return_pipe, &path[0], length);
1355 WritePipe(return_pipe, &length,
sizeof(length));
1356 sqlite3_reset(this_stmt_list);
1369 pid_t pid = getpid();
1370 WritePipe(return_pipe, &pid,
sizeof(pid));
1384 description_buffer);
1388 for (map<shash::Any, uint64_t>::const_iterator i =
1410 retval = mkfifo((workspace_dir_ +
"/pipe" +
StringifyInt(i)).c_str(), 0600);
1413 }
while ((retval == -1) && (errno == EEXIST));
1417 pipe[0] = open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe[1])).c_str(),
1418 O_RDONLY | O_NONBLOCK);
1425 const std::string cache_workspace,
1426 std::string *cache_dir,
1427 std::string *workspace_dir)
1429 vector<string> dir_tokens(
SplitString(cache_workspace,
':'));
1430 switch (dir_tokens.size()) {
1432 *cache_dir = *workspace_dir = dir_tokens[0];
1435 *cache_dir = dir_tokens[0];
1436 *workspace_dir = dir_tokens[1];
1451 const uint64_t
size,
1452 const string &description,
1453 const bool is_catalog)
1455 assert((size > 0) || !is_catalog);
1457 const string hash_str = hash.
ToString();
1459 hash_str.c_str(), description.c_str());
1464 if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1465 if (pinned_ + size > cleanup_threshold_) {
1470 pinned_chunks_[hash] =
size;
1472 CheckHighPinWatermark();
1475 bool exists = Contains(hash_str);
1476 if (!exists && (gauge_ + size > limit_)) {
1479 int retval = DoCleanup(cleanup_threshold_);
1482 sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1484 sqlite3_bind_int64(stmt_new_, 2, size);
1485 sqlite3_bind_int64(stmt_new_, 3, seq_++);
1486 sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1488 sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1489 sqlite3_bind_int64(stmt_new_, 6, 1);
1490 int retval = sqlite3_step(stmt_new_);
1491 assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1492 sqlite3_reset(stmt_new_);
1493 if (!exists) gauge_ +=
size;
1497 int pipe_reserve[2];
1498 MakeReturnPipe(pipe_reserve);
1505 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
1507 ReadHalfPipe(pipe_reserve[0], &result,
sizeof(result));
1508 CloseReturnPipe(pipe_reserve);
1510 if (!result)
return false;
1511 DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1518 const uint64_t limit,
1519 const uint64_t cleanup_threshold,
1520 const string &cache_workspace)
1524 , cleanup_threshold_(cleanup_threshold)
1530 , fd_lock_cachedb_(-1)
1531 , async_delete_(true)
1536 , stmt_unblock_(NULL)
1542 , stmt_list_pinned_(NULL)
1543 , stmt_list_catalogs_(NULL)
1544 , stmt_list_volatile_(NULL)
1545 , initialized_(false)
1584 const char *descriptions)
1586 int retval = sqlite3_exec(
database_,
"BEGIN", NULL, NULL, NULL);
1587 assert(retval == SQLITE_OK);
1589 for (
unsigned i = 0; i < num; ++i) {
1591 const string hash_str = hash.
ToString();
1597 switch (commands[i].command_type) {
1600 sqlite3_bind_text(
stmt_touch_, 2, &hash_str[0], hash_str.length(),
1604 hash_str.c_str(),
seq_-1, retval);
1605 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1607 hash_str.c_str(), retval);
1612 sqlite3_bind_text(
stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1616 hash_str.c_str(), retval);
1617 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1619 hash_str.c_str(), retval);
1639 sqlite3_bind_text(
stmt_new_, 1, &hash_str[0], hash_str.length(),
1648 commands[i].desc_length, SQLITE_STATIC);
1649 sqlite3_bind_int64(
stmt_new_, 5, (commands[i].command_type ==
kPin) ?
1652 ((commands[i].command_type ==
kPin) ||
1653 (commands[i].command_type ==
kPinRegular)) ? 1 : 0);
1657 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1659 hash_str.c_str(), retval);
1671 retval = sqlite3_exec(
database_,
"COMMIT", NULL, NULL, NULL);
1672 if (retval != SQLITE_OK) {
1679 bool result =
false;
1681 sqlite3_stmt *stmt_select = NULL;
1682 sqlite3_stmt *stmt_insert = NULL;
1694 sql =
"DELETE FROM cache_catalog; DELETE FROM fscache;";
1695 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1696 if (sqlerr != SQLITE_OK) {
1705 sqlite3_prepare_v2(
database_,
"INSERT INTO fscache (sha1, size, actime) "
1706 "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1708 for (
int i = 0; i <= 0xff; i++) {
1709 snprintf(hex,
sizeof(hex),
"%02x", i);
1711 if ((dirp = opendir(path.c_str())) == NULL) {
1713 "failed to open directory %s (tmpwatch interfering?)",
1718 string file_path = path +
"/" + string(d->d_name);
1719 if (stat(file_path.c_str(), &info) == 0) {
1720 if (!S_ISREG(info.st_mode))
1722 if (info.st_size == 0) {
1724 "removing empty file %s during automatic cache db rebuild",
1726 unlink(file_path.c_str());
1730 string hash = string(hex) + string(d->d_name);
1731 sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1733 sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1734 sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1735 if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1739 sqlite3_reset(stmt_insert);
1749 sqlite3_finalize(stmt_insert);
1754 "SELECT sha1, size FROM fscache ORDER BY actime;",
1755 -1, &stmt_select, NULL);
1757 "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1758 "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1759 -1, &stmt_insert, NULL);
1760 while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1761 const string hash = string(
1762 reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1763 sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1764 sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1765 sqlite3_bind_int64(stmt_insert, 3, seq++);
1769 int retval = sqlite3_step(stmt_insert);
1770 if (retval != SQLITE_DONE) {
1773 "could not insert into cache catalog (%d - %s)",
1774 retval, sqlite3_errstr(retval));
1777 sqlite3_reset(stmt_insert);
1781 sql =
"DELETE FROM fscache;";
1782 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1783 if (sqlerr != SQLITE_OK) {
1792 "rebuilding finished, sequence %" PRIu64
", gauge %" PRIu64,
1796 if (stmt_insert) sqlite3_finalize(stmt_insert);
1797 if (stmt_select) sqlite3_finalize(stmt_select);
1798 if (dirp) closedir(dirp);
1808 int back_channel[2],
1809 const string &channel_id)
1823 ReadHalfPipe(back_channel[0], &success,
sizeof(success));
1825 if (success !=
'S') {
1827 "failed to register quota back channel (%c)", success);
1852 ReadHalfPipe(pipe_remove[0], &success,
sizeof(success));
1864 static_cast<void *>(
this)) != 0)
1901 cmd.StoreHash(hash);
1907 int back_channel[2],
1908 const string &channel_id)
1920 close(back_channel[0]);
virtual uint32_t GetProtocolRevision()
#define LogCvmfs(source, mask,...)
void SetLogSyslogFacility(const int local_facility)
sqlite3_stmt * stmt_list_pinned_
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
std::vector< std::string > DoList(const CommandType list_command)
virtual uint64_t GetCleanupRate(uint64_t period_s)
sqlite3_stmt * stmt_list_
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
virtual uint64_t GetSize()
virtual bool Cleanup(const uint64_t leave_size)
static const unsigned kSqliteMemPerThread
virtual uint64_t GetMaxFileSize()
std::string ToString(const bool with_suffix=false) const
uint64_t cleanup_threshold_
static const uint32_t kProtocolRevision
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
bool DoCleanup(const uint64_t leave_size)
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
int BindReturnPipe(int pipe_wronly)
bool Contains(const std::string &hash_str)
perf::MultiRecorder cleanup_recorder_
void SetLogMicroSyslog(const std::string &filename)
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
unsigned char digest[digest_size_]
virtual std::vector< std::string > ListCatalogs()
void CheckHighPinWatermark()
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
bool FileExists(const std::string &path)
static Watchdog * Create(FnOnCrash on_crash)
int64_t String2Int64(const string &value)
unsigned GetDigestSize() const
std::string workspace_dir_
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
#define GetLogDebugFile()
void ReadHalfPipe(int fd, void *buf, size_t nbyte)
vector< string > SplitString(const string &str, char delim)
void Nonblock2Block(int filedes)
virtual uint64_t GetSizePinned()
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
virtual uint64_t GetCapacity()
void SetSize(const uint64_t new_size)
shash::Any RetrieveHash() const
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
sqlite3_stmt * stmt_touch_
string StringifyInt(const int64_t value)
std::string GetLogMicroSyslog()
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
sqlite3_stmt * stmt_list_volatile_
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
static int MainCacheManager(int argc, char **argv)
std::map< shash::Md5, int > back_channels_
static const unsigned kMaxDescription
bool CloseAllFildes(const std::set< int > &preserve_fildes)
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
unsigned char digest[shash::kMaxDigestSize]
std::string MakePathWithoutSuffix() const
sqlite3_stmt * stmt_list_catalogs_
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
uint64_t GetNoTicks(uint32_t retrospect_s) const
Any MkFromHexPtr(const HexPtr hex, const char suffix)
virtual ~PosixQuotaManager()
void Spawn(const std::string &crash_dump_path)
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
std::map< shash::Any, uint64_t > pinned_chunks_
sqlite3_stmt * stmt_unpin_
void CloseReturnPipe(int pipe[2])
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
const unsigned kDigestSizes[]
void UnlockBackChannels()
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
static const uint64_t kVolatileFlag
void UnlockFile(const int filedes)
sqlite3_stmt * stmt_size_
int GetLogSyslogFacility()