CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
quota_posix.cc
Go to the documentation of this file.
1 
15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
17 
18 #include "cvmfs_config.h"
19 #include "quota_posix.h"
20 
21 #include <dirent.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdint.h>
28 #include <sys/dir.h>
29 #include <sys/stat.h>
30 #ifndef __APPLE__
31 #include <sys/statfs.h>
32 #endif
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 #include <cassert>
39 #include <cstdio>
40 #include <cstdlib>
41 #include <cstring>
42 
43 #include <map>
44 #include <set>
45 #include <string>
46 #include <vector>
47 
48 #include "crypto/hash.h"
49 #include "duplex_sqlite3.h"
50 #include "monitor.h"
51 #include "statistics.h"
52 #include "util/concurrency.h"
53 #include "util/exception.h"
54 #include "util/logging.h"
55 #include "util/platform.h"
56 #include "util/pointer.h"
57 #include "util/posix.h"
58 #include "util/smalloc.h"
59 #include "util/string.h"
60 
61 using namespace std; // NOLINT
62 
63 
/**
 * Turns the return pipe identifier carried in a command into a descriptor
 * that can be written to.
 *
 * If shared_ is not set, pipe_wronly is handed back unchanged.  Otherwise the
 * FIFO named workspace_dir_ + "/pipe" + <pipe_wronly> is opened write-only in
 * non-blocking mode and, on success, passed to Nonblock2Block().
 *
 * \param pipe_wronly descriptor (non-shared case) or numeric suffix of the
 *        FIFO name (shared case)
 * \return pipe_wronly, the descriptor returned by open(), or the negative
 *         result of a failed open()
 *
 * NOTE(review): the line that opens the call in the else-branch is absent
 * from this listing (the embedded numbering skips 75); presumably a logging
 * call -- confirm against the real file.
 */
64 int PosixQuotaManager::BindReturnPipe(int pipe_wronly) {
65  if (!shared_)
66  return pipe_wronly;
67 
68  // Connect writer's end
69  int result =
70  open((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str(),
71  O_WRONLY | O_NONBLOCK);
72  if (result >= 0) {
// The open was non-blocking; hand the descriptor to Nonblock2Block() before
// it is used for writing.
73  Nonblock2Block(result);
74  } else {
76  "failed to bind return pipe (%d)", errno);
77  }
78  return result;
79 }
80 
81 
83  const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
84  if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86  "high watermark of pinned files (%" PRIu64 "M > %" PRIu64 "M)",
87  pinned_/(1024*1024), watermark/(1024*1024));
88  BroadcastBackchannels("R"); // clients: please release pinned catalogs
89  }
90 }
91 
92 
94  DIR *dirp = opendir(workspace_dir_.c_str());
95  assert(dirp != NULL);
96 
97  platform_dirent64 *dent;
98  bool found_leftovers = false;
99  while ((dent = platform_readdir(dirp)) != NULL) {
100  const string name = dent->d_name;
101  const string path = workspace_dir_ + "/" + name;
102  platform_stat64 info;
103  int retval = platform_stat(path.c_str(), &info);
104  if (retval != 0)
105  continue;
106  if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) == "pipe")) {
107  if (!found_leftovers) {
109  "removing left-over FIFOs from cache directory");
110  }
111  found_leftovers = true;
112  unlink(path.c_str());
113  }
114  }
115  closedir(dirp);
116 }
117 
118 
125 bool PosixQuotaManager::Cleanup(const uint64_t leave_size) {
126  if (!spawned_)
127  return DoCleanup(leave_size);
128 
129  bool result;
130  int pipe_cleanup[2];
131  MakeReturnPipe(pipe_cleanup);
132 
133  LruCommand cmd;
134  cmd.command_type = kCleanup;
135  cmd.size = leave_size;
136  cmd.return_pipe = pipe_cleanup[1];
137 
138  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
139  ReadHalfPipe(pipe_cleanup[0], &result, sizeof(result));
140  CloseReturnPipe(pipe_cleanup);
141 
142  return result;
143 }
144 
145 
147  if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
148  if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
149  if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
150  if (stmt_list_) sqlite3_finalize(stmt_list_);
151  if (stmt_lru_) sqlite3_finalize(stmt_lru_);
152  if (stmt_rm_) sqlite3_finalize(stmt_rm_);
153  if (stmt_size_) sqlite3_finalize(stmt_size_);
154  if (stmt_touch_) sqlite3_finalize(stmt_touch_);
155  if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
156  if (stmt_block_) sqlite3_finalize(stmt_block_);
157  if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
158  if (stmt_new_) sqlite3_finalize(stmt_new_);
159  if (database_) sqlite3_close(database_);
160  UnlockFile(fd_lock_cachedb_);
161 
162  stmt_list_catalogs_ = NULL;
163  stmt_list_pinned_ = NULL;
164  stmt_list_volatile_ = NULL;
165  stmt_list_ = NULL;
166  stmt_rm_ = NULL;
167  stmt_size_ = NULL;
168  stmt_touch_ = NULL;
169  stmt_unpin_ = NULL;
170  stmt_block_ = NULL;
171  stmt_unblock_ = NULL;
172  stmt_new_ = NULL;
173  database_ = NULL;
174 
175  pinned_chunks_.clear();
176 }
177 
178 
180  if (shared_) {
181  close(pipe[0]);
182  UnlinkReturnPipe(pipe[1]);
183  } else {
184  ClosePipe(pipe);
185  }
186 }
187 
188 
189 bool PosixQuotaManager::Contains(const string &hash_str) {
190  bool result = false;
191 
192  sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
193  SQLITE_STATIC);
194  if (sqlite3_step(stmt_size_) == SQLITE_ROW)
195  result = true;
196  sqlite3_reset(stmt_size_);
197  LogCvmfs(kLogQuota, kLogDebug, "contains %s returns %d",
198  hash_str.c_str(), result);
199 
200  return result;
201 }
202 
203 
205  if ((limit_ == 0) || (gauge_ >= limit_))
206  return;
207 
208  struct statvfs vfs_info;
209  int retval = statvfs((cache_dir_ + "/cachedb").c_str(), &vfs_info);
210  if (retval != 0) {
212  "failed to query %s for free space (%d)",
213  cache_dir_.c_str(), errno);
214  return;
215  }
216  int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
217  LogCvmfs(kLogQuota, kLogDebug, "free space: %" PRId64 " MB",
218  free_space_byte / (1024 * 1024));
219 
220  int64_t required_byte = limit_ - gauge_;
221  if (free_space_byte < required_byte) {
223  "too little free space on the file system hosting the cache,"
224  " %" PRId64 " MB available",
225  free_space_byte / (1024 * 1024));
226  }
227 }
228 
229 
231  const string &cache_workspace,
232  const uint64_t limit,
233  const uint64_t cleanup_threshold,
234  const bool rebuild_database)
235 {
236  if (cleanup_threshold >= limit) {
237  LogCvmfs(kLogQuota, kLogDebug, "invalid parameters: limit %" PRIu64 ", "
238  "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
239  return NULL;
240  }
241 
242  PosixQuotaManager *quota_manager =
243  new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
244 
245  // Initialize cache catalog
246  if (!quota_manager->InitDatabase(rebuild_database)) {
247  delete quota_manager;
248  return NULL;
249  }
250  quota_manager->CheckFreeSpace();
251  MakePipe(quota_manager->pipe_lru_);
252 
253  quota_manager->protocol_revision_ = kProtocolRevision;
254  quota_manager->initialized_ = true;
255  return quota_manager;
256 }
257 
258 
263  const std::string &exe_path,
264  const std::string &cache_workspace,
265  const uint64_t limit,
266  const uint64_t cleanup_threshold,
267  bool foreground)
268 {
269  string cache_dir;
270  string workspace_dir;
271  ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
272 
273  // Create lock file: only one fuse client at a time
274  const int fd_lockfile = LockFile(workspace_dir + "/lock_cachemgr");
275  if (fd_lockfile < 0) {
276  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
277  (workspace_dir + "/lock_cachemgr").c_str(), errno);
278  return NULL;
279  }
280 
281  PosixQuotaManager *quota_mgr =
282  new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
283  quota_mgr->shared_ = true;
284  quota_mgr->spawned_ = true;
285 
286  // Try to connect to pipe
287  const string fifo_path = workspace_dir + "/cachemgr";
288  LogCvmfs(kLogQuota, kLogDebug, "trying to connect to existing pipe");
289  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
290  if (quota_mgr->pipe_lru_[1] >= 0) {
291  LogCvmfs(kLogQuota, kLogDebug, "connected to existing cache manager pipe");
292  quota_mgr->initialized_ = true;
293  Nonblock2Block(quota_mgr->pipe_lru_[1]);
294  UnlockFile(fd_lockfile);
295  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
297  "received limit %" PRIu64 ", threshold %" PRIu64,
298  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
299  if (FileExists(workspace_dir + "/cachemgr.protocol")) {
300  quota_mgr->protocol_revision_ = quota_mgr->GetProtocolRevision();
301  LogCvmfs(kLogQuota, kLogDebug, "connected protocol revision %u",
302  quota_mgr->protocol_revision_);
303  } else {
304  LogCvmfs(kLogQuota, kLogDebug, "connected to ancient cache manager");
305  }
306  return quota_mgr;
307  }
308  const int connect_error = errno;
309 
310  // Lock file: let existing cache manager finish first
311  const int fd_lockfile_fifo = LockFile(workspace_dir + "/lock_cachemgr.fifo");
312  if (fd_lockfile_fifo < 0) {
313  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
314  (workspace_dir + "/lock_cachemgr.fifo").c_str(), errno);
315  UnlockFile(fd_lockfile);
316  delete quota_mgr;
317  return NULL;
318  }
319  UnlockFile(fd_lockfile_fifo);
320 
321  if (connect_error == ENXIO) {
322  LogCvmfs(kLogQuota, kLogDebug, "left-over FIFO found, unlinking");
323  unlink(fifo_path.c_str());
324  }
325 
326  // Creating a new FIFO for the cache manager (to be bound later)
327  int retval = mkfifo(fifo_path.c_str(), 0600);
328  if (retval != 0) {
329  LogCvmfs(kLogQuota, kLogDebug, "failed to create cache manager FIFO (%d)",
330  errno);
331  UnlockFile(fd_lockfile);
332  delete quota_mgr;
333  return NULL;
334  }
335 
336  // Create new cache manager
337  int pipe_boot[2];
338  int pipe_handshake[2];
339  MakePipe(pipe_boot);
340  MakePipe(pipe_handshake);
341 
342  vector<string> command_line;
343  command_line.push_back(exe_path);
344  command_line.push_back("__cachemgr__");
345  command_line.push_back(cache_workspace);
346  command_line.push_back(StringifyInt(pipe_boot[1]));
347  command_line.push_back(StringifyInt(pipe_handshake[0]));
348  command_line.push_back(StringifyInt(limit));
349  command_line.push_back(StringifyInt(cleanup_threshold));
350  command_line.push_back(StringifyInt(foreground));
351  command_line.push_back(StringifyInt(GetLogSyslogLevel()));
352  command_line.push_back(StringifyInt(GetLogSyslogFacility()));
353  command_line.push_back(GetLogDebugFile() + ":" + GetLogMicroSyslog());
354 
355  set<int> preserve_filedes;
356  preserve_filedes.insert(0);
357  preserve_filedes.insert(1);
358  preserve_filedes.insert(2);
359  preserve_filedes.insert(pipe_boot[1]);
360  preserve_filedes.insert(pipe_handshake[0]);
361 
362  retval = ManagedExec(command_line, preserve_filedes, map<int, int>(), false);
363  if (!retval) {
364  UnlockFile(fd_lockfile);
365  ClosePipe(pipe_boot);
366  ClosePipe(pipe_handshake);
367  delete quota_mgr;
368  LogCvmfs(kLogQuota, kLogDebug, "failed to start cache manager");
369  return NULL;
370  }
371 
372  // Wait for cache manager to be ready
373  close(pipe_boot[1]);
374  close(pipe_handshake[0]);
375  char buf;
376  if (read(pipe_boot[0], &buf, 1) != 1) {
377  UnlockFile(fd_lockfile);
378  close(pipe_boot[0]);
379  close(pipe_handshake[1]);
380  delete quota_mgr;
382  "cache manager did not start");
383  return NULL;
384  }
385  close(pipe_boot[0]);
386 
387  // Connect write end
388  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
389  if (quota_mgr->pipe_lru_[1] < 0) {
391  "failed to connect to newly created FIFO (%d)", errno);
392  close(pipe_handshake[1]);
393  UnlockFile(fd_lockfile);
394  delete quota_mgr;
395  return NULL;
396  }
397 
398  // Finalize handshake
399  buf = 'C';
400  if (write(pipe_handshake[1], &buf, 1) != 1) {
401  UnlockFile(fd_lockfile);
402  close(pipe_handshake[1]);
403  LogCvmfs(kLogQuota, kLogDebug, "could not finalize handshake");
404  delete quota_mgr;
405  return NULL;
406  }
407  close(pipe_handshake[1]);
408 
409  Nonblock2Block(quota_mgr->pipe_lru_[1]);
410  LogCvmfs(kLogQuota, kLogDebug, "connected to a new cache manager");
411  quota_mgr->protocol_revision_ = kProtocolRevision;
412 
413  UnlockFile(fd_lockfile);
414 
415  quota_mgr->initialized_ = true;
416  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
417  LogCvmfs(kLogQuota, kLogDebug, "received limit %" PRIu64 ", "
418  "threshold %" PRIu64,
419  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
420  return quota_mgr;
421 }
422 
423 
/**
 * Removes entries from the cache database and the cache directory, one
 * stmt_lru_ result at a time, until gauge_ is no larger than leave_size or
 * stmt_lru_ yields no further row.
 *
 * Entries whose hash is found in pinned_chunks_ are not removed; stmt_block_
 * is stepped for them instead, and stmt_unblock_ is stepped once after the
 * loop.  The paths of removed entries are collected first and unlinked only
 * after the database has been updated, either in a double-forked child
 * process (async_delete_) or inline.
 *
 * \param leave_size cleanup stops once gauge_ is at or below this value
 * \return true if gauge_ <= leave_size on entry or after the cleanup; false
 *         if stepping stmt_rm_ did not return SQLITE_DONE, if fork() failed,
 *         or if gauge_ is still above leave_size at the end
 *
 * NOTE(review): three call-opening lines are absent from this listing (the
 * embedded numbering skips 429, 465 and 520); each gap leaves a dangling
 * argument list, presumably of a logging call -- confirm against the real
 * file.
 */
