15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
18 #include "cvmfs_config.h"
31 #include <sys/statfs.h>
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
70 open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe_wronly)).c_str(),
71 O_WRONLY | O_NONBLOCK);
76 "failed to bind return pipe (%d)", errno);
83 const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
84 if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86 "high watermark of pinned files (%" PRIu64
"M > %" PRIu64
"M)",
87 pinned_/(1024*1024), watermark/(1024*1024));
88 BroadcastBackchannels(
"R");
94 DIR *dirp = opendir(workspace_dir_.c_str());
98 bool found_leftovers =
false;
100 const string name = dent->d_name;
101 const string path = workspace_dir_ +
"/" + name;
106 if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) ==
"pipe")) {
107 if (!found_leftovers) {
109 "removing left-over FIFOs from cache directory");
111 found_leftovers =
true;
112 unlink(path.c_str());
127 return DoCleanup(leave_size);
131 MakeReturnPipe(pipe_cleanup);
135 cmd.
size = leave_size;
138 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
139 ManagedReadHalfPipe(pipe_cleanup[0], &result,
sizeof(result));
140 CloseReturnPipe(pipe_cleanup);
147 if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
148 if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
149 if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
150 if (stmt_list_) sqlite3_finalize(stmt_list_);
151 if (stmt_lru_) sqlite3_finalize(stmt_lru_);
152 if (stmt_rm_) sqlite3_finalize(stmt_rm_);
153 if (stmt_size_) sqlite3_finalize(stmt_size_);
154 if (stmt_touch_) sqlite3_finalize(stmt_touch_);
155 if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
156 if (stmt_block_) sqlite3_finalize(stmt_block_);
157 if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
158 if (stmt_new_) sqlite3_finalize(stmt_new_);
159 if (database_) sqlite3_close(database_);
162 stmt_list_catalogs_ = NULL;
163 stmt_list_pinned_ = NULL;
164 stmt_list_volatile_ = NULL;
171 stmt_unblock_ = NULL;
175 pinned_chunks_.clear();
182 UnlinkReturnPipe(pipe[1]);
192 sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
194 if (sqlite3_step(stmt_size_) == SQLITE_ROW)
196 sqlite3_reset(stmt_size_);
198 hash_str.c_str(), result);
205 if ((limit_ == 0) || (gauge_ >= limit_))
208 struct statvfs vfs_info;
209 int retval = statvfs((cache_dir_ +
"/cachedb").c_str(), &vfs_info);
212 "failed to query %s for free space (%d)",
213 cache_dir_.c_str(), errno);
216 int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
218 free_space_byte / (1024 * 1024));
220 int64_t required_byte = limit_ - gauge_;
221 if (free_space_byte < required_byte) {
223 "too little free space on the file system hosting the cache,"
224 " %" PRId64
" MB available",
225 free_space_byte / (1024 * 1024));
231 const string &cache_workspace,
232 const uint64_t limit,
233 const uint64_t cleanup_threshold,
234 const bool rebuild_database)
236 if (cleanup_threshold >= limit) {
238 "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
247 delete quota_manager;
255 return quota_manager;
263 const std::string &exe_path,
264 const std::string &cache_workspace,
265 const uint64_t limit,
266 const uint64_t cleanup_threshold,
270 string workspace_dir;
271 ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
273 pid_t new_cachemgr_pid;
276 const int fd_lockfile =
LockFile(workspace_dir +
"/lock_cachemgr");
277 if (fd_lockfile < 0) {
279 (workspace_dir +
"/lock_cachemgr").c_str(), errno);
289 const string fifo_path = workspace_dir +
"/cachemgr";
291 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
293 const int fd_lockfile_rw = open((workspace_dir +
"/lock_cachemgr").c_str(), O_RDWR, 0600);
294 unsigned lockfile_magicnumber = 0;
295 const ssize_t result_mn =
SafeRead(fd_lockfile_rw, &lockfile_magicnumber,
sizeof(lockfile_magicnumber));
296 const ssize_t result =
SafeRead(fd_lockfile_rw, &new_cachemgr_pid,
sizeof(new_cachemgr_pid));
297 close(fd_lockfile_rw);
299 if ((lockfile_magicnumber != kLockFileMagicNumber) || (result < 0) || (result_mn < 0)
300 || (static_cast<size_t>(result) <
sizeof(new_cachemgr_pid))) {
303 "could not read cache manager pid from lockfile");
323 "received limit %" PRIu64
", threshold %" PRIu64,
325 if (
FileExists(workspace_dir +
"/cachemgr.protocol")) {
334 const int connect_error = errno;
337 const int fd_lockfile_fifo =
LockFile(workspace_dir +
"/lock_cachemgr.fifo");
338 if (fd_lockfile_fifo < 0) {
340 (workspace_dir +
"/lock_cachemgr.fifo").c_str(), errno);
347 if (connect_error == ENXIO) {
349 unlink(fifo_path.c_str());
353 int retval = mkfifo(fifo_path.c_str(), 0600);
364 int pipe_handshake[2];
368 vector<string> command_line;
369 command_line.push_back(exe_path);
370 command_line.push_back(
"__cachemgr__");
371 command_line.push_back(cache_workspace);
373 command_line.push_back(
StringifyInt(pipe_handshake[0]));
375 command_line.push_back(
StringifyInt(cleanup_threshold));
383 set<int> preserve_filedes;
384 preserve_filedes.insert(0);
385 preserve_filedes.insert(1);
386 preserve_filedes.insert(2);
387 preserve_filedes.insert(pipe_boot[1]);
388 preserve_filedes.insert(pipe_handshake[0]);
391 retval =
ManagedExec(command_line, preserve_filedes, map<int, int>(),
409 const int fd_lockfile_rw = open((workspace_dir +
"/lock_cachemgr").c_str(), O_RDWR | O_TRUNC, 0600);
411 const bool result_mn =
SafeWrite(fd_lockfile_rw, &magic_number,
sizeof(magic_number));
412 const bool result =
SafeWrite(fd_lockfile_rw, &new_cachemgr_pid,
sizeof(new_cachemgr_pid));
413 if (!result || !result_mn) {
417 close(fd_lockfile_rw);
420 close(pipe_handshake[0]);
422 if (read(pipe_boot[0], &buf, 1) != 1) {
425 close(pipe_handshake[1]);
428 "cache manager did not start");
434 quota_mgr->
pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
437 "failed to connect to newly created FIFO (%d)", errno);
438 close(pipe_handshake[1]);
446 if (write(pipe_handshake[1], &buf, 1) != 1) {
448 close(pipe_handshake[1]);
453 close(pipe_handshake[1]);
464 "threshold %" PRIu64,
471 if (gauge_ <= leave_size)
476 "clean up cache until at most %lu KB is used", leave_size/1024);
478 cleanup_recorder_.Tick();
482 vector<string> trash;
485 sqlite3_reset(stmt_lru_);
486 if (sqlite3_step(stmt_lru_) != SQLITE_ROW) {
491 hash_str = string(reinterpret_cast<const char *>(
492 sqlite3_column_text(stmt_lru_, 0)));
499 if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
500 trash.push_back(cache_dir_ +
"/" + hash.MakePathWithoutSuffix());
501 gauge_ -= sqlite3_column_int64(stmt_lru_, 1);
503 hash_str.c_str(), gauge_);
505 sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(),
507 result = (sqlite3_step(stmt_rm_) == SQLITE_DONE);
508 sqlite3_reset(stmt_rm_);
512 "failed to find %s in cache database (%d). "
513 "Cache database is out of sync. "
514 "Restart cvmfs with clean cache.", hash_str.c_str(), result);
518 sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
520 result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
521 sqlite3_reset(stmt_block_);
524 }
while (gauge_ > leave_size);
526 result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
527 sqlite3_reset(stmt_unblock_);
532 if (!trash.empty()) {
536 if ((pid = fork()) == 0) {
544 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
546 unlink(trash[i].c_str());
553 waitpid(pid, &statloc, 0);
558 for (
unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
560 unlink(trash[i].c_str());
565 if (gauge_ > leave_size) {
567 "request to clean until %" PRIu64
", "
568 "but effective gauge is %" PRIu64, leave_size, gauge_);
578 const string &description,
581 const string hash_str = hash.
ToString();
583 hash_str.c_str(), description.c_str(), command_type);
584 const unsigned desc_length = (description.length() > kMaxDescription) ?
585 kMaxDescription : description.length();
594 memcpy(reinterpret_cast<char *>(cmd)+
sizeof(
LruCommand),
595 &description[0], desc_length);
601 vector<string> result;
604 MakeReturnPipe(pipe_list);
605 char description_buffer[kMaxDescription];
610 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
614 ManagedReadHalfPipe(pipe_list[0], &length,
sizeof(length));
616 ReadPipe(pipe_list[0], description_buffer, length);
617 result.push_back(
string(description_buffer, length));
619 }
while (length >= 0);
621 CloseReturnPipe(pipe_list);
627 if (limit_ != (uint64_t)(-1))
632 if (statfs(
".", &info) == 0) {
633 return info.f_bavail * info.f_bsize;
636 "failed to query file system info of cache (%d)", errno);
645 MakeReturnPipe(pipe_limits);
650 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
651 ManagedReadHalfPipe(pipe_limits[0], limit,
sizeof(*limit));
652 ReadPipe(pipe_limits[0], cleanup_threshold,
sizeof(*cleanup_threshold));
653 CloseReturnPipe(pipe_limits);
662 return limit_ - cleanup_threshold_;
671 return cachemgr_pid_;
676 MakeReturnPipe(pipe_pid);
681 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
683 CloseReturnPipe(pipe_pid);
689 int pipe_revision[2];
690 MakeReturnPipe(pipe_revision);
695 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
698 ManagedReadHalfPipe(pipe_revision[0], &revision,
sizeof(revision));
699 CloseReturnPipe(pipe_revision);
709 MakeReturnPipe(pipe_status);
714 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
715 ManagedReadHalfPipe(pipe_status[0], gauge,
sizeof(*gauge));
716 ReadPipe(pipe_status[0], pinned,
sizeof(*pinned));
717 CloseReturnPipe(pipe_status);
723 uint64_t gauge, size_pinned;
724 GetSharedStatus(&gauge, &size_pinned);
731 uint64_t gauge, size_pinned;
732 GetSharedStatus(&gauge, &size_pinned);
738 if (!
spawned_ || (protocol_revision_ < 2))
return 0;
739 uint64_t cleanup_rate;
741 int pipe_cleanup_rate[2];
742 MakeReturnPipe(pipe_cleanup_rate);
747 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
748 ManagedReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate,
sizeof(cleanup_rate));
749 CloseReturnPipe(pipe_cleanup_rate);
759 fd_lock_cachedb_ =
LockFile(workspace_dir_ +
"/lock_cachedb");
760 if (fd_lock_cachedb_ < 0) {
766 const string db_file = cache_dir_ +
"/cachedb";
767 if (rebuild_database) {
770 unlink(db_file.c_str());
771 unlink((db_file +
"-journal").c_str());
775 int err = sqlite3_open(db_file.c_str(), &database_);
776 if (err != SQLITE_OK) {
778 goto init_database_fail;
781 sql =
"PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
782 "PRAGMA auto_vacuum=1; "
783 "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
784 " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
785 "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
786 "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
787 " ON cache_catalog (acseq); "
788 "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
789 "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
790 "CREATE INDEX idx_fscache_actime ON fscache (actime); "
791 "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
792 " CONSTRAINT pk_properties PRIMARY KEY(key));";
793 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
794 if (err != SQLITE_OK) {
797 sqlite3_close(database_);
798 unlink(db_file.c_str());
799 unlink((db_file +
"-journal").c_str());
801 "LRU database corrupted, re-building");
806 goto init_database_fail;
811 sql =
"ALTER TABLE cache_catalog ADD type INTEGER; "
812 "ALTER TABLE cache_catalog ADD pinned INTEGER";
813 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
814 if (err == SQLITE_OK) {
815 sql =
"UPDATE cache_catalog SET type=" +
StringifyInt(kFileRegular) +
";";
816 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
817 if (err != SQLITE_OK) {
819 "could not init cache database (failed: %s)", sql.c_str());
820 goto init_database_fail;
825 sql =
"UPDATE cache_catalog SET pinned=0;";
826 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
827 if (err != SQLITE_OK) {
830 goto init_database_fail;
834 sql =
"INSERT OR REPLACE INTO properties (key, value) "
835 "VALUES ('schema', '1.0')";
836 err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
837 if (err != SQLITE_OK) {
840 goto init_database_fail;
844 sql =
"SELECT count(*) FROM cache_catalog;";
845 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
846 if (sqlite3_step(stmt) == SQLITE_ROW) {
847 if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
849 "CernVM-FS: building lru cache database...");
850 if (!RebuildDatabase()) {
852 "could not build cache database from file system");
853 sqlite3_finalize(stmt);
854 goto init_database_fail;
857 sqlite3_finalize(stmt);
860 sqlite3_finalize(stmt);
861 goto init_database_fail;
865 sql =
"SELECT sum(size) FROM cache_catalog;";
866 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
867 if (sqlite3_step(stmt) == SQLITE_ROW) {
868 gauge_ = sqlite3_column_int64(stmt, 0);
871 sqlite3_finalize(stmt);
872 goto init_database_fail;
874 sqlite3_finalize(stmt);
877 sql =
"SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
878 sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
879 if (sqlite3_step(stmt) == SQLITE_ROW) {
880 seq_ = sqlite3_column_int64(stmt, 0)+1;
883 sqlite3_finalize(stmt);
884 goto init_database_fail;
886 sqlite3_finalize(stmt);
889 sqlite3_prepare_v2(database_,
890 "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
891 "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
892 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=0 "
893 "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
894 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=2 "
895 "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
896 sqlite3_prepare_v2(database_,
"UPDATE cache_catalog SET pinned=1 "
897 "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
898 sqlite3_prepare_v2(database_,
899 "INSERT OR REPLACE INTO cache_catalog "
900 "(sha1, size, acseq, path, type, pinned) "
901 "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
902 -1, &stmt_new_, NULL);
903 sqlite3_prepare_v2(database_,
904 "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
905 -1, &stmt_size_, NULL);
906 sqlite3_prepare_v2(database_,
"DELETE FROM cache_catalog WHERE sha1=:sha1;",
907 -1, &stmt_rm_, NULL);
908 sqlite3_prepare_v2(database_,
909 "SELECT sha1, size FROM cache_catalog WHERE "
910 "acseq=(SELECT min(acseq) "
911 "FROM cache_catalog WHERE pinned<>2);",
912 -1, &stmt_lru_, NULL);
913 sqlite3_prepare_v2(database_,
914 (
"SELECT path FROM cache_catalog WHERE type=" +
916 ";").c_str(), -1, &stmt_list_, NULL);
917 sqlite3_prepare_v2(database_,
918 "SELECT path FROM cache_catalog WHERE pinned<>0;",
919 -1, &stmt_list_pinned_, NULL);
920 sqlite3_prepare_v2(database_,
921 "SELECT path FROM cache_catalog WHERE acseq < 0;",
922 -1, &stmt_list_volatile_, NULL);
923 sqlite3_prepare_v2(database_,
924 (
"SELECT path FROM cache_catalog WHERE type=" +
926 ";").c_str(), -1, &stmt_list_catalogs_, NULL);
930 sqlite3_close(database_);
944 const string &description)
946 DoInsert(any_hash, size, description, kInsert);
958 const string &description)
960 DoInsert(any_hash, size, description, kInsertVolatile);
968 return DoList(kList);
976 return DoList(kListPinned);
984 return DoList(kListCatalogs);
992 return DoList(kListVolatile);
1005 shared_manager.
shared_ =
true;
1010 ParseDirectories(
string(argv[2]),
1020 vector<string> logfiles =
SplitString(argv[10],
':');
1024 if ((logfiles.size() > 0) && (logfiles[0] !=
""))
1026 if (logfiles.size() > 1)
1034 watchdog->
Spawn(
"./stacktrace.cachemgr");
1037 const int fd_lockfile_fifo =
1039 if (fd_lockfile_fifo < 0) {
1046 const string crash_guard = shared_manager.
cache_dir_ +
"/cachemgr.running";
1047 const bool rebuild =
FileExists(crash_guard);
1048 retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1051 "failed to create shared cache manager crash guard");
1059 sqlite3_temp_directory =
1060 static_cast<char *
>(sqlite3_malloc(tmp_dir.length() + 1));
1061 snprintf(sqlite3_temp_directory, tmp_dir.length() + 1,
"%s", tmp_dir.c_str());
1075 const string protocol_revision_path =
1077 retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1080 "failed to open protocol revision file (%d)", errno);
1084 const string revision =
StringifyInt(kProtocolRevision);
1085 int written = write(retval, revision.data(), revision.length());
1087 if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1089 "failed to write protocol revision (%d)", errno);
1094 const string fifo_path = shared_manager.
workspace_dir_ +
"/cachemgr";
1095 shared_manager.
pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1098 fifo_path.c_str(), errno);
1110 close(pipe_handshake);
1114 signal(SIGPIPE, SIG_IGN);
1116 signal(SIGINT, SIG_IGN);
1119 unlink(fifo_path.c_str());
1120 unlink(protocol_revision_path.c_str());
1122 unlink(crash_guard.c_str());
1125 if (sqlite3_temp_directory) {
1126 sqlite3_free(sqlite3_temp_directory);
1127 sqlite3_temp_directory = NULL;
1140 LruCommand command_buffer[kCommandBufferSize];
1141 char description_buffer[kCommandBufferSize*kMaxDescription];
1142 unsigned num_commands = 0;
1144 while (read(quota_mgr->
pipe_lru_[0], &command_buffer[num_commands],
1145 sizeof(command_buffer[0])) ==
sizeof(command_buffer[0]))
1149 const uint64_t
size = command_buffer[num_commands].
GetSize();
1152 if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1153 (command_type == kPin) || (command_type == kPinRegular))
1155 const int desc_length = command_buffer[num_commands].
desc_length;
1157 &description_buffer[kMaxDescription*num_commands], desc_length);
1161 if (command_type == kGetProtocolRevision) {
1163 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1164 if (return_pipe < 0)
1173 if (command_type == kCleanupRate) {
1175 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1176 if (return_pipe < 0)
1178 uint64_t period_s =
size;
1180 WritePipe(return_pipe, &rate,
sizeof(rate));
1186 if (command_type == kReserve) {
1187 bool success =
true;
1189 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1190 if (return_pipe < 0)
1194 const string hash_str(hash.
ToString());
1196 size, hash_str.c_str());
1203 "failed to insert %s (pinned), no space", hash_str.c_str());
1212 WritePipe(return_pipe, &success,
sizeof(success));
1218 if (command_type == kRegisterBackChannel) {
1220 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1221 if (return_pipe < 0)
1227 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1231 map<shash::Md5, int>::const_iterator iter =
1235 "closing left-over back channel %s", hash.
ToString().c_str());
1236 close(iter->second);
1242 WritePipe(return_pipe, &success,
sizeof(success));
1244 hash.
ToString().c_str(), return_pipe);
1249 if (command_type == kUnregisterBackChannel) {
1251 memcpy(hash.
digest, command_buffer[num_commands].
digest,
1255 map<shash::Md5, int>::iterator iter =
1259 "closing back channel %s", hash.
ToString().c_str());
1260 close(iter->second);
1264 "did not find back channel %s", hash.
ToString().c_str());
1272 if (command_type == kUnpin) {
1274 const string hash_str(hash.
ToString());
1276 map<shash::Any, uint64_t>::iterator iter =
1279 quota_mgr->
pinned_ -= iter->second;
1288 "remove orphaned pinned hash %s from cache database",
1290 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1291 hash_str.length(), SQLITE_STATIC);
1293 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1294 uint64_t size = sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1295 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1296 hash_str.length(), SQLITE_STATIC);
1297 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1298 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1302 "failed to delete %s (%d)", hash_str.c_str(), retval);
1304 sqlite3_reset(quota_mgr->
stmt_rm_);
1314 bool immediate_command = (command_type == kCleanup) ||
1315 (command_type == kList) || (command_type == kListPinned) ||
1316 (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1317 (command_type == kRemove) || (command_type == kStatus) ||
1318 (command_type == kLimits) || (command_type == kPid);
1319 if (!immediate_command) num_commands++;
1321 if ((num_commands == kCommandBufferSize) || immediate_command)
1324 description_buffer);
1325 if (!immediate_command) num_commands = 0;
1328 if (immediate_command) {
1331 quota_mgr->
BindReturnPipe(command_buffer[num_commands].return_pipe);
1332 if (return_pipe < 0) {
1338 sqlite3_stmt *this_stmt_list = NULL;
1339 switch (command_type) {
1342 const string hash_str = hash.
ToString();
1345 bool success =
false;
1347 sqlite3_bind_text(quota_mgr->
stmt_size_, 1, &hash_str[0],
1348 hash_str.length(), SQLITE_STATIC);
1350 if ((retval = sqlite3_step(quota_mgr->
stmt_size_)) == SQLITE_ROW) {
1351 uint64_t size = sqlite3_column_int64(quota_mgr->
stmt_size_, 0);
1352 uint64_t is_pinned = sqlite3_column_int64(quota_mgr->
stmt_size_, 1);
1354 sqlite3_bind_text(quota_mgr->
stmt_rm_, 1, &(hash_str[0]),
1355 hash_str.length(), SQLITE_STATIC);
1356 retval = sqlite3_step(quota_mgr->
stmt_rm_);
1357 if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1366 "failed to delete %s (%d)", hash_str.c_str(), retval);
1368 sqlite3_reset(quota_mgr->
stmt_rm_);
1375 WritePipe(return_pipe, &success,
sizeof(success));
1379 WritePipe(return_pipe, &retval,
sizeof(retval));
1382 if (!this_stmt_list) this_stmt_list = quota_mgr->
stmt_list_;
1392 while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1393 string path =
"(NULL)";
1394 if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1396 reinterpret_cast<const char *>(
1397 sqlite3_column_text(this_stmt_list, 0)));
1399 length = path.length();
1400 WritePipe(return_pipe, &length,
sizeof(length));
1402 WritePipe(return_pipe, &path[0], length);
1405 WritePipe(return_pipe, &length,
sizeof(length));
1406 sqlite3_reset(this_stmt_list);
1419 pid_t pid = getpid();
1420 WritePipe(return_pipe, &pid,
sizeof(pid));
1434 description_buffer);
1438 for (map<shash::Any, uint64_t>::const_iterator i =
1460 retval = mkfifo((workspace_dir_ +
"/pipe" +
StringifyInt(i)).c_str(), 0600);
1463 }
while ((retval == -1) && (errno == EEXIST));
1467 pipe[0] = open((workspace_dir_ +
"/pipe" +
StringifyInt(pipe[1])).c_str(),
1468 O_RDONLY | O_NONBLOCK);
1475 const std::string cache_workspace,
1476 std::string *cache_dir,
1477 std::string *workspace_dir)
1479 vector<string> dir_tokens(
SplitString(cache_workspace,
':'));
1480 switch (dir_tokens.size()) {
1482 *cache_dir = *workspace_dir = dir_tokens[0];
1485 *cache_dir = dir_tokens[0];
1486 *workspace_dir = dir_tokens[1];
1501 const uint64_t
size,
1502 const string &description,
1503 const bool is_catalog)
1505 assert((size > 0) || !is_catalog);
1507 const string hash_str = hash.
ToString();
1509 hash_str.c_str(), description.c_str());
1514 if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1515 if (pinned_ + size > cleanup_threshold_) {
1520 pinned_chunks_[hash] =
size;
1522 CheckHighPinWatermark();
1525 bool exists = Contains(hash_str);
1526 if (!exists && (gauge_ + size > limit_)) {
1529 int retval = DoCleanup(cleanup_threshold_);
1532 sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1534 sqlite3_bind_int64(stmt_new_, 2, size);
1535 sqlite3_bind_int64(stmt_new_, 3, seq_++);
1536 sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1538 sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1539 sqlite3_bind_int64(stmt_new_, 6, 1);
1540 int retval = sqlite3_step(stmt_new_);
1541 assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1542 sqlite3_reset(stmt_new_);
1543 if (!exists) gauge_ +=
size;
1547 int pipe_reserve[2];
1548 MakeReturnPipe(pipe_reserve);
1555 WritePipe(pipe_lru_[1], &cmd,
sizeof(cmd));
1557 ManagedReadHalfPipe(pipe_reserve[0], &result,
sizeof(result));
1558 CloseReturnPipe(pipe_reserve);
1560 if (!result)
return false;
1561 DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1568 const uint64_t limit,
1569 const uint64_t cleanup_threshold,
1570 const string &cache_workspace)
1574 , cleanup_threshold_(cleanup_threshold)
1580 , fd_lock_cachedb_(-1)
1581 , async_delete_(true)
1587 , stmt_unblock_(NULL)
1593 , stmt_list_pinned_(NULL)
1594 , stmt_list_catalogs_(NULL)
1595 , stmt_list_volatile_(NULL)
1596 , initialized_(false)
1635 const char *descriptions)
1637 int retval = sqlite3_exec(
database_,
"BEGIN", NULL, NULL, NULL);
1638 assert(retval == SQLITE_OK);
1640 for (
unsigned i = 0; i < num; ++i) {
1642 const string hash_str = hash.
ToString();
1648 switch (commands[i].command_type) {
1651 sqlite3_bind_text(
stmt_touch_, 2, &hash_str[0], hash_str.length(),
1655 hash_str.c_str(),
seq_-1, retval);
1656 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1658 hash_str.c_str(), retval);
1663 sqlite3_bind_text(
stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1667 hash_str.c_str(), retval);
1668 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1670 hash_str.c_str(), retval);
1690 sqlite3_bind_text(
stmt_new_, 1, &hash_str[0], hash_str.length(),
1699 commands[i].desc_length, SQLITE_STATIC);
1700 sqlite3_bind_int64(
stmt_new_, 5, (commands[i].command_type ==
kPin) ?
1703 ((commands[i].command_type ==
kPin) ||
1704 (commands[i].command_type ==
kPinRegular)) ? 1 : 0);
1708 if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1710 hash_str.c_str(), retval);
1722 retval = sqlite3_exec(
database_,
"COMMIT", NULL, NULL, NULL);
1723 if (retval != SQLITE_OK) {
1730 bool result =
false;
1732 sqlite3_stmt *stmt_select = NULL;
1733 sqlite3_stmt *stmt_insert = NULL;
1745 sql =
"DELETE FROM cache_catalog; DELETE FROM fscache;";
1746 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1747 if (sqlerr != SQLITE_OK) {
1756 sqlite3_prepare_v2(
database_,
"INSERT INTO fscache (sha1, size, actime) "
1757 "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1759 for (
int i = 0; i <= 0xff; i++) {
1760 snprintf(hex,
sizeof(hex),
"%02x", i);
1762 if ((dirp = opendir(path.c_str())) == NULL) {
1764 "failed to open directory %s (tmpwatch interfering?)",
1769 string file_path = path +
"/" + string(d->d_name);
1770 if (stat(file_path.c_str(), &info) == 0) {
1771 if (!S_ISREG(info.st_mode))
1773 if (info.st_size == 0) {
1775 "removing empty file %s during automatic cache db rebuild",
1777 unlink(file_path.c_str());
1781 string hash = string(hex) + string(d->d_name);
1782 sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1784 sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1785 sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1786 if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1790 sqlite3_reset(stmt_insert);
1800 sqlite3_finalize(stmt_insert);
1805 "SELECT sha1, size FROM fscache ORDER BY actime;",
1806 -1, &stmt_select, NULL);
1808 "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1809 "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1810 -1, &stmt_insert, NULL);
1811 while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1812 const string hash = string(
1813 reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1814 sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1815 sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1816 sqlite3_bind_int64(stmt_insert, 3, seq++);
1820 int retval = sqlite3_step(stmt_insert);
1821 if (retval != SQLITE_DONE) {
1824 "could not insert into cache catalog (%d - %s)",
1825 retval, sqlite3_errstr(retval));
1828 sqlite3_reset(stmt_insert);
1832 sql =
"DELETE FROM fscache;";
1833 sqlerr = sqlite3_exec(
database_, sql.c_str(), NULL, NULL, NULL);
1834 if (sqlerr != SQLITE_OK) {
1843 "rebuilding finished, sequence %" PRIu64
", gauge %" PRIu64,
1847 if (stmt_insert) sqlite3_finalize(stmt_insert);
1848 if (stmt_select) sqlite3_finalize(stmt_select);
1849 if (dirp) closedir(dirp);
1859 int back_channel[2],
1860 const string &channel_id)
1876 if (success !=
'S') {
1878 "failed to register quota back channel (%c)", success);
1915 static_cast<void *>(
this)) != 0)
1952 cmd.StoreHash(hash);
1958 int back_channel[2],
1959 const string &channel_id)
1971 close(back_channel[0]);
1979 bool result =
false;
1985 PANIC(
kLogStderr,
"Error: quota manager could not read from cachemanager pipe");
virtual uint32_t GetProtocolRevision()
void SetLogSyslogFacility(const int local_facility)
sqlite3_stmt * stmt_list_pinned_
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
std::vector< std::string > DoList(const CommandType list_command)
virtual uint64_t GetCleanupRate(uint64_t period_s)
sqlite3_stmt * stmt_list_
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
virtual uint64_t GetSize()
virtual bool Cleanup(const uint64_t leave_size)
static const unsigned kSqliteMemPerThread
virtual uint64_t GetMaxFileSize()
std::string ToString(const bool with_suffix=false) const
uint64_t cleanup_threshold_
static const uint32_t kProtocolRevision
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
bool DoCleanup(const uint64_t leave_size)
bool SafeWrite(int fd, const void *buf, size_t nbyte)
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
int BindReturnPipe(int pipe_wronly)
bool Contains(const std::string &hash_str)
perf::MultiRecorder cleanup_recorder_
void SetLogMicroSyslog(const std::string &filename)
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
unsigned char digest[digest_size_]
virtual std::vector< std::string > ListCatalogs()
void CheckHighPinWatermark()
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
bool FileExists(const std::string &path)
static Watchdog * Create(FnOnCrash on_crash)
int64_t String2Int64(const string &value)
unsigned GetDigestSize() const
std::string workspace_dir_
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
bool ReadHalfPipe(int fd, void *buf, size_t nbyte, unsigned timeout_ms)
#define GetLogDebugFile()
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
vector< string > SplitString(const string &str, char delim)
void Nonblock2Block(int filedes)
virtual uint64_t GetSizePinned()
void ManagedReadHalfPipe(int fd, void *buf, size_t nbyte)
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
virtual uint64_t GetCapacity()
void SetSize(const uint64_t new_size)
shash::Any RetrieveHash() const
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
sqlite3_stmt * stmt_touch_
string StringifyInt(const int64_t value)
std::string GetLogMicroSyslog()
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
sqlite3_stmt * stmt_list_volatile_
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
static int MainCacheManager(int argc, char **argv)
std::map< shash::Md5, int > back_channels_
static const unsigned kMaxDescription
bool CloseAllFildes(const std::set< int > &preserve_fildes)
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
static const unsigned kLockFileMagicNumber
unsigned char digest[shash::kMaxDigestSize]
std::string MakePathWithoutSuffix() const
sqlite3_stmt * stmt_list_catalogs_
bool ExecAsDaemon(const std::vector< std::string > &command_line, pid_t *child_pid)
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
uint64_t GetNoTicks(uint32_t retrospect_s) const
Any MkFromHexPtr(const HexPtr hex, const char suffix)
void SetCacheMgrPid(pid_t pid_)
virtual ~PosixQuotaManager()
void Spawn(const std::string &crash_dump_path)
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
std::map< shash::Any, uint64_t > pinned_chunks_
sqlite3_stmt * stmt_unpin_
void CloseReturnPipe(int pipe[2])
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
const unsigned kDigestSizes[]
void UnlockBackChannels()
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
static const uint64_t kVolatileFlag
void UnlockFile(const int filedes)
sqlite3_stmt * stmt_size_
int GetLogSyslogFacility()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)