CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
quota_posix.cc
Go to the documentation of this file.
1 
15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
17 
18 #include "cvmfs_config.h"
19 #include "quota_posix.h"
20 
21 #include <dirent.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdint.h>
28 #include <sys/dir.h>
29 #include <sys/stat.h>
30 #ifndef __APPLE__
31 #include <sys/statfs.h>
32 #endif
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 #include <cassert>
39 #include <cstdio>
40 #include <cstdlib>
41 #include <cstring>
42 
43 #include <map>
44 #include <set>
45 #include <string>
46 #include <vector>
47 
48 #include "duplex_sqlite3.h"
49 #include "hash.h"
50 #include "logging.h"
51 #include "monitor.h"
52 #include "platform.h"
53 #include "smalloc.h"
54 #include "statistics.h"
55 #include "util/exception.h"
56 #include "util/pointer.h"
57 #include "util/posix.h"
58 #include "util/string.h"
59 #include "util_concurrency.h"
60 
61 using namespace std; // NOLINT
62 
63 
64 int PosixQuotaManager::BindReturnPipe(int pipe_wronly) {
65  if (!shared_)
66  return pipe_wronly;
67 
68  // Connect writer's end
69  int result =
70  open((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str(),
71  O_WRONLY | O_NONBLOCK);
72  if (result >= 0) {
73  Nonblock2Block(result);
74  } else {
76  "failed to bind return pipe (%d)", errno);
77  }
78  return result;
79 }
80 
81 
83  const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
84  if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86  "high watermark of pinned files (%" PRIu64 "M > %" PRIu64 "M)",
87  pinned_/(1024*1024), watermark/(1024*1024));
88  BroadcastBackchannels("R"); // clients: please release pinned catalogs
89  }
90 }
91 
92 
94  DIR *dirp = opendir(workspace_dir_.c_str());
95  assert(dirp != NULL);
96 
97  platform_dirent64 *dent;
98  bool found_leftovers = false;
99  while ((dent = platform_readdir(dirp)) != NULL) {
100  const string name = dent->d_name;
101  const string path = workspace_dir_ + "/" + name;
102  platform_stat64 info;
103  int retval = platform_stat(path.c_str(), &info);
104  if (retval != 0)
105  continue;
106  if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) == "pipe")) {
107  if (!found_leftovers) {
109  "removing left-over FIFOs from cache directory");
110  }
111  found_leftovers = true;
112  unlink(path.c_str());
113  }
114  }
115  closedir(dirp);
116 }
117 
118 
125 bool PosixQuotaManager::Cleanup(const uint64_t leave_size) {
126  if (!spawned_)
127  return DoCleanup(leave_size);
128 
129  bool result;
130  int pipe_cleanup[2];
131  MakeReturnPipe(pipe_cleanup);
132 
133  LruCommand cmd;
134  cmd.command_type = kCleanup;
135  cmd.size = leave_size;
136  cmd.return_pipe = pipe_cleanup[1];
137 
138  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
139  ReadHalfPipe(pipe_cleanup[0], &result, sizeof(result));
140  CloseReturnPipe(pipe_cleanup);
141 
142  return result;
143 }
144 
145 
147  if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
148  if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
149  if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
150  if (stmt_list_) sqlite3_finalize(stmt_list_);
151  if (stmt_lru_) sqlite3_finalize(stmt_lru_);
152  if (stmt_rm_) sqlite3_finalize(stmt_rm_);
153  if (stmt_size_) sqlite3_finalize(stmt_size_);
154  if (stmt_touch_) sqlite3_finalize(stmt_touch_);
155  if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
156  if (stmt_block_) sqlite3_finalize(stmt_block_);
157  if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
158  if (stmt_new_) sqlite3_finalize(stmt_new_);
159  if (database_) sqlite3_close(database_);
160  UnlockFile(fd_lock_cachedb_);
161 
162  stmt_list_catalogs_ = NULL;
163  stmt_list_pinned_ = NULL;
164  stmt_list_volatile_ = NULL;
165  stmt_list_ = NULL;
166  stmt_rm_ = NULL;
167  stmt_size_ = NULL;
168  stmt_touch_ = NULL;
169  stmt_unpin_ = NULL;
170  stmt_block_ = NULL;
171  stmt_unblock_ = NULL;
172  stmt_new_ = NULL;
173  database_ = NULL;
174 
175  pinned_chunks_.clear();
176 }
177 
178 
180  if (shared_) {
181  close(pipe[0]);
182  UnlinkReturnPipe(pipe[1]);
183  } else {
184  ClosePipe(pipe);
185  }
186 }
187 
188 
189 bool PosixQuotaManager::Contains(const string &hash_str) {
190  bool result = false;
191 
192  sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
193  SQLITE_STATIC);
194  if (sqlite3_step(stmt_size_) == SQLITE_ROW)
195  result = true;
196  sqlite3_reset(stmt_size_);
197  LogCvmfs(kLogQuota, kLogDebug, "contains %s returns %d",
198  hash_str.c_str(), result);
199 
200  return result;
201 }
202 
203 
205  if ((limit_ == 0) || (gauge_ >= limit_))
206  return;
207 
208  struct statvfs vfs_info;
209  int retval = statvfs((cache_dir_ + "/cachedb").c_str(), &vfs_info);
210  if (retval != 0) {
212  "failed to query %s for free space (%d)",
213  cache_dir_.c_str(), errno);
214  return;
215  }
216  int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
217  LogCvmfs(kLogQuota, kLogDebug, "free space: %" PRId64 " MB",
218  free_space_byte / (1024 * 1024));
219 
220  int64_t required_byte = limit_ - gauge_;
221  if (free_space_byte < required_byte) {
223  "too little free space on the file system hosting the cache,"
224  " %" PRId64 " MB available",
225  free_space_byte / (1024 * 1024));
226  }
227 }
228 
229 
231  const string &cache_workspace,
232  const uint64_t limit,
233  const uint64_t cleanup_threshold,
234  const bool rebuild_database)
235 {
236  if (cleanup_threshold >= limit) {
237  LogCvmfs(kLogQuota, kLogDebug, "invalid parameters: limit %" PRIu64 ", "
238  "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
239  return NULL;
240  }
241 
242  PosixQuotaManager *quota_manager =
243  new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
244 
245  // Initialize cache catalog
246  if (!quota_manager->InitDatabase(rebuild_database)) {
247  delete quota_manager;
248  return NULL;
249  }
250  quota_manager->CheckFreeSpace();
251  MakePipe(quota_manager->pipe_lru_);
252 
253  quota_manager->protocol_revision_ = kProtocolRevision;
254  quota_manager->initialized_ = true;
255  return quota_manager;
256 }
257 
258 
263  const std::string &exe_path,
264  const std::string &cache_workspace,
265  const uint64_t limit,
266  const uint64_t cleanup_threshold,
267  bool foreground)
268 {
269  string cache_dir;
270  string workspace_dir;
271  ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
272 
273  // Create lock file: only one fuse client at a time
274  const int fd_lockfile = LockFile(workspace_dir + "/lock_cachemgr");
275  if (fd_lockfile < 0) {
276  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
277  (workspace_dir + "/lock_cachemgr").c_str(), errno);
278  return NULL;
279  }
280 
281  PosixQuotaManager *quota_mgr =
282  new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
283  quota_mgr->shared_ = true;
284  quota_mgr->spawned_ = true;
285 
286  // Try to connect to pipe
287  const string fifo_path = workspace_dir + "/cachemgr";
288  LogCvmfs(kLogQuota, kLogDebug, "trying to connect to existing pipe");
289  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
290  if (quota_mgr->pipe_lru_[1] >= 0) {
291  LogCvmfs(kLogQuota, kLogDebug, "connected to existing cache manager pipe");
292  quota_mgr->initialized_ = true;
293  Nonblock2Block(quota_mgr->pipe_lru_[1]);
294  UnlockFile(fd_lockfile);
295  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
297  "received limit %" PRIu64 ", threshold %" PRIu64,
298  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
299  if (FileExists(workspace_dir + "/cachemgr.protocol")) {
300  quota_mgr->protocol_revision_ = quota_mgr->GetProtocolRevision();
301  LogCvmfs(kLogQuota, kLogDebug, "connected protocol revision %u",
302  quota_mgr->protocol_revision_);
303  } else {
304  LogCvmfs(kLogQuota, kLogDebug, "connected to ancient cache manager");
305  }
306  return quota_mgr;
307  }
308  const int connect_error = errno;
309 
310  // Lock file: let existing cache manager finish first
311  const int fd_lockfile_fifo = LockFile(workspace_dir + "/lock_cachemgr.fifo");
312  if (fd_lockfile_fifo < 0) {
313  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
314  (workspace_dir + "/lock_cachemgr.fifo").c_str(), errno);
315  UnlockFile(fd_lockfile);
316  delete quota_mgr;
317  return NULL;
318  }
319  UnlockFile(fd_lockfile_fifo);
320 
321  if (connect_error == ENXIO) {
322  LogCvmfs(kLogQuota, kLogDebug, "left-over FIFO found, unlinking");
323  unlink(fifo_path.c_str());
324  }
325 
326  // Creating a new FIFO for the cache manager (to be bound later)
327  int retval = mkfifo(fifo_path.c_str(), 0600);
328  if (retval != 0) {
329  LogCvmfs(kLogQuota, kLogDebug, "failed to create cache manager FIFO (%d)",
330  errno);
331  UnlockFile(fd_lockfile);
332  delete quota_mgr;
333  return NULL;
334  }
335 
336  // Create new cache manager
337  int pipe_boot[2];
338  int pipe_handshake[2];
339  MakePipe(pipe_boot);
340  MakePipe(pipe_handshake);
341 
342  vector<string> command_line;
343  command_line.push_back(exe_path);
344  command_line.push_back("__cachemgr__");
345  command_line.push_back(cache_workspace);
346  command_line.push_back(StringifyInt(pipe_boot[1]));
347  command_line.push_back(StringifyInt(pipe_handshake[0]));
348  command_line.push_back(StringifyInt(limit));
349  command_line.push_back(StringifyInt(cleanup_threshold));
350  command_line.push_back(StringifyInt(foreground));
351  command_line.push_back(StringifyInt(GetLogSyslogLevel()));
352  command_line.push_back(StringifyInt(GetLogSyslogFacility()));
353  command_line.push_back(GetLogDebugFile() + ":" + GetLogMicroSyslog());
354 
355  set<int> preserve_filedes;
356  preserve_filedes.insert(0);
357  preserve_filedes.insert(1);
358  preserve_filedes.insert(2);
359  preserve_filedes.insert(pipe_boot[1]);
360  preserve_filedes.insert(pipe_handshake[0]);
361 
362  retval = ManagedExec(command_line, preserve_filedes, map<int, int>(), false);
363  if (!retval) {
364  UnlockFile(fd_lockfile);
365  ClosePipe(pipe_boot);
366  ClosePipe(pipe_handshake);
367  delete quota_mgr;
368  LogCvmfs(kLogQuota, kLogDebug, "failed to start cache manager");
369  return NULL;
370  }
371 
372  // Wait for cache manager to be ready
373  close(pipe_boot[1]);
374  close(pipe_handshake[0]);
375  char buf;
376  if (read(pipe_boot[0], &buf, 1) != 1) {
377  UnlockFile(fd_lockfile);
378  close(pipe_boot[0]);
379  close(pipe_handshake[1]);
380  delete quota_mgr;
382  "cache manager did not start");
383  return NULL;
384  }
385  close(pipe_boot[0]);
386 
387  // Connect write end
388  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
389  if (quota_mgr->pipe_lru_[1] < 0) {
391  "failed to connect to newly created FIFO (%d)", errno);
392  close(pipe_handshake[1]);
393  UnlockFile(fd_lockfile);
394  delete quota_mgr;
395  return NULL;
396  }
397 
398  // Finalize handshake
399  buf = 'C';
400  if (write(pipe_handshake[1], &buf, 1) != 1) {
401  UnlockFile(fd_lockfile);
402  close(pipe_handshake[1]);
403  LogCvmfs(kLogQuota, kLogDebug, "could not finalize handshake");
404  delete quota_mgr;
405  return NULL;
406  }
407  close(pipe_handshake[1]);
408 
409  Nonblock2Block(quota_mgr->pipe_lru_[1]);
410  LogCvmfs(kLogQuota, kLogDebug, "connected to a new cache manager");
411  quota_mgr->protocol_revision_ = kProtocolRevision;
412 
413  UnlockFile(fd_lockfile);
414 
415  quota_mgr->initialized_ = true;
416  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
417  LogCvmfs(kLogQuota, kLogDebug, "received limit %" PRIu64 ", "
418  "threshold %" PRIu64,
419  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
420  return quota_mgr;
421 }
422 
423 
424 bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) {
425  if (gauge_ <= leave_size)
426  return true;
427 
428  // TODO(jblomer) transaction
430  "clean up cache until at most %lu KB is used", leave_size/1024);
431  LogCvmfs(kLogQuota, kLogDebug, "gauge %" PRIu64, gauge_);
432  cleanup_recorder_.Tick();
433 
434  bool result;
435  string hash_str;
436  vector<string> trash;
437 
438  do {
439  sqlite3_reset(stmt_lru_);
440  if (sqlite3_step(stmt_lru_) != SQLITE_ROW) {
441  LogCvmfs(kLogQuota, kLogDebug, "could not get lru-entry");
442  break;
443  }
444 
445  hash_str = string(reinterpret_cast<const char *>(
446  sqlite3_column_text(stmt_lru_, 0)));
447  LogCvmfs(kLogQuota, kLogDebug, "removing %s", hash_str.c_str());
448  shash::Any hash = shash::MkFromHexPtr(shash::HexPtr(hash_str));
449 
450  // That's a critical condition. We must not delete a not yet inserted
451  // pinned file as it is already reserved (but will be inserted later).
452  // Instead, set the pin bit in the db to not run into an endless loop
453  if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
454  trash.push_back(cache_dir_ + "/" + hash.MakePathWithoutSuffix());
455  gauge_ -= sqlite3_column_int64(stmt_lru_, 1);
456  LogCvmfs(kLogQuota, kLogDebug, "lru cleanup %s, new gauge %" PRIu64,
457  hash_str.c_str(), gauge_);
458 
459  sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(),
460  SQLITE_STATIC);
461  result = (sqlite3_step(stmt_rm_) == SQLITE_DONE);
462  sqlite3_reset(stmt_rm_);
463 
464  if (!result) {
466  "failed to find %s in cache database (%d). "
467  "Cache database is out of sync. "
468  "Restart cvmfs with clean cache.", hash_str.c_str(), result);
469  return false;
470  }
471  } else {
472  sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
473  SQLITE_STATIC);
474  result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
475  sqlite3_reset(stmt_block_);
476  assert(result);
477  }
478  } while (gauge_ > leave_size);
479 
480  result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
481  sqlite3_reset(stmt_unblock_);
482  assert(result);
483 
484  // Double fork avoids zombie, forked removal process must not flush file
485  // buffers
486  if (!trash.empty()) {
487  if (async_delete_) {
488  pid_t pid;
489  int statloc;
490  if ((pid = fork()) == 0) {
491  // TODO(jblomer): eviciting files in the cache should perhaps become a
492  // thread. This would also allow to block the chunks and prevent the
493  // race with re-insertion. Then again, a thread can block umount.
494 #ifndef DEBUGMSG
495  int max_fd = sysconf(_SC_OPEN_MAX);
496  for (int i = 0; i < max_fd; ++i)
497  close(i);
498 #endif
499  if (fork() == 0) {
500  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
501  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
502  unlink(trash[i].c_str());
503  }
504  _exit(0);
505  }
506  _exit(0);
507  } else {
508  if (pid > 0)
509  waitpid(pid, &statloc, 0);
510  else
511  return false;
512  }
513  } else { // !async_delete_
514  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
515  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
516  unlink(trash[i].c_str());
517  }
518  }
519  }
520 
521  if (gauge_ > leave_size) {
523  "request to clean until %" PRIu64 ", "
524  "but effective gauge is %" PRIu64, leave_size, gauge_);
525  return false;
526  }
527  return true;
528 }
529 
530 
532  const shash::Any &hash,
533  const uint64_t size,
534  const string &description,
535  const CommandType command_type)
536 {
537  const string hash_str = hash.ToString();
538  LogCvmfs(kLogQuota, kLogDebug, "insert into lru %s, path %s, method %d",
539  hash_str.c_str(), description.c_str(), command_type);
540  const unsigned desc_length = (description.length() > kMaxDescription) ?
541  kMaxDescription : description.length();
542 
543  LruCommand *cmd =
544  reinterpret_cast<LruCommand *>(alloca(sizeof(LruCommand) + desc_length));
545  new (cmd) LruCommand;
546  cmd->command_type = command_type;
547  cmd->SetSize(size);
548  cmd->StoreHash(hash);
549  cmd->desc_length = desc_length;
550  memcpy(reinterpret_cast<char *>(cmd)+sizeof(LruCommand),
551  &description[0], desc_length);
552  WritePipe(pipe_lru_[1], cmd, sizeof(LruCommand) + desc_length);
553 }
554 
555 
556 vector<string> PosixQuotaManager::DoList(const CommandType list_command) {
557  vector<string> result;
558 
559  int pipe_list[2];
560  MakeReturnPipe(pipe_list);
561  char description_buffer[kMaxDescription];
562 
563  LruCommand cmd;
564  cmd.command_type = list_command;
565  cmd.return_pipe = pipe_list[1];
566  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
567 
568  int length;
569  do {
570  ReadHalfPipe(pipe_list[0], &length, sizeof(length));
571  if (length > 0) {
572  ReadPipe(pipe_list[0], description_buffer, length);
573  result.push_back(string(description_buffer, length));
574  }
575  } while (length >= 0);
576 
577  CloseReturnPipe(pipe_list);
578  return result;
579 }
580 
581 
583  if (limit_ != (uint64_t)(-1))
584  return limit_;
585 
586  // Unrestricted cache, look at free space on cache dir fs
587  struct statfs info;
588  if (statfs(".", &info) == 0) {
589  return info.f_bavail * info.f_bsize;
590  } else {
592  "failed to query file system info of cache (%d)", errno);
593  return limit_;
594  }
595 }
596 
597 
598 void PosixQuotaManager::GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
599 {
600  int pipe_limits[2];
601  MakeReturnPipe(pipe_limits);
602 
603  LruCommand cmd;
604  cmd.command_type = kLimits;
605  cmd.return_pipe = pipe_limits[1];
606  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
607  ReadHalfPipe(pipe_limits[0], limit, sizeof(*limit));
608  ReadPipe(pipe_limits[0], cleanup_threshold, sizeof(*cleanup_threshold));
609  CloseReturnPipe(pipe_limits);
610 }
611 
612 
618  return limit_ - cleanup_threshold_;
619 }
620 
621 
623  if (!shared_ || !spawned_) {
624  return getpid();
625  }
626 
627  pid_t result;
628  int pipe_pid[2];
629  MakeReturnPipe(pipe_pid);
630 
631  LruCommand cmd;
632  cmd.command_type = kPid;
633  cmd.return_pipe = pipe_pid[1];
634  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
635  ReadHalfPipe(pipe_pid[0], &result, sizeof(result));
636  CloseReturnPipe(pipe_pid);
637  return result;
638 }
639 
640 
642  int pipe_revision[2];
643  MakeReturnPipe(pipe_revision);
644 
645  LruCommand cmd;
646  cmd.command_type = kGetProtocolRevision;
647  cmd.return_pipe = pipe_revision[1];
648  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
649 
650  uint32_t revision;
651  ReadHalfPipe(pipe_revision[0], &revision, sizeof(revision));
652  CloseReturnPipe(pipe_revision);
653  return revision;
654 }
655 
656 
660 void PosixQuotaManager::GetSharedStatus(uint64_t *gauge, uint64_t *pinned) {
661  int pipe_status[2];
662  MakeReturnPipe(pipe_status);
663 
664  LruCommand cmd;
665  cmd.command_type = kStatus;
666  cmd.return_pipe = pipe_status[1];
667  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
668  ReadHalfPipe(pipe_status[0], gauge, sizeof(*gauge));
669  ReadPipe(pipe_status[0], pinned, sizeof(*pinned));
670  CloseReturnPipe(pipe_status);
671 }
672 
673 
675  if (!spawned_) return gauge_;
676  uint64_t gauge, size_pinned;
677  GetSharedStatus(&gauge, &size_pinned);
678  return gauge;
679 }
680 
681 
683  if (!spawned_) return pinned_;
684  uint64_t gauge, size_pinned;
685  GetSharedStatus(&gauge, &size_pinned);
686  return size_pinned;
687 }
688 
689 
690 uint64_t PosixQuotaManager::GetCleanupRate(uint64_t period_s) {
691  if (!spawned_ || (protocol_revision_ < 2)) return 0;
692  uint64_t cleanup_rate;
693 
694  int pipe_cleanup_rate[2];
695  MakeReturnPipe(pipe_cleanup_rate);
696  LruCommand cmd;
697  cmd.command_type = kCleanupRate;
698  cmd.size = period_s;
699  cmd.return_pipe = pipe_cleanup_rate[1];
700  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
701  ReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate, sizeof(cleanup_rate));
702  CloseReturnPipe(pipe_cleanup_rate);
703 
704  return cleanup_rate;
705 }
706 
707 
708 bool PosixQuotaManager::InitDatabase(const bool rebuild_database) {
709  string sql;
710  sqlite3_stmt *stmt;
711 
712  fd_lock_cachedb_ = LockFile(workspace_dir_ + "/lock_cachedb");
713  if (fd_lock_cachedb_ < 0) {
714  LogCvmfs(kLogQuota, kLogDebug, "failed to create cachedb lock");
715  return false;
716  }
717 
718  bool retry = false;
719  const string db_file = cache_dir_ + "/cachedb";
720  if (rebuild_database) {
721  LogCvmfs(kLogQuota, kLogDebug, "rebuild database, unlinking existing (%s)",
722  db_file.c_str());
723  unlink(db_file.c_str());
724  unlink((db_file + "-journal").c_str());
725  }
726 
727  init_recover:
728  int err = sqlite3_open(db_file.c_str(), &database_);
729  if (err != SQLITE_OK) {
730  LogCvmfs(kLogQuota, kLogDebug, "could not open cache database (%d)", err);
731  goto init_database_fail;
732  }
733  // TODO(reneme): make this a `QuotaDatabase : public sqlite::Database`
734  sql = "PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
735  "PRAGMA auto_vacuum=1; "
736  "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
737  " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
738  "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
739  "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
740  " ON cache_catalog (acseq); "
741  "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
742  "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
743  "CREATE INDEX idx_fscache_actime ON fscache (actime); "
744  "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
745  " CONSTRAINT pk_properties PRIMARY KEY(key));";
746  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
747  if (err != SQLITE_OK) {
748  if (!retry) {
749  retry = true;
750  sqlite3_close(database_);
751  unlink(db_file.c_str());
752  unlink((db_file + "-journal").c_str());
754  "LRU database corrupted, re-building");
755  goto init_recover;
756  }
757  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
758  sql.c_str());
759  goto init_database_fail;
760  }
761 
762  // If this an old cache catalog,
763  // add and initialize new columns to cache_catalog
764  sql = "ALTER TABLE cache_catalog ADD type INTEGER; "
765  "ALTER TABLE cache_catalog ADD pinned INTEGER";
766  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
767  if (err == SQLITE_OK) {
768  sql = "UPDATE cache_catalog SET type=" + StringifyInt(kFileRegular) + ";";
769  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
770  if (err != SQLITE_OK) {
772  "could not init cache database (failed: %s)", sql.c_str());
773  goto init_database_fail;
774  }
775  }
776 
777  // Set pinned back
778  sql = "UPDATE cache_catalog SET pinned=0;";
779  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
780  if (err != SQLITE_OK) {
781  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
782  sql.c_str());
783  goto init_database_fail;
784  }
785 
786  // Set schema version
787  sql = "INSERT OR REPLACE INTO properties (key, value) "
788  "VALUES ('schema', '1.0')";
789  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
790  if (err != SQLITE_OK) {
791  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
792  sql.c_str());
793  goto init_database_fail;
794  }
795 
796  // If cache catalog is empty, recreate from file system
797  sql = "SELECT count(*) FROM cache_catalog;";
798  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
799  if (sqlite3_step(stmt) == SQLITE_ROW) {
800  if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
802  "CernVM-FS: building lru cache database...");
803  if (!RebuildDatabase()) {
805  "could not build cache database from file system");
806  sqlite3_finalize(stmt);
807  goto init_database_fail;
808  }
809  }
810  sqlite3_finalize(stmt);
811  } else {
812  LogCvmfs(kLogQuota, kLogDebug, "could not select on cache catalog");
813  sqlite3_finalize(stmt);
814  goto init_database_fail;
815  }
816 
817  // How many bytes do we already have in cache?
818  sql = "SELECT sum(size) FROM cache_catalog;";
819  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
820  if (sqlite3_step(stmt) == SQLITE_ROW) {
821  gauge_ = sqlite3_column_int64(stmt, 0);
822  } else {
823  LogCvmfs(kLogQuota, kLogDebug, "could not determine cache size");
824  sqlite3_finalize(stmt);
825  goto init_database_fail;
826  }
827  sqlite3_finalize(stmt);
828 
829  // Highest seq-no?
830  sql = "SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
831  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
832  if (sqlite3_step(stmt) == SQLITE_ROW) {
833  seq_ = sqlite3_column_int64(stmt, 0)+1;
834  } else {
835  LogCvmfs(kLogQuota, kLogDebug, "could not determine highest seq-no");
836  sqlite3_finalize(stmt);
837  goto init_database_fail;
838  }
839  sqlite3_finalize(stmt);
840 
841  // Prepare touch, new, remove statements
842  sqlite3_prepare_v2(database_,
843  "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
844  "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
845  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=0 "
846  "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
847  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=2 "
848  "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
849  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=1 "
850  "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
851  sqlite3_prepare_v2(database_,
852  "INSERT OR REPLACE INTO cache_catalog "
853  "(sha1, size, acseq, path, type, pinned) "
854  "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
855  -1, &stmt_new_, NULL);
856  sqlite3_prepare_v2(database_,
857  "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
858  -1, &stmt_size_, NULL);
859  sqlite3_prepare_v2(database_, "DELETE FROM cache_catalog WHERE sha1=:sha1;",
860  -1, &stmt_rm_, NULL);
861  sqlite3_prepare_v2(database_,
862  "SELECT sha1, size FROM cache_catalog WHERE "
863  "acseq=(SELECT min(acseq) "
864  "FROM cache_catalog WHERE pinned<>2);",
865  -1, &stmt_lru_, NULL);
866  sqlite3_prepare_v2(database_,
867  ("SELECT path FROM cache_catalog WHERE type=" +
868  StringifyInt(kFileRegular) +
869  ";").c_str(), -1, &stmt_list_, NULL);
870  sqlite3_prepare_v2(database_,
871  "SELECT path FROM cache_catalog WHERE pinned<>0;",
872  -1, &stmt_list_pinned_, NULL);
873  sqlite3_prepare_v2(database_,
874  "SELECT path FROM cache_catalog WHERE acseq < 0;",
875  -1, &stmt_list_volatile_, NULL);
876  sqlite3_prepare_v2(database_,
877  ("SELECT path FROM cache_catalog WHERE type=" +
878  StringifyInt(kFileCatalog) +
879  ";").c_str(), -1, &stmt_list_catalogs_, NULL);
880  return true;
881 
882  init_database_fail:
883  sqlite3_close(database_);
884  database_ = NULL;
885  UnlockFile(fd_lock_cachedb_);
886  return false;
887 }
888 
889 
895  const shash::Any &any_hash,
896  const uint64_t size,
897  const string &description)
898 {
899  DoInsert(any_hash, size, description, kInsert);
900 }
901 
902 
909  const shash::Any &any_hash,
910  const uint64_t size,
911  const string &description)
912 {
913  DoInsert(any_hash, size, description, kInsertVolatile);
914 }
915 
916 
920 vector<string> PosixQuotaManager::List() {
921  return DoList(kList);
922 }
923 
924 
929  return DoList(kListPinned);
930 }
931 
932 
937  return DoList(kListCatalogs);
938 }
939 
940 
945  return DoList(kListVolatile);
946 }
947 
948 
952 int PosixQuotaManager::MainCacheManager(int argc, char **argv) {
953  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
954  int retval;
955 
956  UniquePtr<Watchdog> watchdog(Watchdog::Create("./stacktrace.cachemgr"));
957  assert(watchdog.IsValid());
958  watchdog->Spawn();
959 
960  PosixQuotaManager shared_manager(0, 0, "");
961  shared_manager.shared_ = true;
962  shared_manager.spawned_ = true;
963  shared_manager.pinned_ = 0;
964 
965  // Process command line arguments
966  ParseDirectories(string(argv[2]),
967  &shared_manager.cache_dir_,
968  &shared_manager.workspace_dir_);
969  int pipe_boot = String2Int64(argv[3]);
970  int pipe_handshake = String2Int64(argv[4]);
971  shared_manager.limit_ = String2Int64(argv[5]);
972  shared_manager.cleanup_threshold_ = String2Int64(argv[6]);
973  int foreground = String2Int64(argv[7]);
974  int syslog_level = String2Int64(argv[8]);
975  int syslog_facility = String2Int64(argv[9]);
976  vector<string> logfiles = SplitString(argv[10], ':');
977 
978  SetLogSyslogLevel(syslog_level);
979  SetLogSyslogFacility(syslog_facility);
980  if ((logfiles.size() > 0) && (logfiles[0] != ""))
981  SetLogDebugFile(logfiles[0] + ".cachemgr");
982  if (logfiles.size() > 1)
983  SetLogMicroSyslog(logfiles[1]);
984 
985  if (!foreground)
986  Daemonize();
987 
988  // Initialize pipe, open non-blocking as cvmfs is not yet connected
989  const int fd_lockfile_fifo =
990  LockFile(shared_manager.workspace_dir_ + "/lock_cachemgr.fifo");
991  if (fd_lockfile_fifo < 0) {
992  LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, "could not open lock file "
993  "%s (%d)",
994  (shared_manager.workspace_dir_ + "/lock_cachemgr.fifo").c_str(),
995  errno);
996  return 1;
997  }
998  const string crash_guard = shared_manager.cache_dir_ + "/cachemgr.running";
999  const bool rebuild = FileExists(crash_guard);
1000  retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1001  if (retval < 0) {
1003  "failed to create shared cache manager crash guard");
1004  UnlockFile(fd_lockfile_fifo);
1005  return 1;
1006  }
1007  close(retval);
1008 
1009  // Redirect SQlite temp directory to cache (global variable)
1010  const string tmp_dir = shared_manager.workspace_dir_;
1011  sqlite3_temp_directory =
1012  static_cast<char *>(sqlite3_malloc(tmp_dir.length() + 1));
1013  snprintf(sqlite3_temp_directory, tmp_dir.length() + 1, "%s", tmp_dir.c_str());
1014 
1015  // Cleanup leftover named pipes
1016  shared_manager.CleanupPipes();
1017 
1018  if (!shared_manager.InitDatabase(rebuild)) {
1019  UnlockFile(fd_lockfile_fifo);
1020  return 1;
1021  }
1022  shared_manager.CheckFreeSpace();
1023 
1024  // Save protocol revision to file. If the file is not found, it indicates
1025  // to the client that the cache manager is from times before the protocol
1026  // was versioned.
1027  const string protocol_revision_path =
1028  shared_manager.workspace_dir_ + "/cachemgr.protocol";
1029  retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1030  if (retval < 0) {
1032  "failed to open protocol revision file (%d)", errno);
1033  UnlockFile(fd_lockfile_fifo);
1034  return 1;
1035  }
1036  const string revision = StringifyInt(kProtocolRevision);
1037  int written = write(retval, revision.data(), revision.length());
1038  close(retval);
1039  if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1041  "failed to write protocol revision (%d)", errno);
1042  UnlockFile(fd_lockfile_fifo);
1043  return 1;
1044  }
1045 
1046  const string fifo_path = shared_manager.workspace_dir_ + "/cachemgr";
1047  shared_manager.pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1048  if (shared_manager.pipe_lru_[0] < 0) {
1049  LogCvmfs(kLogQuota, kLogDebug, "failed to listen on FIFO %s (%d)",
1050  fifo_path.c_str(), errno);
1051  UnlockFile(fd_lockfile_fifo);
1052  return 1;
1053  }
1054  Nonblock2Block(shared_manager.pipe_lru_[0]);
1055  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager listening");
1056 
1057  char buf = 'C';
1058  WritePipe(pipe_boot, &buf, 1);
1059  close(pipe_boot);
1060 
1061  ReadPipe(pipe_handshake, &buf, 1);
1062  close(pipe_handshake);
1063  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager handshake done");
1064 
1065  // Ensure that broken pipes from clients do not kill the cache manager
1066  signal(SIGPIPE, SIG_IGN);
1067  // Don't let Ctrl-C ungracefully kill interactive session
1068  signal(SIGINT, SIG_IGN);
1069 
1070  shared_manager.MainCommandServer(&shared_manager);
1071  unlink(fifo_path.c_str());
1072  unlink(protocol_revision_path.c_str());
1073  shared_manager.CloseDatabase();
1074  unlink(crash_guard.c_str());
1075  UnlockFile(fd_lockfile_fifo);
1076 
1077  if (sqlite3_temp_directory) {
1078  sqlite3_free(sqlite3_temp_directory);
1079  sqlite3_temp_directory = NULL;
1080  }
1081 
1082  return 0;
1083 }
1084 
1085 
// Command loop of the quota manager. `data` is cast to the PosixQuotaManager
// that is served. Fixed-size LruCommand records are read from pipe_lru_[0]
// until read() returns anything other than a complete record. Some command
// types are answered right away through a return pipe; the remaining ones are
// collected in command_buffer and handed to ProcessCommandBunch() in batches.
// Always returns NULL.
//
// NOTE(review): this listing lacks the line with the definition header (the
// member index declares `static void *MainCommandServer(void *data)`), and
// the opening line of several multi-line statements below (mostly LogCvmfs
// calls) is missing as well.
1087  PosixQuotaManager *quota_mgr = static_cast<PosixQuotaManager *>(data);
1088 
1089  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1090  sqlite3_soft_heap_limit(quota_mgr->kSqliteMemPerThread);
1091 
// Batch storage: one description slot of kMaxDescription bytes per buffered
// command; num_commands counts the commands currently buffered.
1092  LruCommand command_buffer[kCommandBufferSize];
1093  char description_buffer[kCommandBufferSize*kMaxDescription];
1094  unsigned num_commands = 0;
1095 
// The next record is always read into slot num_commands, i.e. behind the
// commands that are already buffered.
1096  while (read(quota_mgr->pipe_lru_[0], &command_buffer[num_commands],
1097  sizeof(command_buffer[0])) == sizeof(command_buffer[0]))
1098  {
1099  const CommandType command_type = command_buffer[num_commands].command_type;
1100  LogCvmfs(kLogQuota, kLogDebug, "received command %d", command_type);
1101  const uint64_t size = command_buffer[num_commands].GetSize();
1102 
1103  // Inserts and pins come with a description (usually a path)
1104  if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1105  (command_type == kPin) || (command_type == kPinRegular))
1106  {
// NOTE(review): desc_length is taken from the received record and is not
// checked against kMaxDescription here -- confirm that every writer bounds
// it, otherwise this read can run past the slot in description_buffer.
1107  const int desc_length = command_buffer[num_commands].desc_length;
1108  ReadPipe(quota_mgr->pipe_lru_[0],
1109  &description_buffer[kMaxDescription*num_commands], desc_length);
1110  }
1111 
1112  // The protocol revision is returned immediately
1113  if (command_type == kGetProtocolRevision) {
1114  int return_pipe =
1115  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1116  if (return_pipe < 0)
1117  continue;
1118  WritePipe(return_pipe, &quota_mgr->kProtocolRevision,
1119  sizeof(quota_mgr->kProtocolRevision));
1120  quota_mgr->UnbindReturnPipe(return_pipe);
1121  continue;
1122  }
1123 
1124  // The cleanup rate is returned immediately
1125  if (command_type == kCleanupRate) {
1126  int return_pipe =
1127  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1128  if (return_pipe < 0)
1129  continue;
1130  uint64_t period_s = size; // use the size field to transmit the period
1131  uint64_t rate = quota_mgr->cleanup_recorder_.GetNoTicks(period_s);
1132  WritePipe(return_pipe, &rate, sizeof(rate));
1133  quota_mgr->UnbindReturnPipe(return_pipe);
1134  continue;
1135  }
1136 
1137  // Reservations are handled immediately and "out of band"
1138  if (command_type == kReserve) {
1139  bool success = true;
1140  int return_pipe =
1141  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1142  if (return_pipe < 0)
1143  continue;
1144 
1145  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1146  const string hash_str(hash.ToString());
// NOTE(review): size is a uint64_t but is printed with %d.
1147  LogCvmfs(kLogQuota, kLogDebug, "reserve %d bytes for %s",
1148  size, hash_str.c_str());
1149 
// A hash that is already registered as pinned is reported as success without
// touching the pinned_ counter again.
1150  if (quota_mgr->pinned_chunks_.find(hash) ==
1151  quota_mgr->pinned_chunks_.end())
1152  {
1153  if ((quota_mgr->pinned_ + size) > quota_mgr->cleanup_threshold_) {
1155  "failed to insert %s (pinned), no space", hash_str.c_str());
1156  success = false;
1157  } else {
1158  quota_mgr->pinned_chunks_[hash] = size;
1159  quota_mgr->pinned_ += size;
1160  quota_mgr->CheckHighPinWatermark();
1161  }
1162  }
1163 
1164  WritePipe(return_pipe, &success, sizeof(success));
1165  quota_mgr->UnbindReturnPipe(return_pipe);
1166  continue;
1167  }
1168 
1169  // Back channels are also handled out of band
1170  if (command_type == kRegisterBackChannel) {
1171  int return_pipe =
1172  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1173  if (return_pipe < 0)
1174  continue;
1175 
// The bound descriptor is kept (stored in back_channels_ below) instead of
// being unbound; only the FIFO's name is removed.
1176  quota_mgr->UnlinkReturnPipe(command_buffer[num_commands].return_pipe);
1177  Block2Nonblock(return_pipe); // back channels are opportunistic
1178  shash::Md5 hash;
1179  memcpy(hash.digest, command_buffer[num_commands].digest,
1181 
1182  quota_mgr->LockBackChannels();
1183  map<shash::Md5, int>::const_iterator iter =
1184  quota_mgr->back_channels_.find(hash);
// A previous registration under the same hash is closed and replaced.
1185  if (iter != quota_mgr->back_channels_.end()) {
1187  "closing left-over back channel %s", hash.ToString().c_str());
1188  close(iter->second);
1189  }
1190  quota_mgr->back_channels_[hash] = return_pipe;
1191  quota_mgr->UnlockBackChannels();
1192 
// Acknowledge with the single byte 'S'.
1193  char success = 'S';
1194  WritePipe(return_pipe, &success, sizeof(success));
1195  LogCvmfs(kLogQuota, kLogDebug, "register back channel %s on fd %d",
1196  hash.ToString().c_str(), return_pipe);
1197 
1198  continue;
1199  }
1200 
// Unregistering closes the stored descriptor; nothing is sent back.
1201  if (command_type == kUnregisterBackChannel) {
1202  shash::Md5 hash;
1203  memcpy(hash.digest, command_buffer[num_commands].digest,
1205 
1206  quota_mgr->LockBackChannels();
1207  map<shash::Md5, int>::iterator iter =
1208  quota_mgr->back_channels_.find(hash);
1209  if (iter != quota_mgr->back_channels_.end()) {
1211  "closing back channel %s", hash.ToString().c_str());
1212  close(iter->second);
1213  quota_mgr->back_channels_.erase(iter);
1214  } else {
1216  "did not find back channel %s", hash.ToString().c_str());
1217  }
1218  quota_mgr->UnlockBackChannels();
1219 
1220  continue;
1221  }
1222 
1223  // Unpinnings are also handled immediately with respect to the pinned gauge
// There is no `continue` in this branch: kUnpin is not an immediate command,
// so after the bookkeeping here it is also buffered below and later reaches
// the kUnpin case of ProcessCommandBunch().
1224  if (command_type == kUnpin) {
1225  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1226  const string hash_str(hash.ToString());
1227 
1228  map<shash::Any, uint64_t>::iterator iter =
1229  quota_mgr->pinned_chunks_.find(hash);
1230  if (iter != quota_mgr->pinned_chunks_.end()) {
1231  quota_mgr->pinned_ -= iter->second;
1232  quota_mgr->pinned_chunks_.erase(iter);
1233  // It can happen that files get pinned that were removed from the cache
1234  // (see cache.cc). We fix this at this point, where we remove such
1235  // entries from the cache database.
1236  if (!FileExists(quota_mgr->cache_dir_ + "/" +
1237  hash.MakePathWithoutSuffix()))
1238  {
1240  "remove orphaned pinned hash %s from cache database",
1241  hash_str.c_str());
1242  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1243  hash_str.length(), SQLITE_STATIC);
1244  int retval;
// Only if the database knows the hash: delete the row and subtract the size
// recorded in the database from the gauge.
1245  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1246  uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1247  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1248  hash_str.length(), SQLITE_STATIC);
1249  retval = sqlite3_step(quota_mgr->stmt_rm_);
1250  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1251  quota_mgr->gauge_ -= size;
1252  } else {
1254  "failed to delete %s (%d)", hash_str.c_str(), retval);
1255  }
1256  sqlite3_reset(quota_mgr->stmt_rm_);
1257  }
1258  sqlite3_reset(quota_mgr->stmt_size_);
1259  }
1260  } else {
1261  LogCvmfs(kLogQuota, kLogDebug, "this chunk was not pinned");
1262  }
1263  }
1264 
1265  // Immediate commands trigger flushing of the buffer
// An immediate command is not counted in num_commands; its record stays in
// slot num_commands, behind the buffered commands that are flushed first.
1266  bool immediate_command = (command_type == kCleanup) ||
1267  (command_type == kList) || (command_type == kListPinned) ||
1268  (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1269  (command_type == kRemove) || (command_type == kStatus) ||
1270  (command_type == kLimits) || (command_type == kPid);
1271  if (!immediate_command) num_commands++;
1272 
1273  if ((num_commands == kCommandBufferSize) || immediate_command)
1274  {
1275  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1276  description_buffer);
1277  if (!immediate_command) num_commands = 0;
1278  }
1279 
1280  if (immediate_command) {
1281  // Process cleanup, listings
1282  int return_pipe =
1283  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1284  if (return_pipe < 0) {
1285  num_commands = 0;
1286  continue;
1287  }
1288 
1289  int retval;
1290  sqlite3_stmt *this_stmt_list = NULL;
1291  switch (command_type) {
// kRemove: delete the database row of the hash, adjust gauge_ and, if the row
// was marked pinned, the pinned bookkeeping. A hash unknown to the database
// counts as success. The inner `retval` shadows the one declared above.
1292  case kRemove: {
1293  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1294  const string hash_str = hash.ToString();
1295  LogCvmfs(kLogQuota, kLogDebug, "manually removing %s",
1296  hash_str.c_str());
1297  bool success = false;
1298 
1299  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1300  hash_str.length(), SQLITE_STATIC);
1301  int retval;
1302  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1303  uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1304  uint64_t is_pinned = sqlite3_column_int64(quota_mgr->stmt_size_, 1);
1305 
1306  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1307  hash_str.length(), SQLITE_STATIC);
1308  retval = sqlite3_step(quota_mgr->stmt_rm_);
1309  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1310  success = true;
1311  quota_mgr->gauge_ -= size;
1312  if (is_pinned) {
1313  quota_mgr->pinned_chunks_.erase(hash);
1314  quota_mgr->pinned_ -= size;
1315  }
1316  } else {
1318  "failed to delete %s (%d)", hash_str.c_str(), retval);
1319  }
1320  sqlite3_reset(quota_mgr->stmt_rm_);
1321  } else {
1322  // File does not exist
1323  success = true;
1324  }
1325  sqlite3_reset(quota_mgr->stmt_size_);
1326 
1327  WritePipe(return_pipe, &success, sizeof(success));
1328  break; }
// NOTE(review): the result is written back as an int (sizeof(retval)), not as
// a bool -- the reading side has to expect that width.
1329  case kCleanup:
1330  retval = quota_mgr->DoCleanup(size);
1331  WritePipe(return_pipe, &retval, sizeof(retval));
1332  break;
// The four list cases fall through on purpose: the entry case picks its
// statement, and the `if (!this_stmt_list)` guards of the cases below it are
// then no-ops.
1333  case kList:
1334  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_;
1335  case kListPinned:
1336  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_pinned_;
1337  case kListCatalogs:
1338  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_catalogs_;
1339  case kListVolatile:
1340  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_volatile_;
1341 
1342  // Pipe back the list, one by one
// Wire format: an int length followed by that many bytes per row; a length of
// -1 terminates the list. NULL columns are sent as the string "(NULL)".
1343  int length;
1344  while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1345  string path = "(NULL)";
1346  if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1347  path = string(
1348  reinterpret_cast<const char *>(
1349  sqlite3_column_text(this_stmt_list, 0)));
1350  }
1351  length = path.length();
1352  WritePipe(return_pipe, &length, sizeof(length));
1353  if (length > 0)
1354  WritePipe(return_pipe, &path[0], length);
1355  }
1356  length = -1;
1357  WritePipe(return_pipe, &length, sizeof(length));
1358  sqlite3_reset(this_stmt_list);
1359  break;
// kStatus sends gauge_ followed by pinned_.
1360  case kStatus:
1361  WritePipe(return_pipe, &quota_mgr->gauge_, sizeof(quota_mgr->gauge_));
1362  WritePipe(return_pipe, &quota_mgr->pinned_,
1363  sizeof(quota_mgr->pinned_));
1364  break;
// kLimits sends limit_ followed by cleanup_threshold_.
1365  case kLimits:
1366  WritePipe(return_pipe, &quota_mgr->limit_, sizeof(quota_mgr->limit_));
1367  WritePipe(return_pipe, &quota_mgr->cleanup_threshold_,
1368  sizeof(quota_mgr->cleanup_threshold_));
1369  break;
1370  case kPid: {
1371  pid_t pid = getpid();
1372  WritePipe(return_pipe, &pid, sizeof(pid));
1373  break;
1374  }
1375  default:
1376  PANIC(NULL); // other types are handled by the bunch processor
1377  }
1378  quota_mgr->UnbindReturnPipe(return_pipe);
1379  num_commands = 0;
1380  }
1381  }
1382 
// The loop has ended: close the command pipe and flush what is still buffered.
1383  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager (%d)", errno);
1384  close(quota_mgr->pipe_lru_[0]);
1385  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1386  description_buffer);
1387 
1388  // Unpin
// NOTE(review): despite the comment above, the command issued for every entry
// of pinned_chunks_ is kTouch, not kUnpin -- confirm that this is intended.
1389  command_buffer[0].command_type = kTouch;
1390  for (map<shash::Any, uint64_t>::const_iterator i =
1391  quota_mgr->pinned_chunks_.begin(),
1392  iEnd = quota_mgr->pinned_chunks_.end(); i != iEnd; ++i)
1393  {
1394  command_buffer[0].StoreHash(i->first);
1395  quota_mgr->ProcessCommandBunch(1, command_buffer, description_buffer);
1396  }
1397 
1398  return NULL;
1399 }
1400 
1401 
1403  if (!shared_) {
1404  MakePipe(pipe);
1405  return;
1406  }
1407 
1408  // Create FIFO in cache directory, store path name (number) in pipe write end
1409  int i = 0;
1410  int retval;
1411  do {
1412  retval = mkfifo((workspace_dir_ + "/pipe" + StringifyInt(i)).c_str(), 0600);
1413  pipe[1] = i;
1414  i++;
1415  } while ((retval == -1) && (errno == EEXIST));
1416  assert(retval == 0);
1417 
1418  // Connect reader's end
1419  pipe[0] = open((workspace_dir_ + "/pipe" + StringifyInt(pipe[1])).c_str(),
1420  O_RDONLY | O_NONBLOCK);
1421  assert(pipe[0] >= 0);
1422  Nonblock2Block(pipe[0]);
1423 }
1424 
1425 
1427  const std::string cache_workspace,
1428  std::string *cache_dir,
1429  std::string *workspace_dir)
1430 {
1431  vector<string> dir_tokens(SplitString(cache_workspace, ':'));
1432  switch (dir_tokens.size()) {
1433  case 1:
1434  *cache_dir = *workspace_dir = dir_tokens[0];
1435  break;
1436  case 2:
1437  *cache_dir = dir_tokens[0];
1438  *workspace_dir = dir_tokens[1];
1439  break;
1440  default:
1441  PANIC(NULL);
1442  }
1443 }
1444 
1445 
1452  const shash::Any &hash,
1453  const uint64_t size,
1454  const string &description,
1455  const bool is_catalog)
1456 {
1457  assert((size > 0) || !is_catalog);
1458 
1459  const string hash_str = hash.ToString();
1460  LogCvmfs(kLogQuota, kLogDebug, "pin into lru %s, path %s",
1461  hash_str.c_str(), description.c_str());
1462 
1463  // Has to run when not yet spawned (cvmfs initialization)
1464  if (!spawned_) {
1465  // Code duplication here
1466  if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1467  if (pinned_ + size > cleanup_threshold_) {
1468  LogCvmfs(kLogQuota, kLogDebug, "failed to insert %s (pinned), no space",
1469  hash_str.c_str());
1470  return false;
1471  } else {
1472  pinned_chunks_[hash] = size;
1473  pinned_ += size;
1474  CheckHighPinWatermark();
1475  }
1476  }
1477  bool exists = Contains(hash_str);
1478  if (!exists && (gauge_ + size > limit_)) {
1479  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu",
1480  gauge_, size);
1481  int retval = DoCleanup(cleanup_threshold_);
1482  assert(retval != 0);
1483  }
1484  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1485  SQLITE_STATIC);
1486  sqlite3_bind_int64(stmt_new_, 2, size);
1487  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1488  sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1489  SQLITE_STATIC);
1490  sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1491  sqlite3_bind_int64(stmt_new_, 6, 1);
1492  int retval = sqlite3_step(stmt_new_);
1493  assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1494  sqlite3_reset(stmt_new_);
1495  if (!exists) gauge_ += size;
1496  return true;
1497  }
1498 
1499  int pipe_reserve[2];
1500  MakeReturnPipe(pipe_reserve);
1501 
1502  LruCommand cmd;
1503  cmd.command_type = kReserve;
1504  cmd.SetSize(size);
1505  cmd.StoreHash(hash);
1506  cmd.return_pipe = pipe_reserve[1];
1507  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1508  bool result;
1509  ReadHalfPipe(pipe_reserve[0], &result, sizeof(result));
1510  CloseReturnPipe(pipe_reserve);
1511 
1512  if (!result) return false;
1513  DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1514 
1515  return true;
1516 }
1517 
1518 
1520  const uint64_t limit,
1521  const uint64_t cleanup_threshold,
1522  const string &cache_workspace)
1523  : shared_(false)
1524  , spawned_(false)
1525  , limit_(limit)
1526  , cleanup_threshold_(cleanup_threshold)
1527  , gauge_(0)
1528  , pinned_(0)
1529  , seq_(0)
1530  , cache_dir_() // initialized in body
1531  , workspace_dir_() // initialized in body
1532  , fd_lock_cachedb_(-1)
1533  , async_delete_(true)
1534  , database_(NULL)
1535  , stmt_touch_(NULL)
1536  , stmt_unpin_(NULL)
1537  , stmt_block_(NULL)
1538  , stmt_unblock_(NULL)
1539  , stmt_new_(NULL)
1540  , stmt_lru_(NULL)
1541  , stmt_size_(NULL)
1542  , stmt_rm_(NULL)
1543  , stmt_list_(NULL)
1544  , stmt_list_pinned_(NULL)
1545  , stmt_list_catalogs_(NULL)
1546  , stmt_list_volatile_(NULL)
1547  , initialized_(false)
1548 {
1549  ParseDirectories(cache_workspace, &cache_dir_, &workspace_dir_);
1550  pipe_lru_[0] = pipe_lru_[1] = -1;
1551  cleanup_recorder_.AddRecorder(1, 90); // last 1.5 min with second resolution
1552  // last 1.5 h with minute resolution
1553  cleanup_recorder_.AddRecorder(60, 90*60);
1554  // last 18 hours with 20 min resolution
1555  cleanup_recorder_.AddRecorder(20*60, 60*60*18);
1556  // last 4 days with hour resolution
1557  cleanup_recorder_.AddRecorder(60*60, 60*60*24*4);
1558 }
1559 
1560 
// Destructor. Does nothing unless initialized_ is set. A shared manager only
// closes its writing end of the command pipe. A non-shared manager stops the
// command thread if one was spawned and then closes the database.
//
// NOTE(review): this listing lacks the line with the definition header (the
// member index declares `virtual ~PosixQuotaManager()`).
1562  if (!initialized_) return;
1563 
1564  if (shared_) {
1565  // Most of the cleanup is done elsewhere by the shared cache manager
1566  close(pipe_lru_[1]);
1567  return;
1568  }
1569 
1570  if (spawned_) {
// A single byte followed by closing the pipe; presumably this makes the
// read() of a full LruCommand in MainCommandServer come up short and thereby
// ends its loop -- the thread is then joined.
1571  char fin = 0;
1572  WritePipe(pipe_lru_[1], &fin, 1);
1573  close(pipe_lru_[1]);
1574  pthread_join(thread_lru_, NULL);
1575  } else {
// NOTE(review): the statement of this else branch (listing line 1576) is
// missing from the listing.
1577  }
1578 
1579  CloseDatabase();
1580 }
1581 
1582 
1584  const unsigned num,
1585  const LruCommand *commands,
1586  const char *descriptions)
1587 {
1588  int retval = sqlite3_exec(database_, "BEGIN", NULL, NULL, NULL);
1589  assert(retval == SQLITE_OK);
1590 
1591  for (unsigned i = 0; i < num; ++i) {
1592  const shash::Any hash = commands[i].RetrieveHash();
1593  const string hash_str = hash.ToString();
1594  const unsigned size = commands[i].GetSize();
1595  LogCvmfs(kLogQuota, kLogDebug, "processing %s (%d)",
1596  hash_str.c_str(), commands[i].command_type);
1597 
1598  bool exists;
1599  switch (commands[i].command_type) {
1600  case kTouch:
1601  sqlite3_bind_int64(stmt_touch_, 1, seq_++);
1602  sqlite3_bind_text(stmt_touch_, 2, &hash_str[0], hash_str.length(),
1603  SQLITE_STATIC);
1604  retval = sqlite3_step(stmt_touch_);
1605  LogCvmfs(kLogQuota, kLogDebug, "touching %s (%ld): %d",
1606  hash_str.c_str(), seq_-1, retval);
1607  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1608  PANIC(kLogSyslogErr, "failed to update %s in cachedb, error %d",
1609  hash_str.c_str(), retval);
1610  }
1611  sqlite3_reset(stmt_touch_);
1612  break;
1613  case kUnpin:
1614  sqlite3_bind_text(stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1615  SQLITE_STATIC);
1616  retval = sqlite3_step(stmt_unpin_);
1617  LogCvmfs(kLogQuota, kLogDebug, "unpinning %s: %d",
1618  hash_str.c_str(), retval);
1619  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1620  PANIC(kLogSyslogErr, "failed to unpin %s in cachedb, error %d",
1621  hash_str.c_str(), retval);
1622  }
1623  sqlite3_reset(stmt_unpin_);
1624  break;
1625  case kPin:
1626  case kPinRegular:
1627  case kInsert:
1628  case kInsertVolatile:
1629  // It could already be in, check
1630  exists = Contains(hash_str);
1631 
1632  // Cleanup, move to trash and unlink
1633  if (!exists && (gauge_ + size > limit_)) {
1634  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu",
1635  gauge_, size);
1636  retval = DoCleanup(cleanup_threshold_);
1637  assert(retval != 0);
1638  }
1639 
1640  // Insert or replace
1641  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1642  SQLITE_STATIC);
1643  sqlite3_bind_int64(stmt_new_, 2, size);
1644  if (commands[i].command_type == kInsertVolatile) {
1645  sqlite3_bind_int64(stmt_new_, 3, (seq_++) | kVolatileFlag);
1646  } else {
1647  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1648  }
1649  sqlite3_bind_text(stmt_new_, 4, &descriptions[i*kMaxDescription],
1650  commands[i].desc_length, SQLITE_STATIC);
1651  sqlite3_bind_int64(stmt_new_, 5, (commands[i].command_type == kPin) ?
1653  sqlite3_bind_int64(stmt_new_, 6,
1654  ((commands[i].command_type == kPin) ||
1655  (commands[i].command_type == kPinRegular)) ? 1 : 0);
1656  retval = sqlite3_step(stmt_new_);
1657  LogCvmfs(kLogQuota, kLogDebug, "insert or replace %s, method %d: %d",
1658  hash_str.c_str(), commands[i].command_type, retval);
1659  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1660  PANIC(kLogSyslogErr, "failed to insert %s in cachedb, error %d",
1661  hash_str.c_str(), retval);
1662  }
1663  sqlite3_reset(stmt_new_);
1664 
1665  if (!exists) gauge_ += size;
1666  break;
1667  default:
1668  // other types should have been taken care of by event loop
1669  PANIC(NULL);
1670  }
1671  }
1672 
1673  retval = sqlite3_exec(database_, "COMMIT", NULL, NULL, NULL);
1674  if (retval != SQLITE_OK) {
1675  PANIC(kLogSyslogErr, "failed to commit to cachedb, error %d", retval);
1676  }
1677 }
1678 
1679 
// Rebuilds the cache database from the files found in the cache directory.
// Both tables (cache_catalog, fscache) are emptied, the sub-directories
// 00 - ff of cache_dir_ are scanned into fscache, and the entries are then
// copied into cache_catalog ordered by access time, which assigns fresh
// sequence numbers. gauge_ and, on success, seq_ are recomputed.
// Returns true on success and false as soon as any step fails.
//
// NOTE(review): this listing lacks the line with the definition header and
// the opening line of several multi-line LogCvmfs calls below.
1681  bool result = false;
1682  string sql;
1683  sqlite3_stmt *stmt_select = NULL;
1684  sqlite3_stmt *stmt_insert = NULL;
1685  int sqlerr;
1686  int seq = 0;
// Room for two hex digits plus the terminating NUL ("%02x" of 0..0xff)
1687  char hex[4];
1688  struct stat info;
1689  platform_dirent64 *d;
1690  DIR *dirp = NULL;
1691  string path;
1692 
1693  LogCvmfs(kLogQuota, kLogSyslog | kLogDebug, "re-building cache database");
1694 
1695  // Empty cache catalog and fscache
1696  sql = "DELETE FROM cache_catalog; DELETE FROM fscache;";
1697  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1698  if (sqlerr != SQLITE_OK) {
1699  LogCvmfs(kLogQuota, kLogDebug, "could not clear cache database");
1700  goto build_return;
1701  }
1702 
1703  gauge_ = 0;
1704 
1705  // Insert files from cache sub-directories 00 - ff
1706  // TODO(jblomer): fs_traversal
// NOTE(review): the return value of sqlite3_prepare_v2 is not checked, here
// and for the two statements prepared further down.
1707  sqlite3_prepare_v2(database_, "INSERT INTO fscache (sha1, size, actime) "
1708  "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1709 
1710  for (int i = 0; i <= 0xff; i++) {
1711  snprintf(hex, sizeof(hex), "%02x", i);
1712  path = cache_dir_ + "/" + string(hex);
// A sub-directory that cannot be opened aborts the whole rebuild
1713  if ((dirp = opendir(path.c_str())) == NULL) {
1715  "failed to open directory %s (tmpwatch interfering?)",
1716  path.c_str());
1717  goto build_return;
1718  }
1719  while ((d = platform_readdir(dirp)) != NULL) {
1720  string file_path = path + "/" + string(d->d_name);
1721  if (stat(file_path.c_str(), &info) == 0) {
// Only regular files are considered; empty ones are deleted instead of
// being registered
1722  if (!S_ISREG(info.st_mode))
1723  continue;
1724  if (info.st_size == 0) {
1726  "removing empty file %s during automatic cache db rebuild",
1727  file_path.c_str());
1728  unlink(file_path.c_str());
1729  continue;
1730  }
1731 
// The hash string is the directory name followed by the file name
1732  string hash = string(hex) + string(d->d_name);
1733  sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1734  SQLITE_STATIC);
1735  sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1736  sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1737  if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1738  LogCvmfs(kLogQuota, kLogDebug, "could not insert into temp table");
1739  goto build_return;
1740  }
1741  sqlite3_reset(stmt_insert);
1742 
1743  gauge_ += info.st_size;
1744  } else {
// A file that cannot be stat'ed is skipped, the rebuild continues
1745  LogCvmfs(kLogQuota, kLogDebug, "could not stat %s", file_path.c_str());
1746  }
1747  }
// Reset dirp so that the cleanup at build_return does not close it again
1748  closedir(dirp);
1749  dirp = NULL;
1750  }
1751  sqlite3_finalize(stmt_insert);
1752  stmt_insert = NULL;
1753 
1754  // Transfer from temp table in cache catalog
1755  sqlite3_prepare_v2(database_,
1756  "SELECT sha1, size FROM fscache ORDER BY actime;",
1757  -1, &stmt_select, NULL);
1758  sqlite3_prepare_v2(database_,
1759  "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1760  "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1761  -1, &stmt_insert, NULL);
// Rows arrive ordered by access time, so seq increases with the access time
1762  while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1763  const string hash = string(
1764  reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1765  sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1766  sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1767  sqlite3_bind_int64(stmt_insert, 3, seq++);
1768  // Might also be a catalog (information is lost)
1769  sqlite3_bind_int64(stmt_insert, 4, kFileRegular);
1770 
1771  int retval = sqlite3_step(stmt_insert);
1772  if (retval != SQLITE_DONE) {
1773  // If the file system hosting the cache is full, we'll likely notice here
1775  "could not insert into cache catalog (%d - %s)",
1776  retval, sqlite3_errstr(retval));
1777  goto build_return;
1778  }
1779  sqlite3_reset(stmt_insert);
1780  }
1781 
1782  // Delete temporary table
1783  sql = "DELETE FROM fscache;";
1784  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1785  if (sqlerr != SQLITE_OK) {
1786  LogCvmfs(kLogQuota, kLogDebug, "could not clear temporary table (%d)",
1787  sqlerr);
1788  goto build_return;
1789  }
1790 
// Success: continue numbering after the entries that were just inserted
1791  seq_ = seq;
1792  result = true;
1794  "rebuilding finished, seqence %" PRIu64 ", gauge %" PRIu64,
1795  seq_, gauge_);
1796 
// Common exit for success and failure: release whatever is still open
1797  build_return:
1798  if (stmt_insert) sqlite3_finalize(stmt_insert);
1799  if (stmt_select) sqlite3_finalize(stmt_select);
1800  if (dirp) closedir(dirp);
1801  return result;
1802 }
1803 
1804 
// Sets up a back channel identified by the MD5 hash of channel_id and fills
// back_channel[] with it. With protocol revision >= 1, a return pipe is
// created, a command carrying the hash is sent to the cache manager, and a
// one-byte acknowledgement ('S' on success) is awaited. With an older
// protocol revision, back_channel[] is just an ordinary pipe.
//
// NOTE(review): this listing lacks the line with the definition header (the
// member index declares `virtual void RegisterBackChannel(int
// back_channel[2], const std::string &channel_id)`).
1810  int back_channel[2],
1811  const string &channel_id)
1812 {
1813  if (protocol_revision_ >= 1) {
1814  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1815  MakeReturnPipe(back_channel);
1816 
// NOTE(review): listing line 1818 is missing here; presumably it sets
// cmd.command_type -- confirm against the original file.
1817  LruCommand cmd;
1819  cmd.return_pipe = back_channel[1];
1820  // Not StoreHash(). This is an MD5 hash.
1821  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1822  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1823 
1824  char success;
1825  ReadHalfPipe(back_channel[0], &success, sizeof(success));
1826  // At this point, the named FIFO is unlinked, so don't use CloseReturnPipe
// NOTE(review): the opening line of the statement in this branch (listing
// line 1828) is missing.
1827  if (success != 'S') {
1829  "failed to register quota back channel (%c)", success);
1830  }
1831  } else {
1832  // Dummy pipe to return valid file descriptors
1833  MakePipe(back_channel);
1834  }
1835 }
1836 
1837 
1842  string hash_str = hash.ToString();
1843 
1844  int pipe_remove[2];
1845  MakeReturnPipe(pipe_remove);
1846 
1847  LruCommand cmd;
1848  cmd.command_type = kRemove;
1849  cmd.return_pipe = pipe_remove[1];
1850  cmd.StoreHash(hash);
1851  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1852 
1853  bool success;
1854  ReadHalfPipe(pipe_remove[0], &success, sizeof(success));
1855  CloseReturnPipe(pipe_remove);
1856 
1857  unlink((cache_dir_ + "/" + hash.MakePathWithoutSuffix()).c_str());
1858 }
1859 
1860 
1862  if (spawned_)
1863  return;
1864 
1865  if (pthread_create(&thread_lru_, NULL, MainCommandServer,
1866  static_cast<void *>(this)) != 0)
1867  {
1868  PANIC(kLogDebug, "could not create lru thread");
1869  }
1870 
1871  spawned_ = true;
1872 }
1873 
1874 
1879  LruCommand cmd;
1880  cmd.command_type = kTouch;
1881  cmd.StoreHash(hash);
1882  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1883 }
1884 
1885 
1887  if (shared_)
1888  close(pipe_wronly);
1889 }
1890 
1891 
1893  if (shared_)
1894  unlink((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str());
1895 }
1896 
1897 
1899  LogCvmfs(kLogQuota, kLogDebug, "Unpin %s", hash.ToString().c_str());
1900 
1901  LruCommand cmd;
1902  cmd.command_type = kUnpin;
1903  cmd.StoreHash(hash);
1904  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1905 }
1906 
1907 
// Tears down a back channel set up by RegisterBackChannel(). With protocol
// revision >= 1, a command carrying the MD5 hash of channel_id is sent to the
// cache manager and only the reading end is closed locally. With an older
// protocol revision, both ends of the ordinary pipe are closed.
//
// NOTE(review): this listing lacks the line with the definition header (the
// member index declares `virtual void UnregisterBackChannel(int
// back_channel[2], const std::string &channel_id)`).
1909  int back_channel[2],
1910  const string &channel_id)
1911 {
1912  if (protocol_revision_ >= 1) {
1913  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1914 
// NOTE(review): listing line 1916 is missing here; presumably it sets
// cmd.command_type -- confirm against the original file.
1915  LruCommand cmd;
1917  // Not StoreHash(). This is an MD5 hash.
1918  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1919  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1920 
1921  // Writer's end will be closed by cache manager, FIFO is already unlinked
1922  close(back_channel[0]);
1923  } else {
1924  ClosePipe(back_channel);
1925  }
1926 }
virtual uint32_t GetProtocolRevision()
Definition: quota_posix.cc:641
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
void SetLogSyslogFacility(const int local_facility)
Definition: logging.cc:129
sqlite3_stmt * stmt_new_
Definition: quota_posix.h:325
sqlite3_stmt * stmt_list_pinned_
Definition: quota_posix.h:330
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
Definition: statistics.cc:246
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
Definition: logging.cc:97
std::vector< std::string > DoList(const CommandType list_command)
Definition: quota_posix.cc:556
struct stat64 platform_stat64
virtual pid_t GetPid()
Definition: quota_posix.cc:622
virtual uint64_t GetCleanupRate(uint64_t period_s)
Definition: quota_posix.cc:690
sqlite3_stmt * stmt_list_
Definition: quota_posix.h:329
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
Definition: quota_posix.cc:598
static Watchdog * Create(const std::string &crash_dump_path)
Definition: monitor.cc:59
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:288
virtual uint64_t GetSize()
Definition: quota_posix.cc:674
sqlite3_stmt * stmt_rm_
Definition: quota_posix.h:328
virtual bool Cleanup(const uint64_t leave_size)
Definition: quota_posix.cc:125
static const unsigned kSqliteMemPerThread
Definition: quota_posix.h:179
#define PANIC(...)
Definition: exception.h:26
virtual uint64_t GetMaxFileSize()
Definition: quota_posix.cc:617
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:245
uint64_t cleanup_threshold_
Definition: quota_posix.h:258
static const uint32_t kProtocolRevision
Definition: quota.h:45
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
Definition: posix.cc:1779
void Daemonize()
Definition: posix.cc:1637
bool DoCleanup(const uint64_t leave_size)
Definition: quota_posix.cc:424
uint64_t GetSize() const
Definition: quota_posix.h:154
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: quota_posix.cc:894
void Spawn()
Definition: monitor.cc:376
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
Definition: quota_posix.cc:708
int BindReturnPipe(int pipe_wronly)
Definition: quota_posix.cc:64
std::string cache_dir_
Definition: quota_posix.h:279
bool Contains(const std::string &hash_str)
Definition: quota_posix.cc:189
perf::MultiRecorder cleanup_recorder_
Definition: quota_posix.h:318
int platform_stat(const char *path, platform_stat64 *buf)
void SetLogMicroSyslog(const std::string &filename)
Definition: logging.cc:214
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
Definition: posix.cc:525
sqlite3 * database_
Definition: quota_posix.h:320
unsigned char digest[digest_size_]
Definition: hash.h:122
virtual std::vector< std::string > ListCatalogs()
Definition: quota_posix.cc:936
void CheckHighPinWatermark()
Definition: quota_posix.cc:82
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
Definition: quota_posix.cc:660
bool FileExists(const std::string &path)
Definition: posix.cc:816
int GetLogSyslogLevel()
Definition: logging.cc:114
int64_t String2Int64(const string &value)
Definition: string.cc:222
unsigned GetDigestSize() const
Definition: hash.h:164
std::string workspace_dir_
Definition: quota_posix.h:286
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
Definition: quota_posix.cc:928
pthread_t thread_lru_
Definition: quota_posix.h:301
#define GetLogDebugFile()
void ReadHalfPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:558
void Nonblock2Block(int filedes)
Definition: posix.cc:669
virtual uint64_t GetSizePinned()
Definition: quota_posix.cc:682
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
Definition: quota_posix.cc:944
virtual uint64_t GetCapacity()
Definition: quota_posix.cc:582
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
void SetSize(const uint64_t new_size)
Definition: quota_posix.h:148
void LockBackChannels()
Definition: quota.h:96
shash::Any RetrieveHash() const
Definition: quota_posix.h:168
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
Definition: posix.cc:996
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
Definition: quota_posix.cc:230
sqlite3_stmt * stmt_touch_
Definition: quota_posix.h:321
string StringifyInt(const int64_t value)
Definition: string.cc:78
std::string GetLogMicroSyslog()
Definition: logging.cc:250
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
Definition: quota_posix.cc:531
sqlite3_stmt * stmt_list_volatile_
Definition: quota_posix.h:332
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
Definition: quota.h:109
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
Definition: quota_posix.cc:920
static int MainCacheManager(int argc, char **argv)
Definition: quota_posix.cc:952
std::map< shash::Md5, int > back_channels_
Definition: quota.h:94
static const unsigned kMaxDescription
Definition: quota_posix.h:191
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: quota_posix.cc:908
unsigned char digest[shash::kMaxDigestSize]
Definition: quota_posix.h:132
std::string MakePathWithoutSuffix() const
Definition: hash.h:331
sqlite3_stmt * stmt_list_catalogs_
Definition: quota_posix.h:331
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
Definition: quota_posix.cc:262
uint64_t GetNoTicks(uint32_t retrospect_s) const
Definition: statistics.cc:251
platform_dirent64 * platform_readdir(DIR *dirp)
virtual ~PosixQuotaManager()
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
Definition: posix.cc:680
std::map< shash::Any, uint64_t > pinned_chunks_
Definition: quota_posix.h:291
sqlite3_stmt * stmt_unpin_
Definition: quota_posix.h:322
void CloseReturnPipe(int pipe[2])
Definition: quota_posix.cc:179
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:534
virtual void Spawn()
static void size_t size
Definition: smalloc.h:47
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:546
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:584
const unsigned kDigestSizes[]
Definition: hash.h:67
void UnlockBackChannels()
Definition: quota.h:100
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
Definition: quota_posix.h:160
static const uint64_t kVolatileFlag
Definition: quota_posix.h:204
void UnlockFile(const int filedes)
Definition: posix.cc:1020
sqlite3_stmt * stmt_size_
Definition: quota_posix.h:327
struct dirent64 platform_dirent64
int GetLogSyslogFacility()
Definition: logging.cc:160