424 bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) {
425  if (gauge_ <= leave_size)
426  return true;
427 
428  // TODO(jblomer) transaction
430  "clean up cache until at most %lu KB is used", leave_size/1024);
431  LogCvmfs(kLogQuota, kLogDebug, "gauge %" PRIu64, gauge_);
432  cleanup_recorder_.Tick();
433 
434  bool result;
435  string hash_str;
// Paths of the files to unlink once the database has been updated
436  vector<string> trash;
437 
438  do {
439  sqlite3_reset(stmt_lru_);
440  if (sqlite3_step(stmt_lru_) != SQLITE_ROW) {
441  LogCvmfs(kLogQuota, kLogDebug, "could not get lru-entry");
442  break;
443  }
444 
// Column 0 of the row is read as the hash text, column 1 (further below) as
// the amount subtracted from gauge_.
445  hash_str = string(reinterpret_cast<const char *>(
446  sqlite3_column_text(stmt_lru_, 0)));
447  LogCvmfs(kLogQuota, kLogDebug, "removing %s", hash_str.c_str());
448  shash::Any hash = shash::MkFromHexPtr(shash::HexPtr(hash_str));
449 
450  // That's a critical condition. We must not delete a not yet inserted
451  // pinned file as it is already reserved (but will be inserted later).
452  // Instead, set the pin bit in the db to not run into an endless loop
453  if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
454  trash.push_back(cache_dir_ + "/" + hash.MakePathWithoutSuffix());
455  gauge_ -= sqlite3_column_int64(stmt_lru_, 1);
456  LogCvmfs(kLogQuota, kLogDebug, "lru cleanup %s, new gauge %" PRIu64,
457  hash_str.c_str(), gauge_);
458 
459  sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(),
460  SQLITE_STATIC);
461  result = (sqlite3_step(stmt_rm_) == SQLITE_DONE);
462  sqlite3_reset(stmt_rm_);
463 
// NOTE(review): on this early return the paths already collected in trash
// are not unlinked and stmt_unblock_ is not stepped.
464  if (!result) {
466  "failed to find %s in cache database (%d). "
467  "Cache database is out of sync. "
468  "Restart cvmfs with clean cache.", hash_str.c_str(), result);
469  return false;
470  }
471  } else {
472  sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
473  SQLITE_STATIC);
474  result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
475  sqlite3_reset(stmt_block_);
476  assert(result);
477  }
478  } while (gauge_ > leave_size);
479 
// Undo the temporary marking applied through stmt_block_ in the loop above
480  result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
481  sqlite3_reset(stmt_unblock_);
482  assert(result);
483 
484  // Double fork avoids zombie, forked removal process must not flush file
485  // buffers
486  if (!trash.empty()) {
487  if (async_delete_) {
488  pid_t pid;
489  int statloc;
490  if ((pid = fork()) == 0) {
491  // TODO(jblomer): eviciting files in the cache should perhaps become a
492  // thread. This would also allow to block the chunks and prevent the
493  // race with re-insertion. Then again, a thread can block umount.
494 #ifndef DEBUGMSG
495  CloseAllFildes(std::set<int>());
496 #endif
// The grandchild performs the unlinks; the intermediate child exits right
// away so that the waitpid() below returns quickly.
497  if (fork() == 0) {
498  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
499  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
500  unlink(trash[i].c_str());
501  }
502  _exit(0);
503  }
504  _exit(0);
505  } else {
506  if (pid > 0)
507  waitpid(pid, &statloc, 0);
508  else
509  return false;
510  }
511  } else { // !async_delete_
512  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
513  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
514  unlink(trash[i].c_str());
515  }
516  }
517  }
518 
// Reached with gauge_ still too high when the loop was left through the
// "could not get lru-entry" break.
519  if (gauge_ > leave_size) {
521  "request to clean until %" PRIu64 ", "
522  "but effective gauge is %" PRIu64, leave_size, gauge_);
523  return false;
524  }
525  return true;
526 }
527 
528 
530  const shash::Any &hash,
531  const uint64_t size,
532  const string &description,
533  const CommandType command_type)
534 {
535  const string hash_str = hash.ToString();
536  LogCvmfs(kLogQuota, kLogDebug, "insert into lru %s, path %s, method %d",
537  hash_str.c_str(), description.c_str(), command_type);
538  const unsigned desc_length = (description.length() > kMaxDescription) ?
539  kMaxDescription : description.length();
540 
541  LruCommand *cmd =
542  reinterpret_cast<LruCommand *>(alloca(sizeof(LruCommand) + desc_length));
543  new (cmd) LruCommand;
544  cmd->command_type = command_type;
545  cmd->SetSize(size);
546  cmd->StoreHash(hash);
547  cmd->desc_length = desc_length;
548  memcpy(reinterpret_cast<char *>(cmd)+sizeof(LruCommand),
549  &description[0], desc_length);
550  WritePipe(pipe_lru_[1], cmd, sizeof(LruCommand) + desc_length);
551 }
552 
553 
554 vector<string> PosixQuotaManager::DoList(const CommandType list_command) {
555  vector<string> result;
556 
557  int pipe_list[2];
558  MakeReturnPipe(pipe_list);
559  char description_buffer[kMaxDescription];
560 
561  LruCommand cmd;
562  cmd.command_type = list_command;
563  cmd.return_pipe = pipe_list[1];
564  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
565 
566  int length;
567  do {
568  ReadHalfPipe(pipe_list[0], &length, sizeof(length));
569  if (length > 0) {
570  ReadPipe(pipe_list[0], description_buffer, length);
571  result.push_back(string(description_buffer, length));
572  }
573  } while (length >= 0);
574 
575  CloseReturnPipe(pipe_list);
576  return result;
577 }
578 
579 
581  if (limit_ != (uint64_t)(-1))
582  return limit_;
583 
584  // Unrestricted cache, look at free space on cache dir fs
585  struct statfs info;
586  if (statfs(".", &info) == 0) {
587  return info.f_bavail * info.f_bsize;
588  } else {
590  "failed to query file system info of cache (%d)", errno);
591  return limit_;
592  }
593 }
594 
595 
596 void PosixQuotaManager::GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
597 {
598  int pipe_limits[2];
599  MakeReturnPipe(pipe_limits);
600 
601  LruCommand cmd;
602  cmd.command_type = kLimits;
603  cmd.return_pipe = pipe_limits[1];
604  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
605  ReadHalfPipe(pipe_limits[0], limit, sizeof(*limit));
606  ReadPipe(pipe_limits[0], cleanup_threshold, sizeof(*cleanup_threshold));
607  CloseReturnPipe(pipe_limits);
608 }
609 
610 
616  return limit_ - cleanup_threshold_;
617 }
618 
619 
621  if (!shared_ || !spawned_) {
622  return getpid();
623  }
624 
625  pid_t result;
626  int pipe_pid[2];
627  MakeReturnPipe(pipe_pid);
628 
629  LruCommand cmd;
630  cmd.command_type = kPid;
631  cmd.return_pipe = pipe_pid[1];
632  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
633  ReadHalfPipe(pipe_pid[0], &result, sizeof(result));
634  CloseReturnPipe(pipe_pid);
635  return result;
636 }
637 
638 
640  int pipe_revision[2];
641  MakeReturnPipe(pipe_revision);
642 
643  LruCommand cmd;
644  cmd.command_type = kGetProtocolRevision;
645  cmd.return_pipe = pipe_revision[1];
646  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
647 
648  uint32_t revision;
649  ReadHalfPipe(pipe_revision[0], &revision, sizeof(revision));
650  CloseReturnPipe(pipe_revision);
651  return revision;
652 }
653 
654 
658 void PosixQuotaManager::GetSharedStatus(uint64_t *gauge, uint64_t *pinned) {
659  int pipe_status[2];
660  MakeReturnPipe(pipe_status);
661 
662  LruCommand cmd;
663  cmd.command_type = kStatus;
664  cmd.return_pipe = pipe_status[1];
665  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
666  ReadHalfPipe(pipe_status[0], gauge, sizeof(*gauge));
667  ReadPipe(pipe_status[0], pinned, sizeof(*pinned));
668  CloseReturnPipe(pipe_status);
669 }
670 
671 
673  if (!spawned_) return gauge_;
674  uint64_t gauge, size_pinned;
675  GetSharedStatus(&gauge, &size_pinned);
676  return gauge;
677 }
678 
679 
681  if (!spawned_) return pinned_;
682  uint64_t gauge, size_pinned;
683  GetSharedStatus(&gauge, &size_pinned);
684  return size_pinned;
685 }
686 
687 
688 uint64_t PosixQuotaManager::GetCleanupRate(uint64_t period_s) {
689  if (!spawned_ || (protocol_revision_ < 2)) return 0;
690  uint64_t cleanup_rate;
691 
692  int pipe_cleanup_rate[2];
693  MakeReturnPipe(pipe_cleanup_rate);
694  LruCommand cmd;
695  cmd.command_type = kCleanupRate;
696  cmd.size = period_s;
697  cmd.return_pipe = pipe_cleanup_rate[1];
698  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
699  ReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate, sizeof(cleanup_rate));
700  CloseReturnPipe(pipe_cleanup_rate);
701 
702  return cleanup_rate;
703 }
704 
705 
/**
 * Opens the sqlite database file cache_dir_ + "/cachedb", creates the tables
 * and indexes if necessary, initializes gauge_ and seq_ from the stored
 * entries and prepares the statements used by the other methods.
 *
 * The lock file workspace_dir_ + "/lock_cachedb" is acquired first; its
 * descriptor is kept in fd_lock_cachedb_ on success and released again on
 * failure.  If the schema setup fails, the database file and its journal are
 * unlinked and the setup is attempted one more time.
 *
 * \param rebuild_database if true, the existing database file is unlinked up
 *        front and RebuildDatabase() is called even if the catalog table is
 *        not empty
 * \return true on success; false if the lock, the database or one of the
 *         initialization queries failed
 *
 * NOTE(review): four call-opening lines are absent from this listing (the
 * embedded numbering skips 751, 769, 799 and 802); each gap leaves a dangling
 * argument list, presumably of a logging call -- confirm against the real
 * file.
 * NOTE(review): the return values of the sqlite3_prepare_v2() calls are not
 * checked anywhere in this function.
 */
