CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
quota_posix.cc
Go to the documentation of this file.
1 
15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
17 
18 
19 #include "quota_posix.h"
20 
21 #include <dirent.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdint.h>
28 #include <sys/dir.h>
29 #include <sys/stat.h>
30 #ifndef __APPLE__
31 #include <sys/statfs.h>
32 #endif
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 #include <cassert>
39 #include <cstdio>
40 #include <cstdlib>
41 #include <cstring>
42 #include <limits>
43 #include <map>
44 #include <set>
45 #include <string>
46 #include <vector>
47 
48 #include "crypto/hash.h"
49 #include "duplex_sqlite3.h"
50 #include "monitor.h"
51 #include "statistics.h"
52 #include "util/concurrency.h"
53 #include "util/exception.h"
54 #include "util/logging.h"
55 #include "util/platform.h"
56 #include "util/pointer.h"
57 #include "util/posix.h"
58 #include "util/smalloc.h"
59 #include "util/string.h"
60 
61 using namespace std; // NOLINT
62 
63 
// Opens the writer's end of the named return pipe "<workspace_dir_>/pipe<N>",
// where N is the descriptor number passed in as pipe_wronly. If the manager is
// not shared, pipe_wronly is returned unchanged.
// Returns the opened descriptor (switched back to blocking mode via
// Nonblock2Block) or the negative result of open() on failure.
// NOTE(review): original line 75 (the start of the error log call) is missing
// from this listing.
64 int PosixQuotaManager::BindReturnPipe(int pipe_wronly) {
65  if (!shared_)
66  return pipe_wronly;
67 
68  // Connect writer's end
// Opened with O_NONBLOCK, presumably so that open() does not wait for a
// reader; blocking mode is restored once the descriptor is valid.
69  const int result =
70  open((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str(),
71  O_WRONLY | O_NONBLOCK);
72  if (result >= 0) {
73  Nonblock2Block(result);
74  } else {
76  "failed to bind return pipe (%d)", errno);
77  }
78  return result;
79 }
80 
81 
// NOTE(review): the signature (original line 82) and the start of the log call
// (line 85) are missing from this listing.
// Computes the watermark as kHighPinWatermark percent of cleanup_threshold_.
// If a cleanup threshold is set (> 0) and pinned_ exceeds the watermark, this
// logs the pinned and watermark sizes in MiB and broadcasts "R" on the back
// channels, asking clients to release pinned catalogs.
83  const uint64_t watermark = kHighPinWatermark * cleanup_threshold_ / 100;
84  if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86  "high watermark of pinned files (%" PRIu64 "M > %" PRIu64 "M)",
87  pinned_ / (1024 * 1024), watermark / (1024 * 1024));
88  BroadcastBackchannels("R"); // clients: please release pinned catalogs
89  }
90 }
91 
92 
// NOTE(review): the signature (original line 93) and the start of the log call
// (line 108) are missing from this listing.
// Scans workspace_dir_ and unlinks every FIFO whose name starts with "pipe".
// These are left-over return pipes of the form created in BindReturnPipe.
// A message is logged once, before the first removal.
// NOTE(review): a failing opendir() is only caught by assert(); in NDEBUG
// builds a NULL dirp would reach platform_readdir().
94  DIR *dirp = opendir(workspace_dir_.c_str());
95  assert(dirp != NULL);
96 
97  platform_dirent64 *dent;
98  bool found_leftovers = false;
99  while ((dent = platform_readdir(dirp)) != NULL) {
100  const string name = dent->d_name;
101  const string path = workspace_dir_ + "/" + name;
102  platform_stat64 info;
103  const int retval = platform_stat(path.c_str(), &info);
// Entries that cannot be stat'ed are skipped silently.
104  if (retval != 0)
105  continue;
106  if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) == "pipe")) {
107  if (!found_leftovers) {
109  "removing left-over FIFOs from cache directory");
110  }
111  found_leftovers = true;
112  unlink(path.c_str());
113  }
114  }
115  closedir(dirp);
116 }
117 
118 
125 bool PosixQuotaManager::Cleanup(const uint64_t leave_size) {
126  if (!spawned_)
127  return DoCleanup(leave_size);
128 
129  bool result;
130  int pipe_cleanup[2];
131  MakeReturnPipe(pipe_cleanup);
132 
133  LruCommand cmd;
134  cmd.command_type = kCleanup;
135  cmd.size = leave_size;
136  cmd.return_pipe = pipe_cleanup[1];
137 
138  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
139  ManagedReadHalfPipe(pipe_cleanup[0], &result, sizeof(result));
140  CloseReturnPipe(pipe_cleanup);
141 
142  return result;
143 }
144 
145 
147  if (stmt_list_catalogs_)
148  sqlite3_finalize(stmt_list_catalogs_);
149  if (stmt_list_pinned_)
150  sqlite3_finalize(stmt_list_pinned_);
151  if (stmt_list_volatile_)
152  sqlite3_finalize(stmt_list_volatile_);
153  if (stmt_list_)
154  sqlite3_finalize(stmt_list_);
155  if (stmt_lru_)
156  sqlite3_finalize(stmt_lru_);
157  if (stmt_rm_)
158  sqlite3_finalize(stmt_rm_);
159  if (stmt_rm_batch_)
160  sqlite3_finalize(stmt_rm_batch_);
161  if (stmt_size_)
162  sqlite3_finalize(stmt_size_);
163  if (stmt_touch_)
164  sqlite3_finalize(stmt_touch_);
165  if (stmt_unpin_)
166  sqlite3_finalize(stmt_unpin_);
167  if (stmt_block_)
168  sqlite3_finalize(stmt_block_);
169  if (stmt_unblock_)
170  sqlite3_finalize(stmt_unblock_);
171  if (stmt_new_)
172  sqlite3_finalize(stmt_new_);
173  if (database_)
174  sqlite3_close(database_);
175  UnlockFile(fd_lock_cachedb_);
176 
177  stmt_list_catalogs_ = NULL;
178  stmt_list_pinned_ = NULL;
179  stmt_list_volatile_ = NULL;
180  stmt_list_ = NULL;
181  stmt_rm_ = NULL;
182  stmt_rm_batch_ = NULL;
183  stmt_size_ = NULL;
184  stmt_touch_ = NULL;
185  stmt_unpin_ = NULL;
186  stmt_block_ = NULL;
187  stmt_unblock_ = NULL;
188  stmt_new_ = NULL;
189  database_ = NULL;
190 
191  pinned_chunks_.clear();
192 }
193 
194 
// NOTE(review): the signature (original line 195) is missing from this listing.
// Releases a return pipe after a request has been answered. In shared mode the
// read end is closed and the write end is handed to UnlinkReturnPipe();
// otherwise both ends are closed via ClosePipe().
196  if (shared_) {
197  close(pipe[0]);
198  UnlinkReturnPipe(pipe[1]);
199  } else {
200  ClosePipe(pipe);
201  }
202 }
203 
204 
205 bool PosixQuotaManager::Contains(const string &hash_str) {
206  bool result = false;
207 
208  sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
209  SQLITE_STATIC);
210  if (sqlite3_step(stmt_size_) == SQLITE_ROW)
211  result = true;
212  sqlite3_reset(stmt_size_);
213  LogCvmfs(kLogQuota, kLogDebug, "contains %s returns %d", hash_str.c_str(),
214  result);
215 
216  return result;
217 }
218 
219 
// NOTE(review): the signature (original line 220) and the starts of two log
// calls (lines 227 and 238) are missing from this listing.
// Returns early if limit_ is 0 or gauge_ already reaches limit_. Otherwise
// queries statvfs() for the file system holding <cache_dir_>/cachedb. It logs
// when the available space (f_bavail * f_bsize) is smaller than the remaining
// quota (limit_ - gauge_). A failing statvfs() is logged and ignored.
221  if ((limit_ == 0) || (gauge_ >= limit_))
222  return;
223 
224  struct statvfs vfs_info;
225  const int retval = statvfs((cache_dir_ + "/cachedb").c_str(), &vfs_info);
226  if (retval != 0) {
228  "failed to query %s for free space (%d)", cache_dir_.c_str(),
229  errno);
230  return;
231  }
// NOTE(review): the product is computed in the statvfs field types before the
// conversion to int64_t; where those types are 32 bits wide it could overflow.
// Consider casting f_bavail to uint64_t first.
232  const int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
233  LogCvmfs(kLogQuota, kLogDebug, "free space: %" PRId64 " MB",
234  free_space_byte / (1024 * 1024));
235 
// Non-negative because gauge_ < limit_ is guaranteed by the early return.
236  const int64_t required_byte = limit_ - gauge_;
237  if (free_space_byte < required_byte) {
239  "too little free space on the file system hosting the cache,"
240  " %" PRId64 " MB available",
241  free_space_byte / (1024 * 1024));
242  }
243 }
244 
245 
// Creates an in-process quota manager for cache_workspace. shared_ and
// spawned_ are not set here.
// Returns NULL if cleanup_threshold >= limit or the cache database cannot be
// initialized. On success it checks free space, creates pipe_lru_, and marks
// the manager initialized with protocol revision kProtocolRevision.
// NOTE(review): original line 251 (the start of the error log call) is
// missing from this listing.
246 PosixQuotaManager *PosixQuotaManager::Create(const string &cache_workspace,
247  const uint64_t limit,
248  const uint64_t cleanup_threshold,
249  const bool rebuild_database) {
250  if (cleanup_threshold >= limit) {
252  "invalid parameters: limit %" PRIu64 ", "
253  "cleanup_threshold %" PRIu64,
254  limit, cleanup_threshold);
255  return NULL;
256  }
257 
258  PosixQuotaManager *quota_manager = new PosixQuotaManager(
259  limit, cleanup_threshold, cache_workspace);
260 
261  // Initialize cache catalog
262  if (!quota_manager->InitDatabase(rebuild_database)) {
263  delete quota_manager;
264  return NULL;
265  }
// Free-space problems are only reported; they do not fail creation.
266  quota_manager->CheckFreeSpace();
267  MakePipe(quota_manager->pipe_lru_);
268 
269  quota_manager->protocol_revision_ = kProtocolRevision;
270  quota_manager->initialized_ = true;
271  return quota_manager;
272 }
273 
274 
// NOTE(review): the start of this definition (original lines 275-278) and the
// first lines of several log calls (321, 341, 448, 457, 484) are missing from
// this listing.
// Connects to the shared cache manager for cache_workspace. If no manager is
// listening on <workspace>/cachemgr, a new one is spawned through exe_path
// ("__cachemgr__" mode). Returns a manager with shared_ and spawned_ set, or
// NULL on failure. The lock <workspace>/lock_cachemgr is held for the whole
// procedure and released before every return.
279  const std::string &exe_path,
280  const std::string &cache_workspace,
281  const uint64_t limit,
282  const uint64_t cleanup_threshold,
283  bool foreground) {
284  string cache_dir;
285  string workspace_dir;
286  ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
287 
288  pid_t new_cachemgr_pid;
289 
290  // Create lock file: only one fuse client at a time
291  const int fd_lockfile = LockFile(workspace_dir + "/lock_cachemgr");
292  if (fd_lockfile < 0) {
293  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
294  (workspace_dir + "/lock_cachemgr").c_str(), errno);
295  return NULL;
296  }
297 
298  PosixQuotaManager *quota_mgr = new PosixQuotaManager(limit, cleanup_threshold,
299  cache_workspace);
300  quota_mgr->shared_ = true;
301  quota_mgr->spawned_ = true;
302 
303  // Try to connect to pipe
304  const string fifo_path = workspace_dir + "/cachemgr";
305  LogCvmfs(kLogQuota, kLogDebug, "trying to connect to existing pipe");
306  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
// Fast path: an existing cache manager already serves the FIFO.
307  if (quota_mgr->pipe_lru_[1] >= 0) {
// The lock file holds kLockFileMagicNumber followed by the manager's pid
// (written further below when a new manager is spawned).
308  const int fd_lockfile_rw = open((workspace_dir + "/lock_cachemgr").c_str(),
309  O_RDWR, 0600);
310  unsigned lockfile_magicnumber = 0;
311  const ssize_t result_mn = SafeRead(fd_lockfile_rw, &lockfile_magicnumber,
312  sizeof(lockfile_magicnumber));
313  const ssize_t result = SafeRead(fd_lockfile_rw, &new_cachemgr_pid,
314  sizeof(new_cachemgr_pid));
315  close(fd_lockfile_rw);
316 
317  if ((lockfile_magicnumber != kLockFileMagicNumber) || (result < 0)
318  || (result_mn < 0)
319  || (static_cast<size_t>(result) < sizeof(new_cachemgr_pid))) {
320  if (result != 0) {
322  "could not read cache manager pid from lockfile");
323  UnlockFile(fd_lockfile);
324  delete quota_mgr;
325  return NULL;
326  } else {
327  // support reload from old versions of the cache manager
328  // lock file is empty in this case, try a plain ReadHalfPipe to get pid
// The pid is obtained through GetPid(), which asks the manager over a return
// pipe when cachemgr_pid_ is not yet known.
329  quota_mgr->SetCacheMgrPid(quota_mgr->GetPid());
330  }
331  } else {
332  quota_mgr->SetCacheMgrPid(new_cachemgr_pid);
333  }
334 
335 
336  LogCvmfs(kLogQuota, kLogDebug, "connected to existing cache manager pipe");
337  quota_mgr->initialized_ = true;
338  Nonblock2Block(quota_mgr->pipe_lru_[1]);
339  UnlockFile(fd_lockfile);
// Limits of the running manager override the values passed in.
340  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
342  "received limit %" PRIu64 ", threshold %" PRIu64,
343  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
344  if (FileExists(workspace_dir + "/cachemgr.protocol")) {
345  quota_mgr->protocol_revision_ = quota_mgr->GetProtocolRevision();
346  LogCvmfs(kLogQuota, kLogDebug, "connected protocol revision %u",
347  quota_mgr->protocol_revision_);
348  } else {
349  LogCvmfs(kLogQuota, kLogDebug, "connected to ancient cache manager");
350  }
351  return quota_mgr;
352  }
// Remember why connecting failed; ENXIO is treated below as a stale FIFO.
353  const int connect_error = errno;
354 
355  // Lock file: let existing cache manager finish first
356  const int fd_lockfile_fifo = LockFile(workspace_dir + "/lock_cachemgr.fifo");
357  if (fd_lockfile_fifo < 0) {
358  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
359  (workspace_dir + "/lock_cachemgr.fifo").c_str(), errno);
360  UnlockFile(fd_lockfile);
361  delete quota_mgr;
362  return NULL;
363  }
364  UnlockFile(fd_lockfile_fifo);
365 
366  if (connect_error == ENXIO) {
367  LogCvmfs(kLogQuota, kLogDebug, "left-over FIFO found, unlinking");
368  unlink(fifo_path.c_str());
369  }
370 
371  // Creating a new FIFO for the cache manager (to be bound later)
// NOTE(review): failure paths after mkfifo() leave this FIFO in place; a later
// call deals with a stale FIFO through the ENXIO branch above.
372  int retval = mkfifo(fifo_path.c_str(), 0600);
373  if (retval != 0) {
374  LogCvmfs(kLogQuota, kLogDebug, "failed to create cache manager FIFO (%d)",
375  errno);
376  UnlockFile(fd_lockfile);
377  delete quota_mgr;
378  return NULL;
379  }
380 
381  // Create new cache manager
// pipe_boot: the child signals readiness; pipe_handshake: we confirm we have
// connected to the FIFO.
382  int pipe_boot[2];
383  int pipe_handshake[2];
384  MakePipe(pipe_boot);
385  MakePipe(pipe_handshake);
386 
387  vector<string> command_line;
388  command_line.push_back(exe_path);
389  command_line.push_back("__cachemgr__");
390  command_line.push_back(cache_workspace);
391  command_line.push_back(StringifyInt(pipe_boot[1]));
392  command_line.push_back(StringifyInt(pipe_handshake[0]));
393  command_line.push_back(StringifyInt(limit));
394  command_line.push_back(StringifyInt(cleanup_threshold));
395  // do not propagate foreground in order to reliably get pid from exec
396  // instead, daemonize right here
397  command_line.push_back(StringifyInt(true)); // foreground
398  command_line.push_back(StringifyInt(GetLogSyslogLevel()));
399  command_line.push_back(StringifyInt(GetLogSyslogFacility()));
400  command_line.push_back(GetLogDebugFile() + ":" + GetLogMicroSyslog());
401 
402  set<int> preserve_filedes;
403  preserve_filedes.insert(0);
404  preserve_filedes.insert(1);
405  preserve_filedes.insert(2);
406  preserve_filedes.insert(pipe_boot[1]);
407  preserve_filedes.insert(pipe_handshake[0]);
408 
409  if (foreground) {
410  retval = ManagedExec(command_line, preserve_filedes, map<int, int>(),
411  /*drop_credentials*/ false,
412  /*clear_env*/ false,
413  /*double_fork*/ true, &new_cachemgr_pid);
414  } else {
415  retval = ExecAsDaemon(command_line, &new_cachemgr_pid);
416  }
417  if (!retval) {
418  UnlockFile(fd_lockfile);
419  ClosePipe(pipe_boot);
420  ClosePipe(pipe_handshake);
421  delete quota_mgr;
422  LogCvmfs(kLogQuota, kLogDebug, "failed to start cache manager");
423  return NULL;
424  }
425  LogCvmfs(kLogQuota, kLogDebug, "new cache manager pid: %d", new_cachemgr_pid);
426  quota_mgr->SetCacheMgrPid(new_cachemgr_pid);
// Persist magic number and pid for clients that later take the fast path.
// NOTE(review): the open() result is not checked directly; a failed open()
// presumably makes SafeWrite() fail and triggers the PANIC below.
427  const int fd_lockfile_rw = open((workspace_dir + "/lock_cachemgr").c_str(),
428  O_RDWR | O_TRUNC, 0600);
429  const unsigned magic_number = PosixQuotaManager::kLockFileMagicNumber;
430  const bool result_mn = SafeWrite(fd_lockfile_rw, &magic_number,
431  sizeof(magic_number));
432  const bool result = SafeWrite(fd_lockfile_rw, &new_cachemgr_pid,
433  sizeof(new_cachemgr_pid));
434  if (!result || !result_mn) {
435  PANIC(kLogSyslogErr, "could not write cache manager pid to lockfile");
436  }
437 
438  close(fd_lockfile_rw);
439  // Wait for cache manager to be ready
440  close(pipe_boot[1]);
441  close(pipe_handshake[0]);
442  char buf;
443  if (read(pipe_boot[0], &buf, 1) != 1) {
444  UnlockFile(fd_lockfile);
445  close(pipe_boot[0]);
446  close(pipe_handshake[1]);
447  delete quota_mgr;
449  "cache manager did not start");
450  return NULL;
451  }
452  close(pipe_boot[0]);
453 
454  // Connect write end
455  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
456  if (quota_mgr->pipe_lru_[1] < 0) {
458  "failed to connect to newly created FIFO (%d)", errno);
459  close(pipe_handshake[1]);
460  UnlockFile(fd_lockfile);
461  delete quota_mgr;
462  return NULL;
463  }
464 
465  // Finalize handshake
466  buf = 'C';
467  if (write(pipe_handshake[1], &buf, 1) != 1) {
468  UnlockFile(fd_lockfile);
469  close(pipe_handshake[1]);
470  LogCvmfs(kLogQuota, kLogDebug, "could not finalize handshake");
471  delete quota_mgr;
472  return NULL;
473  }
474  close(pipe_handshake[1]);
475 
476  Nonblock2Block(quota_mgr->pipe_lru_[1]);
477  LogCvmfs(kLogQuota, kLogDebug, "connected to a new cache manager");
478  quota_mgr->protocol_revision_ = kProtocolRevision;
479 
480  UnlockFile(fd_lockfile);
481 
482  quota_mgr->initialized_ = true;
483  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
485  "received limit %" PRIu64 ", "
486  "threshold %" PRIu64,
487  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
488  return quota_mgr;
489 }
490 
491 
// Evicts entries in ascending acseq order (least recently used first) until
// gauge_ <= leave_size.
// Candidates are fetched in batches of at most kEvictBatchSize rows with
// pinned<>2. Chunks found in pinned_chunks_ are marked pinned=2 (blocked)
// instead of being evicted. Afterwards all rows with acseq <= max_acseq that
// are not blocked are deleted in one statement, and blocked rows are reset to
// pinned=1. The corresponding files are then removed via EmptyTrash().
// Returns false if EmptyTrash() fails or gauge_ is still above leave_size.
// NOTE(review): original lines 497 (start of a log call) and 525 (first
// argument of the EvictCandidate construction) are missing from this listing.
492 bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) {
493  if (gauge_ <= leave_size)
494  return true;
495 
496  // TODO(jblomer) transaction
// NOTE(review): "%lu" is used for a uint64_t argument below; PRIu64, as used
// elsewhere in this function, would be portable.
498  "clean up cache until at most %lu KB is used", leave_size / 1024);
499  LogCvmfs(kLogQuota, kLogDebug, "gauge %" PRIu64, gauge_);
500  cleanup_recorder_.Tick();
501 
502  bool result;
503  vector<string> trash;
504 
505  // Note that volatile files start counting from the smallest int64 number:
506  // the absolute sequence number with the first bit set in two's complement.
507  // So -1 can be a marker that will never appear in the database.
508  int64_t max_acseq = -1;
509  do {
// Each round resumes right after the last evicted entry.
510  sqlite3_reset(stmt_lru_);
511  sqlite3_bind_int64(stmt_lru_, 1,
512  (max_acseq == -1) ? std::numeric_limits<int64_t>::min()
513  : (max_acseq + 1));
514 
515  std::vector<EvictCandidate> candidates;
516  candidates.reserve(kEvictBatchSize);
517  string hash_str;
518  unsigned i = 0;
519  while (sqlite3_step(stmt_lru_) == SQLITE_ROW) {
520  hash_str = reinterpret_cast<const char *>(
521  sqlite3_column_text(stmt_lru_, 0));
522  LogCvmfs(kLogQuota, kLogDebug, "add %s to candidates for eviction",
523  hash_str.c_str());
524  candidates.push_back(
526  sqlite3_column_int64(stmt_lru_, 1),
527  sqlite3_column_int64(stmt_lru_, 2)));
528  i++;
529  }
530  if (candidates.empty()) {
531  LogCvmfs(kLogQuota, kLogDebug, "no more entries to evict");
532  break;
533  }
534 
535  const unsigned N = candidates.size();
536  for (i = 0; i < N; ++i) {
537  // That's a critical condition. We must not delete a not yet inserted
538  // pinned file as it is already reserved (but will be inserted later).
539  // Instead, set the pin bit in the db to not run into an endless loop
540  if (pinned_chunks_.find(candidates[i].hash) != pinned_chunks_.end()) {
541  hash_str = candidates[i].hash.ToString();
542  LogCvmfs(kLogQuota, kLogDebug, "skip %s for eviction",
543  hash_str.c_str());
544  sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
545  SQLITE_STATIC);
546  result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
547  sqlite3_reset(stmt_block_);
548  assert(result);
549  continue;
550  }
551 
// gauge_ is lowered now; the file itself is unlinked later in EmptyTrash().
552  trash.push_back(cache_dir_ + "/"
553  + candidates[i].hash.MakePathWithoutSuffix());
554  gauge_ -= candidates[i].size;
555  max_acseq = candidates[i].acseq;
556  LogCvmfs(kLogQuota, kLogDebug, "lru cleanup %s, new gauge %" PRIu64,
557  candidates[i].hash.ToString().c_str(), gauge_);
558 
559  if (gauge_ <= leave_size)
560  break;
561  }
562  } while (gauge_ > leave_size);
563 
// Remove all evicted rows at once, then turn blocked rows back into pinned.
564  if (max_acseq != -1) {
565  sqlite3_bind_int64(stmt_rm_batch_, 1, max_acseq);
566  result = (sqlite3_step(stmt_rm_batch_) == SQLITE_DONE);
567  assert(result);
568  sqlite3_reset(stmt_rm_batch_);
569 
570  result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
571  sqlite3_reset(stmt_unblock_);
572  assert(result);
573  }
574 
575  if (!EmptyTrash(trash))
576  return false;
577 
578  if (gauge_ > leave_size) {
580  "request to clean until %" PRIu64 ", "
581  "but effective gauge is %" PRIu64,
582  leave_size, gauge_);
583  return false;
584  }
585  return true;
586 }
587 
588 bool PosixQuotaManager::EmptyTrash(const std::vector<std::string> &trash) {
589  if (trash.empty())
590  return true;
591 
592  if (async_delete_) {
593  // Double fork avoids zombie, forked removal process must not flush file
594  // buffers
595  pid_t pid;
596  int statloc;
597  if ((pid = fork()) == 0) {
598  // TODO(jblomer): eviciting files in the cache should perhaps become a
599  // thread. This would also allow to block the chunks and prevent the
600  // race with re-insertion. Then again, a thread can block umount.
601 #ifndef DEBUGMSG
602  CloseAllFildes(std::set<int>());
603 #endif
604  if (fork() == 0) {
605  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
606  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
607  unlink(trash[i].c_str());
608  }
609  _exit(0);
610  }
611  _exit(0);
612  } else {
613  if (pid > 0)
614  waitpid(pid, &statloc, 0);
615  else
616  return false;
617  }
618  } else { // !async_delete_
619  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
620  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
621  unlink(trash[i].c_str());
622  }
623  }
624  return true;
625 }
626 
627 
629  const uint64_t size,
630  const string &description,
631  const CommandType command_type) {
632  const string hash_str = hash.ToString();
633  LogCvmfs(kLogQuota, kLogDebug, "insert into lru %s, path %s, method %d",
634  hash_str.c_str(), description.c_str(), command_type);
635  const unsigned desc_length = (description.length() > kMaxDescription)
636  ? kMaxDescription
637  : description.length();
638 
639  LruCommand *cmd = reinterpret_cast<LruCommand *>(
640  alloca(sizeof(LruCommand) + desc_length));
641  new (cmd) LruCommand;
642  cmd->command_type = command_type;
643  cmd->SetSize(size);
644  cmd->StoreHash(hash);
645  cmd->desc_length = desc_length;
646  memcpy(reinterpret_cast<char *>(cmd) + sizeof(LruCommand), &description[0],
647  desc_length);
648  WritePipe(pipe_lru_[1], cmd, sizeof(LruCommand) + desc_length);
649 }
650 
651 
652 vector<string> PosixQuotaManager::DoList(const CommandType list_command) {
653  vector<string> result;
654 
655  int pipe_list[2];
656  MakeReturnPipe(pipe_list);
657  char description_buffer[kMaxDescription];
658 
659  LruCommand cmd;
660  cmd.command_type = list_command;
661  cmd.return_pipe = pipe_list[1];
662  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
663 
664  int length;
665  do {
666  ManagedReadHalfPipe(pipe_list[0], &length, sizeof(length));
667  if (length > 0) {
668  ReadPipe(pipe_list[0], description_buffer, length);
669  result.push_back(string(description_buffer, length));
670  }
671  } while (length >= 0);
672 
673  CloseReturnPipe(pipe_list);
674  return result;
675 }
676 
677 
// NOTE(review): the signature (original line 678) and the start of the log
// call (line 687) are missing from this listing.
// Returns limit_ unless it equals (uint64_t)(-1), which marks an unrestricted
// cache. In that case it returns the available bytes (f_bavail * f_bsize) of
// the file system containing the current working directory. If statfs() fails
// it logs and returns limit_.
679  if (limit_ != (uint64_t)(-1))
680  return limit_;
681 
682  // Unrestricted cache, look at free space on cache dir fs
// NOTE(review): the query is on "." rather than cache_dir_; this presumably
// relies on the process's working directory being the cache directory —
// confirm.
683  struct statfs info;
684  if (statfs(".", &info) == 0) {
685  return info.f_bavail * info.f_bsize;
686  } else {
688  "failed to query file system info of cache (%d)", errno);
689  return limit_;
690  }
691 }
692 
693 
694 void PosixQuotaManager::GetLimits(uint64_t *limit,
695  uint64_t *cleanup_threshold) {
696  int pipe_limits[2];
697  MakeReturnPipe(pipe_limits);
698 
699  LruCommand cmd;
700  cmd.command_type = kLimits;
701  cmd.return_pipe = pipe_limits[1];
702  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
703  ManagedReadHalfPipe(pipe_limits[0], limit, sizeof(*limit));
704  ReadPipe(pipe_limits[0], cleanup_threshold, sizeof(*cleanup_threshold));
705  CloseReturnPipe(pipe_limits);
706 }
707 
708 
// NOTE(review): the signature (original lines 709-713) is missing from this
// listing.
// Returns the difference between the quota limit and the cleanup threshold.
714  return limit_ - cleanup_threshold_;
715 }
716 
717 
// NOTE(review): the signature (original line 718) is missing from this listing.
// Returns getpid() unless the manager is both shared and spawned. In that case
// it returns the cached cachemgr_pid_ if non-zero, otherwise it asks the cache
// manager (kPid) over a return pipe.
// NOTE(review): unlike most requests in this file, the reply is read with
// ReadHalfPipe rather than ManagedReadHalfPipe.
719  if (!shared_ || !spawned_) {
720  return getpid();
721  }
722  if (cachemgr_pid_) {
723  return cachemgr_pid_;
724  }
725 
726  pid_t result;
727  int pipe_pid[2];
728  MakeReturnPipe(pipe_pid);
729 
730  LruCommand cmd;
731  cmd.command_type = kPid;
732  cmd.return_pipe = pipe_pid[1];
733  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
734  ReadHalfPipe(pipe_pid[0], &result, sizeof(result));
735  CloseReturnPipe(pipe_pid);
736  return result;
737 }
738 
739 
// NOTE(review): the signature (original line 740) is missing from this listing.
// Asks the cache manager for its protocol revision (kGetProtocolRevision) and
// returns the uint32_t it answers with.
741  int pipe_revision[2];
742  MakeReturnPipe(pipe_revision);
743 
744  LruCommand cmd;
745  cmd.command_type = kGetProtocolRevision;
746  cmd.return_pipe = pipe_revision[1];
747  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
748 
749  uint32_t revision;
750  ManagedReadHalfPipe(pipe_revision[0], &revision, sizeof(revision));
751  CloseReturnPipe(pipe_revision);
752  return revision;
753 }
754 
755 
759 void PosixQuotaManager::GetSharedStatus(uint64_t *gauge, uint64_t *pinned) {
760  int pipe_status[2];
761  MakeReturnPipe(pipe_status);
762 
763  LruCommand cmd;
764  cmd.command_type = kStatus;
765  cmd.return_pipe = pipe_status[1];
766  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
767  ManagedReadHalfPipe(pipe_status[0], gauge, sizeof(*gauge));
768  ReadPipe(pipe_status[0], pinned, sizeof(*pinned));
769  CloseReturnPipe(pipe_status);
770 }
771 
772 bool PosixQuotaManager::SetSharedLimit(uint64_t limit) {
773  int pipe_set_limit[2];
774  bool result;
775  MakeReturnPipe(pipe_set_limit);
776 
777  LruCommand cmd;
778  cmd.command_type = kSetLimit;
779  cmd.size = limit;
780  cmd.return_pipe = pipe_set_limit[1];
781  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
782  ReadHalfPipe(pipe_set_limit[0], &result, sizeof(result));
783  CloseReturnPipe(pipe_set_limit);
784  return result;
785 }
786 
787 
// NOTE(review): the signature (original line 788) and the start of the log
// call (line 792) are missing from this listing.
// Without a spawned cache manager, this sets limit_ to size and
// cleanup_threshold_ to size / 2, logs, and returns true. Otherwise the
// request is forwarded to SetSharedLimit(size).
// NOTE(review): the log format uses "%lu" for uint64_t values; PRIu64 would be
// portable.
789  if (!spawned_) {
790  limit_ = size;
791  cleanup_threshold_ = size / 2;
793  "Quota limit set to %lu / threshold %lu", limit_,
794  cleanup_threshold_);
795  return true;
796  }
797  return SetSharedLimit(size);
798 }
799 
// NOTE(review): the signature (original line 800) is missing from this listing.
// Returns the local gauge_ when not spawned; otherwise the gauge reported by
// the cache manager through GetSharedStatus().
801  if (!spawned_)
802  return gauge_;
803  uint64_t gauge, size_pinned;
804  GetSharedStatus(&gauge, &size_pinned);
805  return gauge;
806 }
807 
808 
// NOTE(review): the signature (original line 809) is missing from this listing.
// Returns the local pinned_ when not spawned; otherwise the pinned volume
// reported by the cache manager through GetSharedStatus().
810  if (!spawned_)
811  return pinned_;
812  uint64_t gauge, size_pinned;
813  GetSharedStatus(&gauge, &size_pinned);
814  return size_pinned;
815 }
816 
817 
818 uint64_t PosixQuotaManager::GetCleanupRate(uint64_t period_s) {
819  if (!spawned_ || (protocol_revision_ < 2))
820  return 0;
821  uint64_t cleanup_rate;
822 
823  int pipe_cleanup_rate[2];
824  MakeReturnPipe(pipe_cleanup_rate);
825  LruCommand cmd;
826  cmd.command_type = kCleanupRate;
827  cmd.size = period_s;
828  cmd.return_pipe = pipe_cleanup_rate[1];
829  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
830  ManagedReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate,
831  sizeof(cleanup_rate));
832  CloseReturnPipe(pipe_cleanup_rate);
833 
834  return cleanup_rate;
835 }
836 
837 
// Opens (creating if necessary) <cache_dir_>/cachedb while holding the lock
// file <workspace_dir_>/lock_cachedb. The steps are:
// - create or migrate the schema and reset all pin flags;
// - rebuild the catalog from the file system if it is empty or
//   rebuild_database is set;
// - load gauge_ and seq_, then prepare all statements used by the manager.
// If the initial schema setup fails, the database files are deleted and the
// open is retried once. Returns false on failure, after closing the database
// and releasing the lock.
// NOTE(review): original lines 883, 901, 931 and 934 (starts of log calls) are
// missing from this listing. The return codes of the sqlite3_prepare_v2 calls
// are not checked.
838 bool PosixQuotaManager::InitDatabase(const bool rebuild_database) {
839  string sql;
840  sqlite3_stmt *stmt;
841 
842  fd_lock_cachedb_ = LockFile(workspace_dir_ + "/lock_cachedb");
843  if (fd_lock_cachedb_ < 0) {
844  LogCvmfs(kLogQuota, kLogDebug, "failed to create cachedb lock");
845  return false;
846  }
847 
848  bool retry = false;
849  const string db_file = cache_dir_ + "/cachedb";
850  if (rebuild_database) {
851  LogCvmfs(kLogQuota, kLogDebug, "rebuild database, unlinking existing (%s)",
852  db_file.c_str());
853  unlink(db_file.c_str());
854  unlink((db_file + "-journal").c_str());
855  }
856 
857 init_recover:
858  int err = sqlite3_open(db_file.c_str(), &database_);
859  if (err != SQLITE_OK) {
860  LogCvmfs(kLogQuota, kLogDebug, "could not open cache database (%d)", err);
861  goto init_database_fail;
862  }
863  // TODO(reneme): make this a `QuotaDatabase : public sqlite::Database`
864  sql = "PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
865  "PRAGMA auto_vacuum=1; "
866  "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
867  " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
868  "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
869  "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
870  " ON cache_catalog (acseq); "
871  "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
872  "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
873  "CREATE INDEX idx_fscache_actime ON fscache (actime); "
874  "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
875  " CONSTRAINT pk_properties PRIMARY KEY(key));";
876  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
877  if (err != SQLITE_OK) {
// First failure: assume a corrupted file, delete it and start over once.
878  if (!retry) {
879  retry = true;
880  sqlite3_close(database_);
881  unlink(db_file.c_str());
882  unlink((db_file + "-journal").c_str());
884  "LRU database corrupted, re-building");
885  goto init_recover;
886  }
887  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
888  sql.c_str());
889  goto init_database_fail;
890  }
891 
892  // If this an old cache catalog,
893  // add and initialize new columns to cache_catalog
// A failing ALTER (e.g. columns already present) is ignored; only on success
// is the new type column initialized.
894  sql = "ALTER TABLE cache_catalog ADD type INTEGER; "
895  "ALTER TABLE cache_catalog ADD pinned INTEGER";
896  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
897  if (err == SQLITE_OK) {
898  sql = "UPDATE cache_catalog SET type=" + StringifyInt(kFileRegular) + ";";
899  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
900  if (err != SQLITE_OK) {
902  "could not init cache database (failed: %s)", sql.c_str());
903  goto init_database_fail;
904  }
905  }
906 
907  // Set pinned back
908  sql = "UPDATE cache_catalog SET pinned=0;";
909  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
910  if (err != SQLITE_OK) {
911  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
912  sql.c_str());
913  goto init_database_fail;
914  }
915 
916  // Set schema version
917  sql = "INSERT OR REPLACE INTO properties (key, value) "
918  "VALUES ('schema', '1.0')";
919  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
920  if (err != SQLITE_OK) {
921  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
922  sql.c_str());
923  goto init_database_fail;
924  }
925 
926  // If cache catalog is empty, recreate from file system
927  sql = "SELECT count(*) FROM cache_catalog;";
928  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
929  if (sqlite3_step(stmt) == SQLITE_ROW) {
930  if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
932  "CernVM-FS: building lru cache database...");
933  if (!RebuildDatabase()) {
935  "could not build cache database from file system");
936  sqlite3_finalize(stmt);
937  goto init_database_fail;
938  }
939  }
940  sqlite3_finalize(stmt);
941  } else {
942  LogCvmfs(kLogQuota, kLogDebug, "could not select on cache catalog");
943  sqlite3_finalize(stmt);
944  goto init_database_fail;
945  }
946 
947  // How many bytes do we already have in cache?
948  sql = "SELECT sum(size) FROM cache_catalog;";
949  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
950  if (sqlite3_step(stmt) == SQLITE_ROW) {
951  gauge_ = sqlite3_column_int64(stmt, 0);
952  } else {
953  LogCvmfs(kLogQuota, kLogDebug, "could not determine cache size");
954  sqlite3_finalize(stmt);
955  goto init_database_fail;
956  }
957  sqlite3_finalize(stmt);
958 
959  // Highest seq-no?
// The top bit of acseq marks volatile entries; it is masked off so seq_
// continues after the highest plain sequence number.
960  sql = "SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
961  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
962  if (sqlite3_step(stmt) == SQLITE_ROW) {
963  seq_ = sqlite3_column_int64(stmt, 0) + 1;
964  } else {
965  LogCvmfs(kLogQuota, kLogDebug, "could not determine highest seq-no");
966  sqlite3_finalize(stmt);
967  goto init_database_fail;
968  }
969  sqlite3_finalize(stmt);
970 
971  // Prepare touch, new, remove statements
// Touch keeps the volatile (top) bit of acseq while updating the sequence.
972  sqlite3_prepare_v2(database_,
973  "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
974  "WHERE sha1=:sha1;",
975  -1, &stmt_touch_, NULL);
976  sqlite3_prepare_v2(database_,
977  "UPDATE cache_catalog SET pinned=0 "
978  "WHERE sha1=:sha1;",
979  -1, &stmt_unpin_, NULL);
// pinned=2 marks an entry as blocked from eviction during a cleanup run;
// stmt_unblock_ turns blocked entries back into pinned=1.
980  sqlite3_prepare_v2(database_,
981  "UPDATE cache_catalog SET pinned=2 "
982  "WHERE sha1=:sha1;",
983  -1, &stmt_block_, NULL);
984  sqlite3_prepare_v2(database_,
985  "UPDATE cache_catalog SET pinned=1 "
986  "WHERE pinned=2;",
987  -1, &stmt_unblock_, NULL);
988  sqlite3_prepare_v2(database_,
989  "INSERT OR REPLACE INTO cache_catalog "
990  "(sha1, size, acseq, path, type, pinned) "
991  "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
992  -1, &stmt_new_, NULL);
993  sqlite3_prepare_v2(database_,
994  "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
995  -1, &stmt_size_, NULL);
996  sqlite3_prepare_v2(database_, "DELETE FROM cache_catalog WHERE sha1=:sha1;",
997  -1, &stmt_rm_, NULL);
998  sqlite3_prepare_v2(database_,
999  "DELETE FROM cache_catalog WHERE acseq<=:a AND pinned<>2;",
1000  -1, &stmt_rm_batch_, NULL);
// Eviction candidates: one batch of at most kEvictBatchSize rows in acseq order.
1001  sqlite3_prepare_v2(database_,
1002  (std::string("SELECT sha1, size, acseq FROM cache_catalog "
1003  "WHERE pinned<>2 AND acseq>=:a "
1004  "ORDER BY acseq ASC "
1005  "LIMIT ")
1006  + StringifyInt(kEvictBatchSize) + ";")
1007  .c_str(),
1008  -1, &stmt_lru_, NULL);
1009  sqlite3_prepare_v2(database_,
1010  ("SELECT path FROM cache_catalog WHERE type="
1011  + StringifyInt(kFileRegular) + ";")
1012  .c_str(),
1013  -1, &stmt_list_, NULL);
1014  sqlite3_prepare_v2(database_,
1015  "SELECT path FROM cache_catalog WHERE pinned<>0;", -1,
1016  &stmt_list_pinned_, NULL);
1017  sqlite3_prepare_v2(database_,
1018  "SELECT path FROM cache_catalog WHERE acseq < 0;", -1,
1019  &stmt_list_volatile_, NULL);
1020  sqlite3_prepare_v2(database_,
1021  ("SELECT path FROM cache_catalog WHERE type="
1022  + StringifyInt(kFileCatalog) + ";")
1023  .c_str(),
1024  -1, &stmt_list_catalogs_, NULL);
1025  return true;
1026 
1027 init_database_fail:
1028  sqlite3_close(database_);
1029  database_ = NULL;
1030  UnlockFile(fd_lock_cachedb_);
1031  return false;
1032 }
1032 }
1033 
1034 
// NOTE(review): the start of this definition (original lines 1035-1039) is
// missing from this listing.
// Forwards to DoInsert() with command type kInsert.
1040  const uint64_t size,
1041  const string &description) {
1042  DoInsert(any_hash, size, description, kInsert);
1043 }
1044 
1045 
// NOTE(review): the start of this definition (original lines 1046-1051) is
// missing from this listing.
// Forwards to DoInsert() with command type kInsertVolatile.
1052  const uint64_t size,
1053  const string &description) {
1054  DoInsert(any_hash, size, description, kInsertVolatile);
1055 }
1056 
1057 
1061 vector<string> PosixQuotaManager::List() { return DoList(kList); }
1062 
1063 
1067 vector<string> PosixQuotaManager::ListPinned() { return DoList(kListPinned); }
1068 
1069 
// NOTE(review): the signature is missing from this listing.
// Generic list round trip with the kListCatalogs command.
1074  return DoList(kListCatalogs);
1075 }
1076 
1077 
// NOTE(review): the signature is missing from this listing.
// Generic list round trip with the kListVolatile command.
1082  return DoList(kListVolatile);
1083 }
1084 
1085 
1089 int PosixQuotaManager::MainCacheManager(int argc, char **argv) {
1090  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1091  int retval;
1092 
1093  PosixQuotaManager shared_manager(0, 0, "");
1094  shared_manager.shared_ = true;
1095  shared_manager.spawned_ = true;
1096  shared_manager.pinned_ = 0;
1097 
1098  // Process command line arguments
1099  ParseDirectories(string(argv[2]),
1100  &shared_manager.cache_dir_,
1101  &shared_manager.workspace_dir_);
1102  const int pipe_boot = String2Int64(argv[3]);
1103  const int pipe_handshake = String2Int64(argv[4]);
1104  shared_manager.limit_ = String2Int64(argv[5]);
1105  shared_manager.cleanup_threshold_ = String2Int64(argv[6]);
1106  const int foreground = String2Int64(argv[7]);
1107  const int syslog_level = String2Int64(argv[8]);
1108  const int syslog_facility = String2Int64(argv[9]);
1109  vector<string> logfiles = SplitString(argv[10], ':');
1110 
1111  SetLogSyslogLevel(syslog_level);
1112  SetLogSyslogFacility(syslog_facility);
1113  if ((logfiles.size() > 0) && (logfiles[0] != ""))
1114  SetLogDebugFile(logfiles[0] + ".cachemgr");
1115  if (logfiles.size() > 1)
1116  SetLogMicroSyslog(logfiles[1]);
1117 
1118  if (!foreground)
1119  Daemonize();
1120 
1121  const UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
1122  assert(watchdog.IsValid());
1123  watchdog->Spawn("./stacktrace.cachemgr");
1124 
1125  // Initialize pipe, open non-blocking as cvmfs is not yet connected
1126  const int fd_lockfile_fifo = LockFile(shared_manager.workspace_dir_
1127  + "/lock_cachemgr.fifo");
1128  if (fd_lockfile_fifo < 0) {
1130  "could not open lock file "
1131  "%s (%d)",
1132  (shared_manager.workspace_dir_ + "/lock_cachemgr.fifo").c_str(),
1133  errno);
1134  return 1;
1135  }
1136  const string crash_guard = shared_manager.cache_dir_ + "/cachemgr.running";
1137  const bool rebuild = FileExists(crash_guard);
1138  retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1139  if (retval < 0) {
1141  "failed to create shared cache manager crash guard");
1142  UnlockFile(fd_lockfile_fifo);
1143  return 1;
1144  }
1145  close(retval);
1146 
1147  // Redirect SQlite temp directory to cache (global variable)
1148  const string tmp_dir = shared_manager.workspace_dir_;
1149  sqlite3_temp_directory = static_cast<char *>(
1150  sqlite3_malloc(tmp_dir.length() + 1));
1151  snprintf(sqlite3_temp_directory, tmp_dir.length() + 1, "%s", tmp_dir.c_str());
1152 
1153  // Cleanup leftover named pipes
1154  shared_manager.CleanupPipes();
1155 
1156  if (!shared_manager.InitDatabase(rebuild)) {
1157  UnlockFile(fd_lockfile_fifo);
1158  return 1;
1159  }
1160  shared_manager.CheckFreeSpace();
1161 
1162  // Save protocol revision to file. If the file is not found, it indicates
1163  // to the client that the cache manager is from times before the protocol
1164  // was versioned.
1165  const string protocol_revision_path = shared_manager.workspace_dir_
1166  + "/cachemgr.protocol";
1167  retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1168  if (retval < 0) {
1170  "failed to open protocol revision file (%d)", errno);
1171  UnlockFile(fd_lockfile_fifo);
1172  return 1;
1173  }
1174  const string revision = StringifyInt(kProtocolRevision);
1175  const int written = write(retval, revision.data(), revision.length());
1176  close(retval);
1177  if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1179  "failed to write protocol revision (%d)", errno);
1180  UnlockFile(fd_lockfile_fifo);
1181  return 1;
1182  }
1183 
1184  const string fifo_path = shared_manager.workspace_dir_ + "/cachemgr";
1185  shared_manager.pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1186  if (shared_manager.pipe_lru_[0] < 0) {
1187  LogCvmfs(kLogQuota, kLogDebug, "failed to listen on FIFO %s (%d)",
1188  fifo_path.c_str(), errno);
1189  UnlockFile(fd_lockfile_fifo);
1190  return 1;
1191  }
1192  Nonblock2Block(shared_manager.pipe_lru_[0]);
1193  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager listening");
1194 
1195  char buf = 'C';
1196  WritePipe(pipe_boot, &buf, 1);
1197  close(pipe_boot);
1198 
1199  ReadPipe(pipe_handshake, &buf, 1);
1200  close(pipe_handshake);
1201  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager handshake done");
1202 
1203  // Ensure that broken pipes from clients do not kill the cache manager
1204  signal(SIGPIPE, SIG_IGN);
1205  // Don't let Ctrl-C ungracefully kill interactive session
1206  signal(SIGINT, SIG_IGN);
1207 
1208  shared_manager.MainCommandServer(&shared_manager);
1209  unlink(fifo_path.c_str());
1210  unlink(protocol_revision_path.c_str());
1211  shared_manager.CloseDatabase();
1212  unlink(crash_guard.c_str());
1213  UnlockFile(fd_lockfile_fifo);
1214 
1215  if (sqlite3_temp_directory) {
1216  sqlite3_free(sqlite3_temp_directory);
1217  sqlite3_temp_directory = NULL;
1218  }
1219 
1220  return 0;
1221 }
1222 
1223 
1225  PosixQuotaManager *quota_mgr = static_cast<PosixQuotaManager *>(data);
1226 
1227  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1228  sqlite3_soft_heap_limit(quota_mgr->kSqliteMemPerThread);
1229 
1230  LruCommand command_buffer[kCommandBufferSize];
1231  char description_buffer[kCommandBufferSize * kMaxDescription];
1232  unsigned num_commands = 0;
1233 
1234  while (read(quota_mgr->pipe_lru_[0], &command_buffer[num_commands],
1235  sizeof(command_buffer[0]))
1236  == sizeof(command_buffer[0])) {
1237  const CommandType command_type = command_buffer[num_commands].command_type;
1238  LogCvmfs(kLogQuota, kLogDebug, "received command %d", command_type);
1239  const uint64_t size = command_buffer[num_commands].GetSize();
1240 
1241  // Inserts and pins come with a description (usually a path)
1242  if ((command_type == kInsert) || (command_type == kInsertVolatile)
1243  || (command_type == kPin) || (command_type == kPinRegular)) {
1244  const int desc_length = command_buffer[num_commands].desc_length;
1245  ReadPipe(quota_mgr->pipe_lru_[0],
1246  &description_buffer[kMaxDescription * num_commands],
1247  desc_length);
1248  }
1249 
1250  // The protocol revision is returned immediately
1251  if (command_type == kGetProtocolRevision) {
1252  const int return_pipe =
1253  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1254  if (return_pipe < 0)
1255  continue;
1256  WritePipe(return_pipe, &quota_mgr->kProtocolRevision,
1257  sizeof(quota_mgr->kProtocolRevision));
1258  quota_mgr->UnbindReturnPipe(return_pipe);
1259  continue;
1260  }
1261 
1262  // The cleanup rate is returned immediately
1263  if (command_type == kCleanupRate) {
1264  const int return_pipe =
1265  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1266  if (return_pipe < 0)
1267  continue;
1268  const uint64_t period_s =
1269  size; // use the size field to transmit the period
1270  uint64_t rate = quota_mgr->cleanup_recorder_.GetNoTicks(period_s);
1271  WritePipe(return_pipe, &rate, sizeof(rate));
1272  quota_mgr->UnbindReturnPipe(return_pipe);
1273  continue;
1274  }
1275 
1276  if (command_type == kSetLimit) {
1277  const int return_pipe = quota_mgr->BindReturnPipe(
1278  command_buffer[num_commands].return_pipe);
1279  if (return_pipe < 0)
1280  continue;
1281  quota_mgr->limit_ = size; // use the size field to transmit the size
1282  quota_mgr->cleanup_threshold_ = size / 2;
1284  "Quota limit set to %lu / threshold %lu", quota_mgr->limit_,
1285  quota_mgr->cleanup_threshold_);
1286  bool ret = true;
1287  WritePipe(return_pipe, &ret, sizeof(ret));
1288  quota_mgr->UnbindReturnPipe(return_pipe);
1289  continue;
1290  }
1291 
1292  // Reservations are handled immediately and "out of band"
1293  if (command_type == kReserve) {
1294  bool success = true;
1295  const int return_pipe =
1296  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1297  if (return_pipe < 0)
1298  continue;
1299 
1300  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1301  const string hash_str(hash.ToString());
1302  LogCvmfs(kLogQuota, kLogDebug, "reserve %lu bytes for %s", size,
1303  hash_str.c_str());
1304 
1305  if (quota_mgr->pinned_chunks_.find(hash)
1306  == quota_mgr->pinned_chunks_.end()) {
1307  if ((quota_mgr->pinned_ + size) > quota_mgr->cleanup_threshold_) {
1309  "failed to insert %s (pinned), no space", hash_str.c_str());
1310  success = false;
1311  } else {
1312  quota_mgr->pinned_chunks_[hash] = size;
1313  quota_mgr->pinned_ += size;
1314  quota_mgr->CheckHighPinWatermark();
1315  }
1316  }
1317 
1318  WritePipe(return_pipe, &success, sizeof(success));
1319  quota_mgr->UnbindReturnPipe(return_pipe);
1320  continue;
1321  }
1322 
1323  // Back channels are also handled out of band
1324  if (command_type == kRegisterBackChannel) {
1325  const int return_pipe =
1326  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1327  if (return_pipe < 0)
1328  continue;
1329 
1330  quota_mgr->UnlinkReturnPipe(command_buffer[num_commands].return_pipe);
1331  Block2Nonblock(return_pipe); // back channels are opportunistic
1332  shash::Md5 hash;
1333  memcpy(hash.digest, command_buffer[num_commands].digest,
1335 
1336  quota_mgr->LockBackChannels();
1337  const map<shash::Md5, int>::const_iterator iter =
1338  quota_mgr->back_channels_.find(hash);
1339  if (iter != quota_mgr->back_channels_.end()) {
1341  "closing left-over back channel %s", hash.ToString().c_str());
1342  close(iter->second);
1343  }
1344  quota_mgr->back_channels_[hash] = return_pipe;
1345  quota_mgr->UnlockBackChannels();
1346 
1347  char success = 'S';
1348  WritePipe(return_pipe, &success, sizeof(success));
1349  LogCvmfs(kLogQuota, kLogDebug, "register back channel %s on fd %d",
1350  hash.ToString().c_str(), return_pipe);
1351 
1352  continue;
1353  }
1354 
1355  if (command_type == kUnregisterBackChannel) {
1356  shash::Md5 hash;
1357  memcpy(hash.digest, command_buffer[num_commands].digest,
1359 
1360  quota_mgr->LockBackChannels();
1361  const map<shash::Md5, int>::iterator iter =
1362  quota_mgr->back_channels_.find(hash);
1363  if (iter != quota_mgr->back_channels_.end()) {
1364  LogCvmfs(kLogQuota, kLogDebug, "closing back channel %s",
1365  hash.ToString().c_str());
1366  close(iter->second);
1367  quota_mgr->back_channels_.erase(iter);
1368  } else {
1370  "did not find back channel %s", hash.ToString().c_str());
1371  }
1372  quota_mgr->UnlockBackChannels();
1373 
1374  continue;
1375  }
1376 
1377  // Unpinnings are also handled immediately with respect to the pinned gauge
1378  if (command_type == kUnpin) {
1379  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1380  const string hash_str(hash.ToString());
1381 
1382  const map<shash::Any, uint64_t>::iterator iter =
1383  quota_mgr->pinned_chunks_.find(hash);
1384  if (iter != quota_mgr->pinned_chunks_.end()) {
1385  quota_mgr->pinned_ -= iter->second;
1386  quota_mgr->pinned_chunks_.erase(iter);
1387  // It can happen that files get pinned that were removed from the cache
1388  // (see cache.cc). We fix this at this point, where we remove such
1389  // entries from the cache database.
1390  if (!FileExists(quota_mgr->cache_dir_ + "/"
1391  + hash.MakePathWithoutSuffix())) {
1393  "remove orphaned pinned hash %s from cache database",
1394  hash_str.c_str());
1395  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1396  hash_str.length(), SQLITE_STATIC);
1397  int retval;
1398  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1399  const uint64_t size =
1400  sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1401  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1402  hash_str.length(), SQLITE_STATIC);
1403  retval = sqlite3_step(quota_mgr->stmt_rm_);
1404  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1405  quota_mgr->gauge_ -= size;
1406  } else {
1408  "failed to delete %s (%d)", hash_str.c_str(), retval);
1409  }
1410  sqlite3_reset(quota_mgr->stmt_rm_);
1411  }
1412  sqlite3_reset(quota_mgr->stmt_size_);
1413  }
1414  } else {
1415  LogCvmfs(kLogQuota, kLogDebug, "this chunk was not pinned");
1416  }
1417  }
1418 
1419  // Immediate commands trigger flushing of the buffer
1420  const bool immediate_command =
1421  (command_type == kCleanup) || (command_type == kList) ||
1422  (command_type == kListPinned) || (command_type == kListCatalogs) ||
1423  (command_type == kListVolatile) || (command_type == kRemove) ||
1424  (command_type == kStatus) || (command_type == kLimits) ||
1425  (command_type == kPid);
1426  if (!immediate_command)
1427  num_commands++;
1428 
1429  if ((num_commands == kCommandBufferSize) || immediate_command) {
1430  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1431  description_buffer);
1432  if (!immediate_command)
1433  num_commands = 0;
1434  }
1435 
1436  if (immediate_command) {
1437  // Process cleanup, listings
1438  const int return_pipe =
1439  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1440  if (return_pipe < 0) {
1441  num_commands = 0;
1442  continue;
1443  }
1444 
1445  int retval;
1446  sqlite3_stmt *this_stmt_list = NULL;
1447  switch (command_type) {
1448  case kRemove: {
1449  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1450  const string hash_str = hash.ToString();
1451  LogCvmfs(kLogQuota, kLogDebug, "manually removing %s",
1452  hash_str.c_str());
1453  bool success = false;
1454 
1455  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1456  hash_str.length(), SQLITE_STATIC);
1457  int retval;
1458  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1459  const uint64_t size =
1460  sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1461  const uint64_t is_pinned =
1462  sqlite3_column_int64(quota_mgr->stmt_size_, 1);
1463 
1464  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1465  hash_str.length(), SQLITE_STATIC);
1466  retval = sqlite3_step(quota_mgr->stmt_rm_);
1467  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1468  success = true;
1469  quota_mgr->gauge_ -= size;
1470  if (is_pinned) {
1471  quota_mgr->pinned_chunks_.erase(hash);
1472  quota_mgr->pinned_ -= size;
1473  }
1474  } else {
1476  "failed to delete %s (%d)", hash_str.c_str(), retval);
1477  }
1478  sqlite3_reset(quota_mgr->stmt_rm_);
1479  } else {
1480  // File does not exist
1481  success = true;
1482  }
1483  sqlite3_reset(quota_mgr->stmt_size_);
1484 
1485  WritePipe(return_pipe, &success, sizeof(success));
1486  break;
1487  }
1488  case kCleanup:
1489  retval = quota_mgr->DoCleanup(size);
1490  WritePipe(return_pipe, &retval, sizeof(retval));
1491  break;
1492  case kList:
1493  if (!this_stmt_list)
1494  this_stmt_list = quota_mgr->stmt_list_;
1495  case kListPinned:
1496  if (!this_stmt_list)
1497  this_stmt_list = quota_mgr->stmt_list_pinned_;
1498  case kListCatalogs:
1499  if (!this_stmt_list)
1500  this_stmt_list = quota_mgr->stmt_list_catalogs_;
1501  case kListVolatile:
1502  if (!this_stmt_list)
1503  this_stmt_list = quota_mgr->stmt_list_volatile_;
1504 
1505  // Pipe back the list, one by one
1506  int length;
1507  while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1508  string path = "(NULL)";
1509  if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1510  path = string(reinterpret_cast<const char *>(
1511  sqlite3_column_text(this_stmt_list, 0)));
1512  }
1513  length = path.length();
1514  WritePipe(return_pipe, &length, sizeof(length));
1515  if (length > 0)
1516  WritePipe(return_pipe, &path[0], length);
1517  }
1518  length = -1;
1519  WritePipe(return_pipe, &length, sizeof(length));
1520  sqlite3_reset(this_stmt_list);
1521  break;
1522  case kStatus:
1523  WritePipe(return_pipe, &quota_mgr->gauge_, sizeof(quota_mgr->gauge_));
1524  WritePipe(return_pipe, &quota_mgr->pinned_,
1525  sizeof(quota_mgr->pinned_));
1526  break;
1527  case kLimits:
1528  WritePipe(return_pipe, &quota_mgr->limit_, sizeof(quota_mgr->limit_));
1529  WritePipe(return_pipe, &quota_mgr->cleanup_threshold_,
1530  sizeof(quota_mgr->cleanup_threshold_));
1531  break;
1532  case kPid: {
1533  pid_t pid = getpid();
1534  WritePipe(return_pipe, &pid, sizeof(pid));
1535  break;
1536  }
1537  default:
1538  PANIC(NULL); // other types are handled by the bunch processor
1539  }
1540  quota_mgr->UnbindReturnPipe(return_pipe);
1541  num_commands = 0;
1542  }
1543  }
1544 
1545  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager (%d)", errno);
1546  close(quota_mgr->pipe_lru_[0]);
1547  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1548  description_buffer);
1549 
1550  // Unpin
1551  command_buffer[0].command_type = kTouch;
1552  for (map<shash::Any, uint64_t>::const_iterator
1553  i = quota_mgr->pinned_chunks_.begin(),
1554  iEnd = quota_mgr->pinned_chunks_.end();
1555  i != iEnd;
1556  ++i) {
1557  command_buffer[0].StoreHash(i->first);
1558  quota_mgr->ProcessCommandBunch(1, command_buffer, description_buffer);
1559  }
1560 
1561  return NULL;
1562 }
1563 
1564 
1566  if (!shared_) {
1567  MakePipe(pipe);
1568  return;
1569  }
1570 
1571  // Create FIFO in cache directory, store path name (number) in pipe write end
1572  int i = 0;
1573  int retval;
1574  do {
1575  retval = mkfifo((workspace_dir_ + "/pipe" + StringifyInt(i)).c_str(), 0600);
1576  pipe[1] = i;
1577  i++;
1578  } while ((retval == -1) && (errno == EEXIST));
1579  assert(retval == 0);
1580 
1581  // Connect reader's end
1582  pipe[0] = open((workspace_dir_ + "/pipe" + StringifyInt(pipe[1])).c_str(),
1583  O_RDONLY | O_NONBLOCK);
1584  assert(pipe[0] >= 0);
1585  Nonblock2Block(pipe[0]);
1586 }
1587 
1588 
1589 void PosixQuotaManager::ParseDirectories(const std::string cache_workspace,
1590  std::string *cache_dir,
1591  std::string *workspace_dir) {
1592  vector<string> dir_tokens(SplitString(cache_workspace, ':'));
1593  switch (dir_tokens.size()) {
1594  case 1:
1595  *cache_dir = *workspace_dir = dir_tokens[0];
1596  break;
1597  case 2:
1598  *cache_dir = dir_tokens[0];
1599  *workspace_dir = dir_tokens[1];
1600  break;
1601  default:
1602  PANIC(NULL);
1603  }
1604 }
1605 
1606 
1613  const uint64_t size,
1614  const string &description,
1615  const bool is_catalog) {
1616  assert((size > 0) || !is_catalog);
1617 
1618  const string hash_str = hash.ToString();
1619  LogCvmfs(kLogQuota, kLogDebug, "pin into lru %s, path %s", hash_str.c_str(),
1620  description.c_str());
1621 
1622  // Has to run when not yet spawned (cvmfs initialization)
1623  if (!spawned_) {
1624  // Code duplication here
1625  if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1626  if (pinned_ + size > cleanup_threshold_) {
1627  LogCvmfs(kLogQuota, kLogDebug, "failed to insert %s (pinned), no space",
1628  hash_str.c_str());
1629  return false;
1630  } else {
1631  pinned_chunks_[hash] = size;
1632  pinned_ += size;
1633  CheckHighPinWatermark();
1634  }
1635  }
1636  const bool exists = Contains(hash_str);
1637  if (!exists && (gauge_ + size > limit_)) {
1638  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu",
1639  gauge_, size);
1640  const int retval = DoCleanup(cleanup_threshold_);
1641  assert(retval != 0);
1642  }
1643  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1644  SQLITE_STATIC);
1645  sqlite3_bind_int64(stmt_new_, 2, size);
1646  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1647  sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1648  SQLITE_STATIC);
1649  sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1650  sqlite3_bind_int64(stmt_new_, 6, 1);
1651  const int retval = sqlite3_step(stmt_new_);
1652  assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1653  sqlite3_reset(stmt_new_);
1654  if (!exists)
1655  gauge_ += size;
1656  return true;
1657  }
1658 
1659  int pipe_reserve[2];
1660  MakeReturnPipe(pipe_reserve);
1661 
1662  LruCommand cmd;
1663  cmd.command_type = kReserve;
1664  cmd.SetSize(size);
1665  cmd.StoreHash(hash);
1666  cmd.return_pipe = pipe_reserve[1];
1667  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1668  bool result;
1669  ManagedReadHalfPipe(pipe_reserve[0], &result, sizeof(result));
1670  CloseReturnPipe(pipe_reserve);
1671 
1672  if (!result)
1673  return false;
1674  DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1675 
1676  return true;
1677 }
1678 
1679 
1681  const uint64_t cleanup_threshold,
1682  const string &cache_workspace)
1683  : shared_(false)
1684  , spawned_(false)
1685  , limit_(limit)
1686  , cleanup_threshold_(cleanup_threshold)
1687  , gauge_(0)
1688  , pinned_(0)
1689  , seq_(0)
1690  , cache_dir_() // initialized in body
1691  , workspace_dir_() // initialized in body
1692  , fd_lock_cachedb_(-1)
1693  , async_delete_(true)
1694  , cachemgr_pid_(0)
1695  , database_(NULL)
1696  , stmt_touch_(NULL)
1697  , stmt_unpin_(NULL)
1698  , stmt_block_(NULL)
1699  , stmt_unblock_(NULL)
1700  , stmt_new_(NULL)
1701  , stmt_lru_(NULL)
1702  , stmt_size_(NULL)
1703  , stmt_rm_(NULL)
1704  , stmt_rm_batch_(NULL)
1705  , stmt_list_(NULL)
1706  , stmt_list_pinned_(NULL)
1707  , stmt_list_catalogs_(NULL)
1708  , stmt_list_volatile_(NULL)
1709  , initialized_(false) {
1710  ParseDirectories(cache_workspace, &cache_dir_, &workspace_dir_);
1711  pipe_lru_[0] = pipe_lru_[1] = -1;
1712  cleanup_recorder_.AddRecorder(1, 90); // last 1.5 min with second resolution
1713  // last 1.5 h with minute resolution
1714  cleanup_recorder_.AddRecorder(60, 90 * 60);
1715  // last 18 hours with 20 min resolution
1716  cleanup_recorder_.AddRecorder(20 * 60, 60 * 60 * 18);
1717  // last 4 days with hour resolution
1718  cleanup_recorder_.AddRecorder(60 * 60, 60 * 60 * 24 * 4);
1719 }
1720 
1721 
1723  if (!initialized_)
1724  return;
1725 
1726  if (shared_) {
1727  // Most of cleanup is done elsewhen by shared cache manager
1728  close(pipe_lru_[1]);
1729  return;
1730  }
1731 
1732  if (spawned_) {
1733  char fin = 0;
1734  WritePipe(pipe_lru_[1], &fin, 1);
1735  close(pipe_lru_[1]);
1736  pthread_join(thread_lru_, NULL);
1737  } else {
1739  }
1740 
1741  CloseDatabase();
1742 }
1743 
1744 
1746  const LruCommand *commands,
1747  const char *descriptions) {
1748  int retval = sqlite3_exec(database_, "BEGIN", NULL, NULL, NULL);
1749  assert(retval == SQLITE_OK);
1750 
1751  for (unsigned i = 0; i < num; ++i) {
1752  const shash::Any hash = commands[i].RetrieveHash();
1753  const string hash_str = hash.ToString();
1754  const unsigned size = commands[i].GetSize();
1755  LogCvmfs(kLogQuota, kLogDebug, "processing %s (%d)", hash_str.c_str(),
1756  commands[i].command_type);
1757 
1758  bool exists;
1759  switch (commands[i].command_type) {
1760  case kTouch:
1761  sqlite3_bind_int64(stmt_touch_, 1, seq_++);
1762  sqlite3_bind_text(stmt_touch_, 2, &hash_str[0], hash_str.length(),
1763  SQLITE_STATIC);
1764  retval = sqlite3_step(stmt_touch_);
1765  LogCvmfs(kLogQuota, kLogDebug, "touching %s (%ld): %d",
1766  hash_str.c_str(), seq_ - 1, retval);
1767  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1768  PANIC(kLogSyslogErr, "failed to update %s in cachedb, error %d",
1769  hash_str.c_str(), retval);
1770  }
1771  sqlite3_reset(stmt_touch_);
1772  break;
1773  case kUnpin:
1774  sqlite3_bind_text(stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1775  SQLITE_STATIC);
1776  retval = sqlite3_step(stmt_unpin_);
1777  LogCvmfs(kLogQuota, kLogDebug, "unpinning %s: %d", hash_str.c_str(),
1778  retval);
1779  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1780  PANIC(kLogSyslogErr, "failed to unpin %s in cachedb, error %d",
1781  hash_str.c_str(), retval);
1782  }
1783  sqlite3_reset(stmt_unpin_);
1784  break;
1785  case kPin:
1786  case kPinRegular:
1787  case kInsert:
1788  case kInsertVolatile:
1789  // It could already be in, check
1790  exists = Contains(hash_str);
1791 
1792  // Cleanup, move to trash and unlink
1793  if (!exists && (gauge_ + size > limit_)) {
1794  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %u",
1795  gauge_, size);
1796  retval = DoCleanup(cleanup_threshold_);
1797  assert(retval != 0);
1798  }
1799 
1800  // Insert or replace
1801  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1802  SQLITE_STATIC);
1803  sqlite3_bind_int64(stmt_new_, 2, size);
1804  if (commands[i].command_type == kInsertVolatile) {
1805  sqlite3_bind_int64(stmt_new_, 3, (seq_++) | kVolatileFlag);
1806  } else {
1807  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1808  }
1809  sqlite3_bind_text(stmt_new_, 4, &descriptions[i * kMaxDescription],
1810  commands[i].desc_length, SQLITE_STATIC);
1811  sqlite3_bind_int64(
1812  stmt_new_, 5,
1813  (commands[i].command_type == kPin) ? kFileCatalog : kFileRegular);
1814  sqlite3_bind_int64(stmt_new_, 6,
1815  ((commands[i].command_type == kPin)
1816  || (commands[i].command_type == kPinRegular))
1817  ? 1
1818  : 0);
1819  retval = sqlite3_step(stmt_new_);
1820  LogCvmfs(kLogQuota, kLogDebug, "insert or replace %s, method %d: %d",
1821  hash_str.c_str(), commands[i].command_type, retval);
1822  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1823  PANIC(kLogSyslogErr, "failed to insert %s in cachedb, error %d",
1824  hash_str.c_str(), retval);
1825  }
1826  sqlite3_reset(stmt_new_);
1827 
1828  if (!exists)
1829  gauge_ += size;
1830  break;
1831  default:
1832  // other types should have been taken care of by event loop
1833  PANIC(NULL);
1834  }
1835  }
1836 
1837  retval = sqlite3_exec(database_, "COMMIT", NULL, NULL, NULL);
1838  if (retval != SQLITE_OK) {
1839  PANIC(kLogSyslogErr, "failed to commit to cachedb, error %d", retval);
1840  }
1841 }
1842 
1843 
1845  bool result = false;
1846  string sql;
1847  sqlite3_stmt *stmt_select = NULL;
1848  sqlite3_stmt *stmt_insert = NULL;
1849  int sqlerr;
1850  int seq = 0;
1851  char hex[4];
1852  struct stat info;
1853  platform_dirent64 *d;
1854  DIR *dirp = NULL;
1855  string path;
1856 
1857  LogCvmfs(kLogQuota, kLogSyslog | kLogDebug, "re-building cache database");
1858 
1859  // Empty cache catalog and fscache
1860  sql = "DELETE FROM cache_catalog; DELETE FROM fscache;";
1861  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1862  if (sqlerr != SQLITE_OK) {
1863  LogCvmfs(kLogQuota, kLogDebug, "could not clear cache database");
1864  goto build_return;
1865  }
1866 
1867  gauge_ = 0;
1868 
1869  // Insert files from cache sub-directories 00 - ff
1870  // TODO(jblomer): fs_traversal
1871  sqlite3_prepare_v2(database_,
1872  "INSERT INTO fscache (sha1, size, actime) "
1873  "VALUES (:sha1, :s, :t);",
1874  -1, &stmt_insert, NULL);
1875 
1876  for (int i = 0; i <= 0xff; i++) {
1877  snprintf(hex, sizeof(hex), "%02x", i);
1878  path = cache_dir_ + "/" + string(hex);
1879  if ((dirp = opendir(path.c_str())) == NULL) {
1881  "failed to open directory %s (tmpwatch interfering?)",
1882  path.c_str());
1883  goto build_return;
1884  }
1885  while ((d = platform_readdir(dirp)) != NULL) {
1886  const string file_path = path + "/" + string(d->d_name);
1887  if (stat(file_path.c_str(), &info) == 0) {
1888  if (!S_ISREG(info.st_mode))
1889  continue;
1890  if (info.st_size == 0) {
1892  "removing empty file %s during automatic cache db rebuild",
1893  file_path.c_str());
1894  unlink(file_path.c_str());
1895  continue;
1896  }
1897 
1898  string hash = string(hex) + string(d->d_name);
1899  sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1900  SQLITE_STATIC);
1901  sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1902  sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1903  if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1904  LogCvmfs(kLogQuota, kLogDebug, "could not insert into temp table");
1905  goto build_return;
1906  }
1907  sqlite3_reset(stmt_insert);
1908 
1909  gauge_ += info.st_size;
1910  } else {
1911  LogCvmfs(kLogQuota, kLogDebug, "could not stat %s", file_path.c_str());
1912  }
1913  }
1914  closedir(dirp);
1915  dirp = NULL;
1916  }
1917  sqlite3_finalize(stmt_insert);
1918  stmt_insert = NULL;
1919 
1920  // Transfer from temp table in cache catalog
1921  sqlite3_prepare_v2(database_,
1922  "SELECT sha1, size FROM fscache ORDER BY actime;", -1,
1923  &stmt_select, NULL);
1924  sqlite3_prepare_v2(
1925  database_,
1926  "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1927  "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1928  -1, &stmt_insert, NULL);
1929  while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1930  const string hash = string(
1931  reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1932  sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1933  sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1934  sqlite3_bind_int64(stmt_insert, 3, seq++);
1935  // Might also be a catalog (information is lost)
1936  sqlite3_bind_int64(stmt_insert, 4, kFileRegular);
1937 
1938  const int retval = sqlite3_step(stmt_insert);
1939  if (retval != SQLITE_DONE) {
1940  // If the file system hosting the cache is full, we'll likely notice here
1942  "could not insert into cache catalog (%d - %s)", retval,
1943  sqlite3_errstr(retval));
1944  goto build_return;
1945  }
1946  sqlite3_reset(stmt_insert);
1947  }
1948 
1949  // Delete temporary table
1950  sql = "DELETE FROM fscache;";
1951  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1952  if (sqlerr != SQLITE_OK) {
1953  LogCvmfs(kLogQuota, kLogDebug, "could not clear temporary table (%d)",
1954  sqlerr);
1955  goto build_return;
1956  }
1957 
1958  seq_ = seq;
1959  result = true;
1961  "rebuilding finished, sequence %" PRIu64 ", gauge %" PRIu64, seq_,
1962  gauge_);
1963 
1964 build_return:
1965  if (stmt_insert)
1966  sqlite3_finalize(stmt_insert);
1967  if (stmt_select)
1968  sqlite3_finalize(stmt_select);
1969  if (dirp)
1970  closedir(dirp);
1971  return result;
1972 }
1973 
1974 
1980  const string &channel_id) {
1981  if (protocol_revision_ >= 1) {
1982  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1983  MakeReturnPipe(back_channel);
1984 
1985  LruCommand cmd;
1987  cmd.return_pipe = back_channel[1];
1988  // Not StoreHash(). This is an MD5 hash.
1989  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1990  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1991 
1992  char success;
1993  ManagedReadHalfPipe(back_channel[0], &success, sizeof(success));
1994  // At this point, the named FIFO is unlinked, so don't use CloseReturnPipe
1995  if (success != 'S') {
1997  "failed to register quota back channel (%c)", success);
1998  }
1999  } else {
2000  // Dummy pipe to return valid file descriptors
2001  MakePipe(back_channel);
2002  }
2003 }
2004 
2005 
2010  const string hash_str = hash.ToString();
2011 
2012  int pipe_remove[2];
2013  MakeReturnPipe(pipe_remove);
2014 
2015  LruCommand cmd;
2016  cmd.command_type = kRemove;
2017  cmd.return_pipe = pipe_remove[1];
2018  cmd.StoreHash(hash);
2019  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
2020 
2021  bool success;
2022  ManagedReadHalfPipe(pipe_remove[0], &success, sizeof(success));
2023  CloseReturnPipe(pipe_remove);
2024 
2025  unlink((cache_dir_ + "/" + hash.MakePathWithoutSuffix()).c_str());
2026 }
2027 
2028 
2030  if (spawned_)
2031  return;
2032 
2033  if (pthread_create(&thread_lru_, NULL, MainCommandServer,
2034  static_cast<void *>(this))
2035  != 0) {
2036  PANIC(kLogDebug, "could not create lru thread");
2037  }
2038 
2039  spawned_ = true;
2040 }
2041 
2042 
2047  LruCommand cmd;
2048  cmd.command_type = kTouch;
2049  cmd.StoreHash(hash);
2050  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
2051 }
2052 
2053 
2055  if (shared_)
2056  close(pipe_wronly);
2057 }
2058 
2059 
2061  if (shared_)
2062  unlink((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str());
2063 }
2064 
2065 
2067  LogCvmfs(kLogQuota, kLogDebug, "Unpin %s", hash.ToString().c_str());
2068 
2069  LruCommand cmd;
2070  cmd.command_type = kUnpin;
2071  cmd.StoreHash(hash);
2072  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
2073 }
2074 
2075 
2077  const string &channel_id) {
2078  if (protocol_revision_ >= 1) {
2079  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
2080 
2081  LruCommand cmd;
2083  // Not StoreHash(). This is an MD5 hash.
2084  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
2085  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
2086 
2087  // Writer's end will be closed by cache manager, FIFO is already unlinked
2088  close(back_channel[0]);
2089  } else {
2090  ClosePipe(back_channel);
2091  }
2092 }
2093 
2094 void PosixQuotaManager::ManagedReadHalfPipe(int fd, void *buf, size_t nbyte) {
2095  const unsigned timeout_ms = cachemgr_pid_ ? 1000 : 0;
2096  bool result = false;
2097  do {
2098  result = ReadHalfPipe(fd, buf, nbyte, timeout_ms);
2099  // try only as long as the cachemgr is still alive
2100  } while (!result && getpgid(cachemgr_pid_) >= 0);
2101  if (!result) {
2102  PANIC(kLogStderr,
2103  "Error: quota manager could not read from cachemanager pipe");
2104  }
2105 }
virtual uint32_t GetProtocolRevision()
Definition: quota_posix.cc:740
void SetLogSyslogFacility(const int local_facility)
Definition: logging.cc:179
sqlite3_stmt * stmt_new_
Definition: quota_posix.h:361
sqlite3_stmt * stmt_list_pinned_
Definition: quota_posix.h:367
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
Definition: statistics.cc:266
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
Definition: logging.cc:147
std::vector< std::string > DoList(const CommandType list_command)
Definition: quota_posix.cc:652
struct stat64 platform_stat64
virtual pid_t GetPid()
Definition: quota_posix.cc:718
virtual uint64_t GetCleanupRate(uint64_t period_s)
Definition: quota_posix.cc:818
sqlite3_stmt * stmt_list_
Definition: quota_posix.h:366
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
Definition: quota_posix.cc:694
virtual uint64_t GetSize()
Definition: quota_posix.cc:800
sqlite3_stmt * stmt_rm_
Definition: quota_posix.h:364
virtual bool Cleanup(const uint64_t leave_size)
Definition: quota_posix.cc:125
static const unsigned kSqliteMemPerThread
Definition: quota_posix.h:201
#define PANIC(...)
Definition: exception.h:29
virtual uint64_t GetMaxFileSize()
Definition: quota_posix.cc:713
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
uint64_t cleanup_threshold_
Definition: quota_posix.h:287
static const uint32_t kProtocolRevision
Definition: quota.h:45
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
Definition: posix.cc:1896
void Daemonize()
Definition: posix.cc:1685
bool DoCleanup(const uint64_t leave_size)
Definition: quota_posix.cc:492
uint64_t GetSize() const
Definition: quota_posix.h:159
bool SafeWrite(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:2036
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
Definition: quota_posix.cc:838
int BindReturnPipe(int pipe_wronly)
Definition: quota_posix.cc:64
std::string cache_dir_
Definition: quota_posix.h:308
bool Contains(const std::string &hash_str)
Definition: quota_posix.cc:205
perf::MultiRecorder cleanup_recorder_
Definition: quota_posix.h:354
int platform_stat(const char *path, platform_stat64 *buf)
void SetLogMicroSyslog(const std::string &filename)
Definition: logging.cc:267
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
sqlite3 * database_
Definition: quota_posix.h:356
unsigned char digest[digest_size_]
Definition: hash.h:121
virtual bool SetLimit(uint64_t limit)
Definition: quota_posix.cc:788
virtual std::vector< std::string > ListCatalogs()
void CheckHighPinWatermark()
Definition: quota_posix.cc:82
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
Definition: quota_posix.cc:759
bool FileExists(const std::string &path)
Definition: posix.cc:803
int GetLogSyslogLevel()
Definition: logging.cc:164
static Watchdog * Create(FnOnCrash on_crash)
Definition: monitor.cc:67
int64_t String2Int64(const string &value)
Definition: string.cc:234
unsigned GetDigestSize() const
Definition: hash.h:164
std::string workspace_dir_
Definition: quota_posix.h:315
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
pthread_t thread_lru_
Definition: quota_posix.h:330
bool ReadHalfPipe(int fd, void *buf, size_t nbyte, unsigned timeout_ms)
Definition: posix.cc:520
#define GetLogDebugFile()
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
Definition: posix.cc:2094
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
void Nonblock2Block(int filedes)
Definition: posix.cc:648
virtual uint64_t GetSizePinned()
Definition: quota_posix.cc:809
void ManagedReadHalfPipe(int fd, void *buf, size_t nbyte)
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
virtual uint64_t GetCapacity()
Definition: quota_posix.cc:678
void SetSize(const uint64_t new_size)
Definition: quota_posix.h:153
void LockBackChannels()
Definition: quota.h:97
shash::Any RetrieveHash() const
Definition: quota_posix.h:173
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
Definition: posix.cc:980
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
Definition: quota_posix.cc:246
sqlite3_stmt * stmt_touch_
Definition: quota_posix.h:357
string StringifyInt(const int64_t value)
Definition: string.cc:77
std::string GetLogMicroSyslog()
Definition: logging.cc:303
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
Definition: quota_posix.cc:628
sqlite3_stmt * stmt_list_volatile_
Definition: quota_posix.h:369
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
Definition: quota.h:110
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
static int MainCacheManager(int argc, char **argv)
bool EmptyTrash(const std::vector< std::string > &trash)
Definition: quota_posix.cc:588
std::map< shash::Md5, int > back_channels_
Definition: quota.h:95
static const unsigned kMaxDescription
Definition: quota_posix.h:218
bool SetSharedLimit(uint64_t limit)
Definition: quota_posix.cc:772
bool CloseAllFildes(const std::set< int > &preserve_fildes)
Definition: posix.cc:1864
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
static const unsigned kLockFileMagicNumber
Definition: quota_posix.h:196
unsigned char digest[shash::kMaxDigestSize]
Definition: quota_posix.h:138
std::string MakePathWithoutSuffix() const
Definition: hash.h:323
sqlite3_stmt * stmt_list_catalogs_
Definition: quota_posix.h:368
bool ExecAsDaemon(const std::vector< std::string > &command_line, pid_t *child_pid)
Definition: posix.cc:1627
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
Definition: quota_posix.cc:278
uint64_t GetNoTicks(uint32_t retrospect_s) const
Definition: statistics.cc:271
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:82
platform_dirent64 * platform_readdir(DIR *dirp)
void SetCacheMgrPid(pid_t pid_)
Definition: quota_posix.h:89
virtual ~PosixQuotaManager()
void Spawn(const std::string &crash_dump_path)
Definition: monitor.cc:514
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
Definition: posix.cc:659
std::map< shash::Any, uint64_t > pinned_chunks_
Definition: quota_posix.h:320
sqlite3_stmt * stmt_unpin_
Definition: quota_posix.h:358
void CloseReturnPipe(int pipe[2])
Definition: quota_posix.cc:195
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
virtual void Spawn()
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
const unsigned kDigestSizes[]
Definition: hash.h:69
void UnlockBackChannels()
Definition: quota.h:101
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
Definition: quota_posix.h:165
static const uint64_t kVolatileFlag
Definition: quota_posix.h:231
void UnlockFile(const int filedes)
Definition: posix.cc:1004
sqlite3_stmt * stmt_size_
Definition: quota_posix.h:363
struct dirent64 platform_dirent64
int GetLogSyslogFacility()
Definition: logging.cc:210
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545