706 bool PosixQuotaManager::InitDatabase(const bool rebuild_database) {
707  string sql;
708  sqlite3_stmt *stmt;
709 
710  fd_lock_cachedb_ = LockFile(workspace_dir_ + "/lock_cachedb");
711  if (fd_lock_cachedb_ < 0) {
712  LogCvmfs(kLogQuota, kLogDebug, "failed to create cachedb lock");
713  return false;
714  }
715 
// Set once the database file has been thrown away after a failed schema
// setup; ensures that the recovery is attempted only a single time.
716  bool retry = false;
717  const string db_file = cache_dir_ + "/cachedb";
718  if (rebuild_database) {
719  LogCvmfs(kLogQuota, kLogDebug, "rebuild database, unlinking existing (%s)",
720  db_file.c_str());
721  unlink(db_file.c_str());
722  unlink((db_file + "-journal").c_str());
723  }
724 
725  init_recover:
726  int err = sqlite3_open(db_file.c_str(), &database_);
727  if (err != SQLITE_OK) {
728  LogCvmfs(kLogQuota, kLogDebug, "could not open cache database (%d)", err);
729  goto init_database_fail;
730  }
731  // TODO(reneme): make this a `QuotaDatabase : public sqlite::Database`
732  sql = "PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
733  "PRAGMA auto_vacuum=1; "
734  "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
735  " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
736  "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
737  "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
738  " ON cache_catalog (acseq); "
739  "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
740  "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
741  "CREATE INDEX idx_fscache_actime ON fscache (actime); "
742  "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
743  " CONSTRAINT pk_properties PRIMARY KEY(key));";
744  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
745  if (err != SQLITE_OK) {
746  if (!retry) {
747  retry = true;
748  sqlite3_close(database_);
749  unlink(db_file.c_str());
750  unlink((db_file + "-journal").c_str());
752  "LRU database corrupted, re-building");
753  goto init_recover;
754  }
755  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
756  sql.c_str());
757  goto init_database_fail;
758  }
759 
760  // If this an old cache catalog,
761  // add and initialize new columns to cache_catalog
// A failure of the ALTER TABLE statements is not treated as an error; only if
// they succeed is the type column initialized.
762  sql = "ALTER TABLE cache_catalog ADD type INTEGER; "
763  "ALTER TABLE cache_catalog ADD pinned INTEGER";
764  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
765  if (err == SQLITE_OK) {
766  sql = "UPDATE cache_catalog SET type=" + StringifyInt(kFileRegular) + ";";
767  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
768  if (err != SQLITE_OK) {
770  "could not init cache database (failed: %s)", sql.c_str());
771  goto init_database_fail;
772  }
773  }
774 
775  // Set pinned back
776  sql = "UPDATE cache_catalog SET pinned=0;";
777  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
778  if (err != SQLITE_OK) {
779  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
780  sql.c_str());
781  goto init_database_fail;
782  }
783 
784  // Set schema version
785  sql = "INSERT OR REPLACE INTO properties (key, value) "
786  "VALUES ('schema', '1.0')";
787  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
788  if (err != SQLITE_OK) {
789  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
790  sql.c_str());
791  goto init_database_fail;
792  }
793 
794  // If cache catalog is empty, recreate from file system
795  sql = "SELECT count(*) FROM cache_catalog;";
796  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
797  if (sqlite3_step(stmt) == SQLITE_ROW) {
798  if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
800  "CernVM-FS: building lru cache database...");
801  if (!RebuildDatabase()) {
803  "could not build cache database from file system");
804  sqlite3_finalize(stmt);
805  goto init_database_fail;
806  }
807  }
808  sqlite3_finalize(stmt);
809  } else {
810  LogCvmfs(kLogQuota, kLogDebug, "could not select on cache catalog");
811  sqlite3_finalize(stmt);
812  goto init_database_fail;
813  }
814 
815  // How many bytes do we already have in cache?
816  sql = "SELECT sum(size) FROM cache_catalog;";
817  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
818  if (sqlite3_step(stmt) == SQLITE_ROW) {
819  gauge_ = sqlite3_column_int64(stmt, 0);
820  } else {
821  LogCvmfs(kLogQuota, kLogDebug, "could not determine cache size");
822  sqlite3_finalize(stmt);
823  goto init_database_fail;
824  }
825  sqlite3_finalize(stmt);
826 
827  // Highest seq-no?
// The query masks out bit 63 of acseq before taking the maximum; seq_ becomes
// that maximum plus one.
828  sql = "SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
829  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
830  if (sqlite3_step(stmt) == SQLITE_ROW) {
831  seq_ = sqlite3_column_int64(stmt, 0)+1;
832  } else {
833  LogCvmfs(kLogQuota, kLogDebug, "could not determine highest seq-no");
834  sqlite3_finalize(stmt);
835  goto init_database_fail;
836  }
837  sqlite3_finalize(stmt);
838 
839  // Prepare touch, new, remove statements
840  sqlite3_prepare_v2(database_,
841  "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
842  "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
843  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=0 "
844  "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
// stmt_block_ marks a single entry with pinned=2; stmt_unblock_ turns every
// pinned=2 entry into pinned=1.
845  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=2 "
846  "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
847  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=1 "
848  "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
849  sqlite3_prepare_v2(database_,
850  "INSERT OR REPLACE INTO cache_catalog "
851  "(sha1, size, acseq, path, type, pinned) "
852  "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
853  -1, &stmt_new_, NULL);
854  sqlite3_prepare_v2(database_,
855  "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
856  -1, &stmt_size_, NULL);
857  sqlite3_prepare_v2(database_, "DELETE FROM cache_catalog WHERE sha1=:sha1;",
858  -1, &stmt_rm_, NULL);
// stmt_lru_ selects the entry with the smallest acseq among those that are
// not marked with pinned=2.
859  sqlite3_prepare_v2(database_,
860  "SELECT sha1, size FROM cache_catalog WHERE "
861  "acseq=(SELECT min(acseq) "
862  "FROM cache_catalog WHERE pinned<>2);",
863  -1, &stmt_lru_, NULL);
864  sqlite3_prepare_v2(database_,
865  ("SELECT path FROM cache_catalog WHERE type=" +
866  StringifyInt(kFileRegular) +
867  ";").c_str(), -1, &stmt_list_, NULL);
868  sqlite3_prepare_v2(database_,
869  "SELECT path FROM cache_catalog WHERE pinned<>0;",
870  -1, &stmt_list_pinned_, NULL);
871  sqlite3_prepare_v2(database_,
872  "SELECT path FROM cache_catalog WHERE acseq < 0;",
873  -1, &stmt_list_volatile_, NULL);
874  sqlite3_prepare_v2(database_,
875  ("SELECT path FROM cache_catalog WHERE type=" +
876  StringifyInt(kFileCatalog) +
877  ";").c_str(), -1, &stmt_list_catalogs_, NULL);
878  return true;
879 
// Common failure exit: close the database handle and release the lock file.
880  init_database_fail:
881  sqlite3_close(database_);
882  database_ = NULL;
883  UnlockFile(fd_lock_cachedb_);
884  return false;
885 }
886 
887 
893  const shash::Any &any_hash,
894  const uint64_t size,
895  const string &description)
896 {
897  DoInsert(any_hash, size, description, kInsert);
898 }
899 
900 
907  const shash::Any &any_hash,
908  const uint64_t size,
909  const string &description)
910 {
911  DoInsert(any_hash, size, description, kInsertVolatile);
912 }
913 
914 
918 vector<string> PosixQuotaManager::List() {
919  return DoList(kList);
920 }
921 
922 
927  return DoList(kListPinned);
928 }
929 
930 
935  return DoList(kListCatalogs);
936 }
937 
938 
943  return DoList(kListVolatile);
944 }
945 
946 
/**
 * Entry point of the stand-alone cache manager process.  Sets up a
 * PosixQuotaManager from the command line, opens the database, creates the
 * protocol revision file, opens the "cachemgr" FIFO for reading, performs the
 * boot/handshake exchange over the two inherited pipe descriptors and then
 * runs MainCommandServer() until it returns.
 *
 * Command line arguments as read below:
 *   argv[2]  directory specification handed to ParseDirectories()
 *   argv[3]  descriptor of the boot pipe (written to)
 *   argv[4]  descriptor of the handshake pipe (read from)
 *   argv[5]  limit
 *   argv[6]  cleanup threshold
 *   argv[7]  foreground flag; Daemonize() is called if it is 0
 *   argv[8]  syslog level
 *   argv[9]  syslog facility
 *   argv[10] "<debug log file>:<micro syslog file>"
 *
 * \return 0 after a regular shutdown, 1 if the initialization failed
 *
 * NOTE(review): argc is never consulted; argv[2] to argv[10] are read
 * unconditionally.
 * NOTE(review): three call-opening lines are absent from this listing (the
 * embedded numbering skips 1000, 1029 and 1038); each gap leaves a dangling
 * argument list, presumably of a logging call -- confirm against the real
 * file.
 */
950 int PosixQuotaManager::MainCacheManager(int argc, char **argv) {
951  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
952  int retval;
953 
954  PosixQuotaManager shared_manager(0, 0, "");
955  shared_manager.shared_ = true;
956  shared_manager.spawned_ = true;
957  shared_manager.pinned_ = 0;
958 
959  // Process command line arguments
960  ParseDirectories(string(argv[2]),
961  &shared_manager.cache_dir_,
962  &shared_manager.workspace_dir_);
963  int pipe_boot = String2Int64(argv[3]);
964  int pipe_handshake = String2Int64(argv[4]);
965  shared_manager.limit_ = String2Int64(argv[5]);
966  shared_manager.cleanup_threshold_ = String2Int64(argv[6]);
967  int foreground = String2Int64(argv[7]);
968  int syslog_level = String2Int64(argv[8]);
969  int syslog_facility = String2Int64(argv[9]);
970  vector<string> logfiles = SplitString(argv[10], ':');
971 
972  SetLogSyslogLevel(syslog_level);
973  SetLogSyslogFacility(syslog_facility);
// The debug log file name gets the suffix ".cachemgr" appended
974  if ((logfiles.size() > 0) && (logfiles[0] != ""))
975  SetLogDebugFile(logfiles[0] + ".cachemgr");
976  if (logfiles.size() > 1)
977  SetLogMicroSyslog(logfiles[1]);
978 
979  if (!foreground)
980  Daemonize();
981 
982  UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
983  assert(watchdog.IsValid());
984  watchdog->Spawn("./stacktrace.cachemgr");
985 
986  // Initialize pipe, open non-blocking as cvmfs is not yet connected
987  const int fd_lockfile_fifo =
988  LockFile(shared_manager.workspace_dir_ + "/lock_cachemgr.fifo");
989  if (fd_lockfile_fifo < 0) {
990  LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, "could not open lock file "
991  "%s (%d)",
992  (shared_manager.workspace_dir_ + "/lock_cachemgr.fifo").c_str(),
993  errno);
994  return 1;
995  }
// The crash guard file is created here and unlinked again on the regular
// shutdown path at the end of this function.  If it already exists at this
// point, InitDatabase() is asked to rebuild the database.
996  const string crash_guard = shared_manager.cache_dir_ + "/cachemgr.running";
997  const bool rebuild = FileExists(crash_guard);
998  retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
999  if (retval < 0) {
1001  "failed to create shared cache manager crash guard");
1002  UnlockFile(fd_lockfile_fifo);
1003  return 1;
1004  }
1005  close(retval);
1006 
1007  // Redirect SQlite temp directory to cache (global variable)
1008  const string tmp_dir = shared_manager.workspace_dir_;
1009  sqlite3_temp_directory =
1010  static_cast<char *>(sqlite3_malloc(tmp_dir.length() + 1));
1011  snprintf(sqlite3_temp_directory, tmp_dir.length() + 1, "%s", tmp_dir.c_str());
1012 
1013  // Cleanup leftover named pipes
1014  shared_manager.CleanupPipes();
1015 
1016  if (!shared_manager.InitDatabase(rebuild)) {
1017  UnlockFile(fd_lockfile_fifo);
1018  return 1;
1019  }
1020  shared_manager.CheckFreeSpace();
1021 
1022  // Save protocol revision to file. If the file is not found, it indicates
1023  // to the client that the cache manager is from times before the protocol
1024  // was versioned.
1025  const string protocol_revision_path =
1026  shared_manager.workspace_dir_ + "/cachemgr.protocol";
1027  retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1028  if (retval < 0) {
1030  "failed to open protocol revision file (%d)", errno);
1031  UnlockFile(fd_lockfile_fifo);
1032  return 1;
1033  }
1034  const string revision = StringifyInt(kProtocolRevision);
1035  int written = write(retval, revision.data(), revision.length());
1036  close(retval);
1037  if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1039  "failed to write protocol revision (%d)", errno);
1040  UnlockFile(fd_lockfile_fifo);
1041  return 1;
1042  }
1043 
1044  const string fifo_path = shared_manager.workspace_dir_ + "/cachemgr";
1045  shared_manager.pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1046  if (shared_manager.pipe_lru_[0] < 0) {
1047  LogCvmfs(kLogQuota, kLogDebug, "failed to listen on FIFO %s (%d)",
1048  fifo_path.c_str(), errno);
1049  UnlockFile(fd_lockfile_fifo);
1050  return 1;
1051  }
1052  Nonblock2Block(shared_manager.pipe_lru_[0]);
1053  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager listening");
1054 
// Boot/handshake exchange: write one byte to the boot pipe, then wait for one
// byte on the handshake pipe.
1055  char buf = 'C';
1056  WritePipe(pipe_boot, &buf, 1);
1057  close(pipe_boot);
1058 
1059  ReadPipe(pipe_handshake, &buf, 1);
1060  close(pipe_handshake);
1061  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager handshake done");
1062 
1063  // Ensure that broken pipes from clients do not kill the cache manager
1064  signal(SIGPIPE, SIG_IGN);
1065  // Don't let Ctrl-C ungracefully kill interactive session
1066  signal(SIGINT, SIG_IGN);
1067 
// Regular shutdown: remove the FIFO, the protocol revision file and the crash
// guard, close the database and release the FIFO lock.
1068  shared_manager.MainCommandServer(&shared_manager);
1069  unlink(fifo_path.c_str());
1070  unlink(protocol_revision_path.c_str());
1071  shared_manager.CloseDatabase();
1072  unlink(crash_guard.c_str());
1073  UnlockFile(fd_lockfile_fifo);
1074 
1075  if (sqlite3_temp_directory) {
1076  sqlite3_free(sqlite3_temp_directory);
1077  sqlite3_temp_directory = NULL;
1078  }
1079 
1080  return 0;
1081 }
1082 
1083 
1085  PosixQuotaManager *quota_mgr = static_cast<PosixQuotaManager *>(data);
1086 
1087  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1088  sqlite3_soft_heap_limit(quota_mgr->kSqliteMemPerThread);
1089 
1090  LruCommand command_buffer[kCommandBufferSize];
1091  char description_buffer[kCommandBufferSize*kMaxDescription];
1092  unsigned num_commands = 0;
1093 
1094  while (read(quota_mgr->pipe_lru_[0], &command_buffer[num_commands],
1095  sizeof(command_buffer[0])) == sizeof(command_buffer[0]))
1096  {
1097  const CommandType command_type = command_buffer[num_commands].command_type;
1098  LogCvmfs(kLogQuota, kLogDebug, "received command %d", command_type);
1099  const uint64_t size = command_buffer[num_commands].GetSize();
1100 
1101  // Inserts and pins come with a description (usually a path)
1102  if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1103  (command_type == kPin) || (command_type == kPinRegular))
1104  {
1105  const int desc_length = command_buffer[num_commands].desc_length;
1106  ReadPipe(quota_mgr->pipe_lru_[0],
1107  &description_buffer[kMaxDescription*num_commands], desc_length);
1108  }
1109 
1110  // The protocol revision is returned immediately
1111  if (command_type == kGetProtocolRevision) {
1112  int return_pipe =
1113  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1114  if (return_pipe < 0)
1115  continue;
1116  WritePipe(return_pipe, &quota_mgr->kProtocolRevision,
1117  sizeof(quota_mgr->kProtocolRevision));
1118  quota_mgr->UnbindReturnPipe(return_pipe);
1119  continue;
1120  }
1121 
1122  // The cleanup rate is returned immediately
1123  if (command_type == kCleanupRate) {
1124  int return_pipe =
1125  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1126  if (return_pipe < 0)
1127  continue;
1128  uint64_t period_s = size; // use the size field to transmit the period
1129  uint64_t rate = quota_mgr->cleanup_recorder_.GetNoTicks(period_s);
1130  WritePipe(return_pipe, &rate, sizeof(rate));
1131  quota_mgr->UnbindReturnPipe(return_pipe);
1132  continue;
1133  }
1134 
1135  // Reservations are handled immediately and "out of band"
1136  if (command_type == kReserve) {
1137  bool success = true;
1138  int return_pipe =
1139  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1140  if (return_pipe < 0)
1141  continue;
1142 
1143  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1144  const string hash_str(hash.ToString());
1145  LogCvmfs(kLogQuota, kLogDebug, "reserve %lu bytes for %s",
1146  size, hash_str.c_str());
1147 
1148  if (quota_mgr->pinned_chunks_.find(hash) ==
1149  quota_mgr->pinned_chunks_.end())
1150  {
1151  if ((quota_mgr->pinned_ + size) > quota_mgr->cleanup_threshold_) {
1153  "failed to insert %s (pinned), no space", hash_str.c_str());
1154  success = false;
1155  } else {
1156  quota_mgr->pinned_chunks_[hash] = size;
1157  quota_mgr->pinned_ += size;
1158  quota_mgr->CheckHighPinWatermark();
1159  }
1160  }
1161 
1162  WritePipe(return_pipe, &success, sizeof(success));
1163  quota_mgr->UnbindReturnPipe(return_pipe);
1164  continue;
1165  }
1166 
1167  // Back channels are also handled out of band
1168  if (command_type == kRegisterBackChannel) {
1169  int return_pipe =
1170  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1171  if (return_pipe < 0)
1172  continue;
1173 
1174  quota_mgr->UnlinkReturnPipe(command_buffer[num_commands].return_pipe);
1175  Block2Nonblock(return_pipe); // back channels are opportunistic
1176  shash::Md5 hash;
1177  memcpy(hash.digest, command_buffer[num_commands].digest,
1179 
1180  quota_mgr->LockBackChannels();
1181  map<shash::Md5, int>::const_iterator iter =
1182  quota_mgr->back_channels_.find(hash);
1183  if (iter != quota_mgr->back_channels_.end()) {
1185  "closing left-over back channel %s", hash.ToString().c_str());
1186  close(iter->second);
1187  }
1188  quota_mgr->back_channels_[hash] = return_pipe;
1189  quota_mgr->UnlockBackChannels();
1190 
1191  char success = 'S';
1192  WritePipe(return_pipe, &success, sizeof(success));
1193  LogCvmfs(kLogQuota, kLogDebug, "register back channel %s on fd %d",
1194  hash.ToString().c_str(), return_pipe);
1195 
1196  continue;
1197  }
1198 
1199  if (command_type == kUnregisterBackChannel) {
1200  shash::Md5 hash;
1201  memcpy(hash.digest, command_buffer[num_commands].digest,
1203 
1204  quota_mgr->LockBackChannels();
1205  map<shash::Md5, int>::iterator iter =
1206  quota_mgr->back_channels_.find(hash);
1207  if (iter != quota_mgr->back_channels_.end()) {
1209  "closing back channel %s", hash.ToString().c_str());
1210  close(iter->second);
1211  quota_mgr->back_channels_.erase(iter);
1212  } else {
1214  "did not find back channel %s", hash.ToString().c_str());
1215  }
1216  quota_mgr->UnlockBackChannels();
1217 
1218  continue;
1219  }
1220 
1221  // Unpinnings are also handled immediately with respect to the pinned gauge
1222  if (command_type == kUnpin) {
1223  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1224  const string hash_str(hash.ToString());
1225 
1226  map<shash::Any, uint64_t>::iterator iter =
1227  quota_mgr->pinned_chunks_.find(hash);
1228  if (iter != quota_mgr->pinned_chunks_.end()) {
1229  quota_mgr->pinned_ -= iter->second;
1230  quota_mgr->pinned_chunks_.erase(iter);
1231  // It can happen that files get pinned that were removed from the cache
1232  // (see cache.cc). We fix this at this point, where we remove such
1233  // entries from the cache database.
1234  if (!FileExists(quota_mgr->cache_dir_ + "/" +
1235  hash.MakePathWithoutSuffix()))
1236  {
1238  "remove orphaned pinned hash %s from cache database",
1239  hash_str.c_str());
1240  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1241  hash_str.length(), SQLITE_STATIC);
1242  int retval;
1243  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1244  uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1245  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1246  hash_str.length(), SQLITE_STATIC);
1247  retval = sqlite3_step(quota_mgr->stmt_rm_);
1248  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1249  quota_mgr->gauge_ -= size;
1250  } else {
1252  "failed to delete %s (%d)", hash_str.c_str(), retval);
1253  }
1254  sqlite3_reset(quota_mgr->stmt_rm_);
1255  }
1256  sqlite3_reset(quota_mgr->stmt_size_);
1257  }
1258  } else {
1259  LogCvmfs(kLogQuota, kLogDebug, "this chunk was not pinned");
1260  }
1261  }
1262 
1263  // Immediate commands trigger flushing of the buffer
1264  bool immediate_command = (command_type == kCleanup) ||
1265  (command_type == kList) || (command_type == kListPinned) ||
1266  (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1267  (command_type == kRemove) || (command_type == kStatus) ||
1268  (command_type == kLimits) || (command_type == kPid);
1269  if (!immediate_command) num_commands++;
1270 
1271  if ((num_commands == kCommandBufferSize) || immediate_command)
1272  {
1273  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1274  description_buffer);
1275  if (!immediate_command) num_commands = 0;
1276  }
1277 
1278  if (immediate_command) {
1279  // Process cleanup, listings
1280  int return_pipe =
1281  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1282  if (return_pipe < 0) {
1283  num_commands = 0;
1284  continue;
1285  }
1286 
1287  int retval;
1288  sqlite3_stmt *this_stmt_list = NULL;
1289  switch (command_type) {
1290  case kRemove: {
1291  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1292  const string hash_str = hash.ToString();
1293  LogCvmfs(kLogQuota, kLogDebug, "manually removing %s",
1294  hash_str.c_str());
1295  bool success = false;
1296 
1297  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1298  hash_str.length(), SQLITE_STATIC);
1299  int retval;
1300  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1301  uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1302  uint64_t is_pinned = sqlite3_column_int64(quota_mgr->stmt_size_, 1);
1303 
1304  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1305  hash_str.length(), SQLITE_STATIC);
1306  retval = sqlite3_step(quota_mgr->stmt_rm_);
1307  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1308  success = true;
1309  quota_mgr->gauge_ -= size;
1310  if (is_pinned) {
1311  quota_mgr->pinned_chunks_.erase(hash);
1312  quota_mgr->pinned_ -= size;
1313  }
1314  } else {
1316  "failed to delete %s (%d)", hash_str.c_str(), retval);
1317  }
1318  sqlite3_reset(quota_mgr->stmt_rm_);
1319  } else {
1320  // File does not exist
1321  success = true;
1322  }
1323  sqlite3_reset(quota_mgr->stmt_size_);
1324 
1325  WritePipe(return_pipe, &success, sizeof(success));
1326  break; }
1327  case kCleanup:
1328  retval = quota_mgr->DoCleanup(size);
1329  WritePipe(return_pipe, &retval, sizeof(retval));
1330  break;
1331  case kList:
1332  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_;
1333  case kListPinned:
1334  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_pinned_;
1335  case kListCatalogs:
1336  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_catalogs_;
1337  case kListVolatile:
1338  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_volatile_;
1339 
1340  // Pipe back the list, one by one
1341  int length;
1342  while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1343  string path = "(NULL)";
1344  if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1345  path = string(
1346  reinterpret_cast<const char *>(
1347  sqlite3_column_text(this_stmt_list, 0)));
1348  }
1349  length = path.length();
1350  WritePipe(return_pipe, &length, sizeof(length));
1351  if (length > 0)
1352  WritePipe(return_pipe, &path[0], length);
1353  }
1354  length = -1;
1355  WritePipe(return_pipe, &length, sizeof(length));
1356  sqlite3_reset(this_stmt_list);
1357  break;
1358  case kStatus:
1359  WritePipe(return_pipe, &quota_mgr->gauge_, sizeof(quota_mgr->gauge_));
1360  WritePipe(return_pipe, &quota_mgr->pinned_,
1361  sizeof(quota_mgr->pinned_));
1362  break;
1363  case kLimits:
1364  WritePipe(return_pipe, &quota_mgr->limit_, sizeof(quota_mgr->limit_));
1365  WritePipe(return_pipe, &quota_mgr->cleanup_threshold_,
1366  sizeof(quota_mgr->cleanup_threshold_));
1367  break;
1368  case kPid: {
1369  pid_t pid = getpid();
1370  WritePipe(return_pipe, &pid, sizeof(pid));
1371  break;
1372  }
1373  default:
1374  PANIC(NULL); // other types are handled by the bunch processor
1375  }
1376  quota_mgr->UnbindReturnPipe(return_pipe);
1377  num_commands = 0;
1378  }
1379  }
1380 
1381  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager (%d)", errno);
1382  close(quota_mgr->pipe_lru_[0]);
1383  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1384  description_buffer);
1385 
1386  // Unpin
1387  command_buffer[0].command_type = kTouch;
1388  for (map<shash::Any, uint64_t>::const_iterator i =
1389  quota_mgr->pinned_chunks_.begin(),
1390  iEnd = quota_mgr->pinned_chunks_.end(); i != iEnd; ++i)
1391  {
1392  command_buffer[0].StoreHash(i->first);
1393  quota_mgr->ProcessCommandBunch(1, command_buffer, description_buffer);
1394  }
1395 
1396  return NULL;
1397 }
1398 
1399 
1401  if (!shared_) {
1402  MakePipe(pipe);
1403  return;
1404  }
1405 
1406  // Create FIFO in cache directory, store path name (number) in pipe write end
1407  int i = 0;
1408  int retval;
1409  do {
1410  retval = mkfifo((workspace_dir_ + "/pipe" + StringifyInt(i)).c_str(), 0600);
1411  pipe[1] = i;
1412  i++;
1413  } while ((retval == -1) && (errno == EEXIST));
1414  assert(retval == 0);
1415 
1416  // Connect reader's end
1417  pipe[0] = open((workspace_dir_ + "/pipe" + StringifyInt(pipe[1])).c_str(),
1418  O_RDONLY | O_NONBLOCK);
1419  assert(pipe[0] >= 0);
1420  Nonblock2Block(pipe[0]);
1421 }
1422 
1423 
1425  const std::string cache_workspace,
1426  std::string *cache_dir,
1427  std::string *workspace_dir)
1428 {
1429  vector<string> dir_tokens(SplitString(cache_workspace, ':'));
1430  switch (dir_tokens.size()) {
1431  case 1:
1432  *cache_dir = *workspace_dir = dir_tokens[0];
1433  break;
1434  case 2:
1435  *cache_dir = dir_tokens[0];
1436  *workspace_dir = dir_tokens[1];
1437  break;
1438  default:
1439  PANIC(NULL);
1440  }
1441 }
1442 
1443 
1450  const shash::Any &hash,
1451  const uint64_t size,
1452  const string &description,
1453  const bool is_catalog)
1454 {
1455  assert((size > 0) || !is_catalog);
1456 
1457  const string hash_str = hash.ToString();
1458  LogCvmfs(kLogQuota, kLogDebug, "pin into lru %s, path %s",
1459  hash_str.c_str(), description.c_str());
1460 
1461  // Has to run when not yet spawned (cvmfs initialization)
1462  if (!spawned_) {
1463  // Code duplication here
1464  if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1465  if (pinned_ + size > cleanup_threshold_) {
1466  LogCvmfs(kLogQuota, kLogDebug, "failed to insert %s (pinned), no space",
1467  hash_str.c_str());
1468  return false;
1469  } else {
1470  pinned_chunks_[hash] = size;
1471  pinned_ += size;
1472  CheckHighPinWatermark();
1473  }
1474  }
1475  bool exists = Contains(hash_str);
1476  if (!exists && (gauge_ + size > limit_)) {
1477  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu",
1478  gauge_, size);
1479  int retval = DoCleanup(cleanup_threshold_);
1480  assert(retval != 0);
1481  }
1482  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1483  SQLITE_STATIC);
1484  sqlite3_bind_int64(stmt_new_, 2, size);
1485  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1486  sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1487  SQLITE_STATIC);
1488  sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1489  sqlite3_bind_int64(stmt_new_, 6, 1);
1490  int retval = sqlite3_step(stmt_new_);
1491  assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1492  sqlite3_reset(stmt_new_);
1493  if (!exists) gauge_ += size;
1494  return true;
1495  }
1496 
1497  int pipe_reserve[2];
1498  MakeReturnPipe(pipe_reserve);
1499 
1500  LruCommand cmd;
1501  cmd.command_type = kReserve;
1502  cmd.SetSize(size);
1503  cmd.StoreHash(hash);
1504  cmd.return_pipe = pipe_reserve[1];
1505  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1506  bool result;
1507  ReadHalfPipe(pipe_reserve[0], &result, sizeof(result));
1508  CloseReturnPipe(pipe_reserve);
1509 
1510  if (!result) return false;
1511  DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1512 
1513  return true;
1514 }
1515 
1516 
1518  const uint64_t limit,
1519  const uint64_t cleanup_threshold,
1520  const string &cache_workspace)
1521  : shared_(false)
1522  , spawned_(false)
1523  , limit_(limit)
1524  , cleanup_threshold_(cleanup_threshold)
1525  , gauge_(0)
1526  , pinned_(0)
1527  , seq_(0)
1528  , cache_dir_() // initialized in body
1529  , workspace_dir_() // initialized in body
1530  , fd_lock_cachedb_(-1)
1531  , async_delete_(true)
1532  , database_(NULL)
1533  , stmt_touch_(NULL)
1534  , stmt_unpin_(NULL)
1535  , stmt_block_(NULL)
1536  , stmt_unblock_(NULL)
1537  , stmt_new_(NULL)
1538  , stmt_lru_(NULL)
1539  , stmt_size_(NULL)
1540  , stmt_rm_(NULL)
1541  , stmt_list_(NULL)
1542  , stmt_list_pinned_(NULL)
1543  , stmt_list_catalogs_(NULL)
1544  , stmt_list_volatile_(NULL)
1545  , initialized_(false)
1546 {
1547  ParseDirectories(cache_workspace, &cache_dir_, &workspace_dir_);
1548  pipe_lru_[0] = pipe_lru_[1] = -1;
1549  cleanup_recorder_.AddRecorder(1, 90); // last 1.5 min with second resolution
1550  // last 1.5 h with minute resolution
1551  cleanup_recorder_.AddRecorder(60, 90*60);
1552  // last 18 hours with 20 min resolution
1553  cleanup_recorder_.AddRecorder(20*60, 60*60*18);
1554  // last 4 days with hour resolution
1555  cleanup_recorder_.AddRecorder(60*60, 60*60*24*4);
1556 }
1557 
1558 
1560  if (!initialized_) return;
1561 
1562  if (shared_) {
1563  // Most of cleanup is done elsewhen by shared cache manager
1564  close(pipe_lru_[1]);
1565  return;
1566  }
1567 
1568  if (spawned_) {
1569  char fin = 0;
1570  WritePipe(pipe_lru_[1], &fin, 1);
1571  close(pipe_lru_[1]);
1572  pthread_join(thread_lru_, NULL);
1573  } else {
1575  }
1576 
1577  CloseDatabase();
1578 }
1579 
1580 
1582  const unsigned num,
1583  const LruCommand *commands,
1584  const char *descriptions)
1585 {
1586  int retval = sqlite3_exec(database_, "BEGIN", NULL, NULL, NULL);
1587  assert(retval == SQLITE_OK);
1588 
1589  for (unsigned i = 0; i < num; ++i) {
1590  const shash::Any hash = commands[i].RetrieveHash();
1591  const string hash_str = hash.ToString();
1592  const unsigned size = commands[i].GetSize();
1593  LogCvmfs(kLogQuota, kLogDebug, "processing %s (%d)",
1594  hash_str.c_str(), commands[i].command_type);
1595 
1596  bool exists;
1597  switch (commands[i].command_type) {
1598  case kTouch:
1599  sqlite3_bind_int64(stmt_touch_, 1, seq_++);
1600  sqlite3_bind_text(stmt_touch_, 2, &hash_str[0], hash_str.length(),
1601  SQLITE_STATIC);
1602  retval = sqlite3_step(stmt_touch_);
1603  LogCvmfs(kLogQuota, kLogDebug, "touching %s (%ld): %d",
1604  hash_str.c_str(), seq_-1, retval);
1605  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1606  PANIC(kLogSyslogErr, "failed to update %s in cachedb, error %d",
1607  hash_str.c_str(), retval);
1608  }
1609  sqlite3_reset(stmt_touch_);
1610  break;
1611  case kUnpin:
1612  sqlite3_bind_text(stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1613  SQLITE_STATIC);
1614  retval = sqlite3_step(stmt_unpin_);
1615  LogCvmfs(kLogQuota, kLogDebug, "unpinning %s: %d",
1616  hash_str.c_str(), retval);
1617  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1618  PANIC(kLogSyslogErr, "failed to unpin %s in cachedb, error %d",
1619  hash_str.c_str(), retval);
1620  }
1621  sqlite3_reset(stmt_unpin_);
1622  break;
1623  case kPin:
1624  case kPinRegular:
1625  case kInsert:
1626  case kInsertVolatile:
1627  // It could already be in, check
1628  exists = Contains(hash_str);
1629 
1630  // Cleanup, move to trash and unlink
1631  if (!exists && (gauge_ + size > limit_)) {
1632  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %u",
1633  gauge_, size);
1634  retval = DoCleanup(cleanup_threshold_);
1635  assert(retval != 0);
1636  }
1637 
1638  // Insert or replace
1639  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1640  SQLITE_STATIC);
1641  sqlite3_bind_int64(stmt_new_, 2, size);
1642  if (commands[i].command_type == kInsertVolatile) {
1643  sqlite3_bind_int64(stmt_new_, 3, (seq_++) | kVolatileFlag);
1644  } else {
1645  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1646  }
1647  sqlite3_bind_text(stmt_new_, 4, &descriptions[i*kMaxDescription],
1648  commands[i].desc_length, SQLITE_STATIC);
1649  sqlite3_bind_int64(stmt_new_, 5, (commands[i].command_type == kPin) ?
1651  sqlite3_bind_int64(stmt_new_, 6,
1652  ((commands[i].command_type == kPin) ||
1653  (commands[i].command_type == kPinRegular)) ? 1 : 0);
1654  retval = sqlite3_step(stmt_new_);
1655  LogCvmfs(kLogQuota, kLogDebug, "insert or replace %s, method %d: %d",
1656  hash_str.c_str(), commands[i].command_type, retval);
1657  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1658  PANIC(kLogSyslogErr, "failed to insert %s in cachedb, error %d",
1659  hash_str.c_str(), retval);
1660  }
1661  sqlite3_reset(stmt_new_);
1662 
1663  if (!exists) gauge_ += size;
1664  break;
1665  default:
1666  // other types should have been taken care of by event loop
1667  PANIC(NULL);
1668  }
1669  }
1670 
1671  retval = sqlite3_exec(database_, "COMMIT", NULL, NULL, NULL);
1672  if (retval != SQLITE_OK) {
1673  PANIC(kLogSyslogErr, "failed to commit to cachedb, error %d", retval);
1674  }
1675 }
1676 
1677 
1679  bool result = false;
1680  string sql;
1681  sqlite3_stmt *stmt_select = NULL;
1682  sqlite3_stmt *stmt_insert = NULL;
1683  int sqlerr;
1684  int seq = 0;
1685  char hex[4];
1686  struct stat info;
1687  platform_dirent64 *d;
1688  DIR *dirp = NULL;
1689  string path;
1690 
1691  LogCvmfs(kLogQuota, kLogSyslog | kLogDebug, "re-building cache database");
1692 
1693  // Empty cache catalog and fscache
1694  sql = "DELETE FROM cache_catalog; DELETE FROM fscache;";
1695  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1696  if (sqlerr != SQLITE_OK) {
1697  LogCvmfs(kLogQuota, kLogDebug, "could not clear cache database");
1698  goto build_return;
1699  }
1700 
1701  gauge_ = 0;
1702 
1703  // Insert files from cache sub-directories 00 - ff
1704  // TODO(jblomer): fs_traversal
1705  sqlite3_prepare_v2(database_, "INSERT INTO fscache (sha1, size, actime) "
1706  "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1707 
1708  for (int i = 0; i <= 0xff; i++) {
1709  snprintf(hex, sizeof(hex), "%02x", i);
1710  path = cache_dir_ + "/" + string(hex);
1711  if ((dirp = opendir(path.c_str())) == NULL) {
1713  "failed to open directory %s (tmpwatch interfering?)",
1714  path.c_str());
1715  goto build_return;
1716  }
1717  while ((d = platform_readdir(dirp)) != NULL) {
1718  string file_path = path + "/" + string(d->d_name);
1719  if (stat(file_path.c_str(), &info) == 0) {
1720  if (!S_ISREG(info.st_mode))
1721  continue;
1722  if (info.st_size == 0) {
1724  "removing empty file %s during automatic cache db rebuild",
1725  file_path.c_str());
1726  unlink(file_path.c_str());
1727  continue;
1728  }
1729 
1730  string hash = string(hex) + string(d->d_name);
1731  sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1732  SQLITE_STATIC);
1733  sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1734  sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1735  if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1736  LogCvmfs(kLogQuota, kLogDebug, "could not insert into temp table");
1737  goto build_return;
1738  }
1739  sqlite3_reset(stmt_insert);
1740 
1741  gauge_ += info.st_size;
1742  } else {
1743  LogCvmfs(kLogQuota, kLogDebug, "could not stat %s", file_path.c_str());
1744  }
1745  }
1746  closedir(dirp);
1747  dirp = NULL;
1748  }
1749  sqlite3_finalize(stmt_insert);
1750  stmt_insert = NULL;
1751 
1752  // Transfer from temp table in cache catalog
1753  sqlite3_prepare_v2(database_,
1754  "SELECT sha1, size FROM fscache ORDER BY actime;",
1755  -1, &stmt_select, NULL);
1756  sqlite3_prepare_v2(database_,
1757  "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1758  "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1759  -1, &stmt_insert, NULL);
1760  while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1761  const string hash = string(
1762  reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1763  sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1764  sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1765  sqlite3_bind_int64(stmt_insert, 3, seq++);
1766  // Might also be a catalog (information is lost)
1767  sqlite3_bind_int64(stmt_insert, 4, kFileRegular);
1768 
1769  int retval = sqlite3_step(stmt_insert);
1770  if (retval != SQLITE_DONE) {
1771  // If the file system hosting the cache is full, we'll likely notice here
1773  "could not insert into cache catalog (%d - %s)",
1774  retval, sqlite3_errstr(retval));
1775  goto build_return;
1776  }
1777  sqlite3_reset(stmt_insert);
1778  }
1779 
1780  // Delete temporary table
1781  sql = "DELETE FROM fscache;";
1782  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1783  if (sqlerr != SQLITE_OK) {
1784  LogCvmfs(kLogQuota, kLogDebug, "could not clear temporary table (%d)",
1785  sqlerr);
1786  goto build_return;
1787  }
1788 
1789  seq_ = seq;
1790  result = true;
1792  "rebuilding finished, sequence %" PRIu64 ", gauge %" PRIu64,
1793  seq_, gauge_);
1794 
1795  build_return:
1796  if (stmt_insert) sqlite3_finalize(stmt_insert);
1797  if (stmt_select) sqlite3_finalize(stmt_select);
1798  if (dirp) closedir(dirp);
1799  return result;
1800 }
1801 
1802 
1808  int back_channel[2],
1809  const string &channel_id)
1810 {
1811  if (protocol_revision_ >= 1) {
1812  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1813  MakeReturnPipe(back_channel);
1814 
1815  LruCommand cmd;
1817  cmd.return_pipe = back_channel[1];
1818  // Not StoreHash(). This is an MD5 hash.
1819  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1820  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1821 
1822  char success;
1823  ReadHalfPipe(back_channel[0], &success, sizeof(success));
1824  // At this point, the named FIFO is unlinked, so don't use CloseReturnPipe
1825  if (success != 'S') {
1827  "failed to register quota back channel (%c)", success);
1828  }
1829  } else {
1830  // Dummy pipe to return valid file descriptors
1831  MakePipe(back_channel);
1832  }
1833 }
1834 
1835 
1840  string hash_str = hash.ToString();
1841 
1842  int pipe_remove[2];
1843  MakeReturnPipe(pipe_remove);
1844 
1845  LruCommand cmd;
1846  cmd.command_type = kRemove;
1847  cmd.return_pipe = pipe_remove[1];
1848  cmd.StoreHash(hash);
1849  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1850 
1851  bool success;
1852  ReadHalfPipe(pipe_remove[0], &success, sizeof(success));
1853  CloseReturnPipe(pipe_remove);
1854 
1855  unlink((cache_dir_ + "/" + hash.MakePathWithoutSuffix()).c_str());
1856 }
1857 
1858 
1860  if (spawned_)
1861  return;
1862 
1863  if (pthread_create(&thread_lru_, NULL, MainCommandServer,
1864  static_cast<void *>(this)) != 0)
1865  {
1866  PANIC(kLogDebug, "could not create lru thread");
1867  }
1868 
1869  spawned_ = true;
1870 }
1871 
1872 
1877  LruCommand cmd;
1878  cmd.command_type = kTouch;
1879  cmd.StoreHash(hash);
1880  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1881 }
1882 
1883 
1885  if (shared_)
1886  close(pipe_wronly);
1887 }
1888 
1889 
1891  if (shared_)
1892  unlink((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str());
1893 }
1894 
1895 
1897  LogCvmfs(kLogQuota, kLogDebug, "Unpin %s", hash.ToString().c_str());
1898 
1899  LruCommand cmd;
1900  cmd.command_type = kUnpin;
1901  cmd.StoreHash(hash);
1902  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1903 }
1904 
1905 
1907  int back_channel[2],
1908  const string &channel_id)
1909 {
1910  if (protocol_revision_ >= 1) {
1911  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1912 
1913  LruCommand cmd;
1915  // Not StoreHash(). This is an MD5 hash.
1916  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1917  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1918 
1919  // Writer's end will be closed by cache manager, FIFO is already unlinked
1920  close(back_channel[0]);
1921  } else {
1922  ClosePipe(back_channel);
1923  }
1924 }
virtual uint32_t GetProtocolRevision()
Definition: quota_posix.cc:639
void SetLogSyslogFacility(const int local_facility)
Definition: logging.cc:183
sqlite3_stmt * stmt_new_
Definition: quota_posix.h:325
sqlite3_stmt * stmt_list_pinned_
Definition: quota_posix.h:330
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
Definition: statistics.cc:267
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
Definition: logging.cc:151
std::vector< std::string > DoList(const CommandType list_command)
Definition: quota_posix.cc:554
struct stat64 platform_stat64
virtual pid_t GetPid()
Definition: quota_posix.cc:620
virtual uint64_t GetCleanupRate(uint64_t period_s)
Definition: quota_posix.cc:688
sqlite3_stmt * stmt_list_
Definition: quota_posix.h:329
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
Definition: quota_posix.cc:596
virtual uint64_t GetSize()
Definition: quota_posix.cc:672
sqlite3_stmt * stmt_rm_
Definition: quota_posix.h:328
virtual bool Cleanup(const uint64_t leave_size)
Definition: quota_posix.cc:125
static const unsigned kSqliteMemPerThread
Definition: quota_posix.h:179
#define PANIC(...)
Definition: exception.h:29
virtual uint64_t GetMaxFileSize()
Definition: quota_posix.cc:615
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
uint64_t cleanup_threshold_
Definition: quota_posix.h:258
static const uint32_t kProtocolRevision
Definition: quota.h:45
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
Definition: posix.cc:1846
void Daemonize()
Definition: posix.cc:1628
bool DoCleanup(const uint64_t leave_size)
Definition: quota_posix.cc:424
uint64_t GetSize() const
Definition: quota_posix.h:154
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: quota_posix.cc:892
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
Definition: quota_posix.cc:706
int BindReturnPipe(int pipe_wronly)
Definition: quota_posix.cc:64
std::string cache_dir_
Definition: quota_posix.h:279
bool Contains(const std::string &hash_str)
Definition: quota_posix.cc:189
perf::MultiRecorder cleanup_recorder_
Definition: quota_posix.h:318
int platform_stat(const char *path, platform_stat64 *buf)
void SetLogMicroSyslog(const std::string &filename)
Definition: logging.cc:272
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
sqlite3 * database_
Definition: quota_posix.h:320
unsigned char digest[digest_size_]
Definition: hash.h:124
virtual std::vector< std::string > ListCatalogs()
Definition: quota_posix.cc:934
void CheckHighPinWatermark()
Definition: quota_posix.cc:82
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
Definition: quota_posix.cc:658
bool FileExists(const std::string &path)
Definition: posix.cc:791
int GetLogSyslogLevel()
Definition: logging.cc:168
static Watchdog * Create(FnOnCrash on_crash)
Definition: monitor.cc:70
int64_t String2Int64(const string &value)
Definition: string.cc:222
unsigned GetDigestSize() const
Definition: hash.h:168
std::string workspace_dir_
Definition: quota_posix.h:286
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
Definition: quota_posix.cc:926
pthread_t thread_lru_
Definition: quota_posix.h:301
#define GetLogDebugFile()
void ReadHalfPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:525
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
void Nonblock2Block(int filedes)
Definition: posix.cc:636
virtual uint64_t GetSizePinned()
Definition: quota_posix.cc:680
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
Definition: quota_posix.cc:942
virtual uint64_t GetCapacity()
Definition: quota_posix.cc:580
void SetSize(const uint64_t new_size)
Definition: quota_posix.h:148
void LockBackChannels()
Definition: quota.h:96
shash::Any RetrieveHash() const
Definition: quota_posix.h:168
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
Definition: posix.cc:971
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
Definition: quota_posix.cc:230
sqlite3_stmt * stmt_touch_
Definition: quota_posix.h:321
string StringifyInt(const int64_t value)
Definition: string.cc:78
std::string GetLogMicroSyslog()
Definition: logging.cc:308
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
Definition: quota_posix.cc:529
sqlite3_stmt * stmt_list_volatile_
Definition: quota_posix.h:332
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
Definition: quota.h:109
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
Definition: quota_posix.cc:918
static int MainCacheManager(int argc, char **argv)
Definition: quota_posix.cc:950
std::map< shash::Md5, int > back_channels_
Definition: quota.h:94
static const unsigned kMaxDescription
Definition: quota_posix.h:191
bool CloseAllFildes(const std::set< int > &preserve_fildes)
Definition: posix.cc:1813
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: quota_posix.cc:906
unsigned char digest[shash::kMaxDigestSize]
Definition: quota_posix.h:132
std::string MakePathWithoutSuffix() const
Definition: hash.h:335
sqlite3_stmt * stmt_list_catalogs_
Definition: quota_posix.h:331
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
Definition: quota_posix.cc:262
uint64_t GetNoTicks(uint32_t retrospect_s) const
Definition: statistics.cc:272
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
platform_dirent64 * platform_readdir(DIR *dirp)
virtual ~PosixQuotaManager()
void Spawn(const std::string &crash_dump_path)
Definition: monitor.cc:510
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
Definition: posix.cc:647
std::map< shash::Any, uint64_t > pinned_chunks_
Definition: quota_posix.h:291
sqlite3_stmt * stmt_unpin_
Definition: quota_posix.h:322
void CloseReturnPipe(int pipe[2])
Definition: quota_posix.cc:179
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
virtual void Spawn()
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:551
const unsigned kDigestSizes[]
Definition: hash.h:69
void UnlockBackChannels()
Definition: quota.h:100
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
Definition: quota_posix.h:160
static const uint64_t kVolatileFlag
Definition: quota_posix.h:204
void UnlockFile(const int filedes)
Definition: posix.cc:995
sqlite3_stmt * stmt_size_
Definition: quota_posix.h:327
struct dirent64 platform_dirent64
int GetLogSyslogFacility()
Definition: logging.cc:214
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528