CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
quota_posix.cc
Go to the documentation of this file.
1 
15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
17 
18 
19 #include "quota_posix.h"
20 
21 #include <dirent.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdint.h>
28 #include <sys/dir.h>
29 #include <sys/stat.h>
30 #ifndef __APPLE__
31 #include <sys/statfs.h>
32 #endif
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 #include <cassert>
39 #include <cstdio>
40 #include <cstdlib>
41 #include <cstring>
42 #include <limits>
43 #include <map>
44 #include <set>
45 #include <string>
46 #include <vector>
47 
48 #include "crypto/hash.h"
49 #include "duplex_sqlite3.h"
50 #include "monitor.h"
51 #include "statistics.h"
52 #include "util/concurrency.h"
53 #include "util/exception.h"
54 #include "util/logging.h"
55 #include "util/platform.h"
56 #include "util/pointer.h"
57 #include "util/posix.h"
58 #include "util/smalloc.h"
59 #include "util/string.h"
60 
61 using namespace std; // NOLINT
62 
63 
64 int PosixQuotaManager::BindReturnPipe(int pipe_wronly) {
65  if (!shared_)
66  return pipe_wronly;
67 
68  // Connect writer's end
69  int result =
70  open((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str(),
71  O_WRONLY | O_NONBLOCK);
72  if (result >= 0) {
73  Nonblock2Block(result);
74  } else {
76  "failed to bind return pipe (%d)", errno);
77  }
78  return result;
79 }
80 
81 
83  const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
84  if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86  "high watermark of pinned files (%" PRIu64 "M > %" PRIu64 "M)",
87  pinned_/(1024*1024), watermark/(1024*1024));
88  BroadcastBackchannels("R"); // clients: please release pinned catalogs
89  }
90 }
91 
92 
94  DIR *dirp = opendir(workspace_dir_.c_str());
95  assert(dirp != NULL);
96 
97  platform_dirent64 *dent;
98  bool found_leftovers = false;
99  while ((dent = platform_readdir(dirp)) != NULL) {
100  const string name = dent->d_name;
101  const string path = workspace_dir_ + "/" + name;
102  platform_stat64 info;
103  int retval = platform_stat(path.c_str(), &info);
104  if (retval != 0)
105  continue;
106  if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) == "pipe")) {
107  if (!found_leftovers) {
109  "removing left-over FIFOs from cache directory");
110  }
111  found_leftovers = true;
112  unlink(path.c_str());
113  }
114  }
115  closedir(dirp);
116 }
117 
118 
125 bool PosixQuotaManager::Cleanup(const uint64_t leave_size) {
126  if (!spawned_)
127  return DoCleanup(leave_size);
128 
129  bool result;
130  int pipe_cleanup[2];
131  MakeReturnPipe(pipe_cleanup);
132 
133  LruCommand cmd;
134  cmd.command_type = kCleanup;
135  cmd.size = leave_size;
136  cmd.return_pipe = pipe_cleanup[1];
137 
138  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
139  ManagedReadHalfPipe(pipe_cleanup[0], &result, sizeof(result));
140  CloseReturnPipe(pipe_cleanup);
141 
142  return result;
143 }
144 
145 
147  if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
148  if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
149  if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
150  if (stmt_list_) sqlite3_finalize(stmt_list_);
151  if (stmt_lru_) sqlite3_finalize(stmt_lru_);
152  if (stmt_rm_) sqlite3_finalize(stmt_rm_);
153  if (stmt_rm_batch_) sqlite3_finalize(stmt_rm_batch_);
154  if (stmt_size_) sqlite3_finalize(stmt_size_);
155  if (stmt_touch_) sqlite3_finalize(stmt_touch_);
156  if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
157  if (stmt_block_) sqlite3_finalize(stmt_block_);
158  if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
159  if (stmt_new_) sqlite3_finalize(stmt_new_);
160  if (database_) sqlite3_close(database_);
161  UnlockFile(fd_lock_cachedb_);
162 
163  stmt_list_catalogs_ = NULL;
164  stmt_list_pinned_ = NULL;
165  stmt_list_volatile_ = NULL;
166  stmt_list_ = NULL;
167  stmt_rm_ = NULL;
168  stmt_rm_batch_ = NULL;
169  stmt_size_ = NULL;
170  stmt_touch_ = NULL;
171  stmt_unpin_ = NULL;
172  stmt_block_ = NULL;
173  stmt_unblock_ = NULL;
174  stmt_new_ = NULL;
175  database_ = NULL;
176 
177  pinned_chunks_.clear();
178 }
179 
180 
182  if (shared_) {
183  close(pipe[0]);
184  UnlinkReturnPipe(pipe[1]);
185  } else {
186  ClosePipe(pipe);
187  }
188 }
189 
190 
191 bool PosixQuotaManager::Contains(const string &hash_str) {
192  bool result = false;
193 
194  sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
195  SQLITE_STATIC);
196  if (sqlite3_step(stmt_size_) == SQLITE_ROW)
197  result = true;
198  sqlite3_reset(stmt_size_);
199  LogCvmfs(kLogQuota, kLogDebug, "contains %s returns %d",
200  hash_str.c_str(), result);
201 
202  return result;
203 }
204 
205 
207  if ((limit_ == 0) || (gauge_ >= limit_))
208  return;
209 
210  struct statvfs vfs_info;
211  int retval = statvfs((cache_dir_ + "/cachedb").c_str(), &vfs_info);
212  if (retval != 0) {
214  "failed to query %s for free space (%d)",
215  cache_dir_.c_str(), errno);
216  return;
217  }
218  int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
219  LogCvmfs(kLogQuota, kLogDebug, "free space: %" PRId64 " MB",
220  free_space_byte / (1024 * 1024));
221 
222  int64_t required_byte = limit_ - gauge_;
223  if (free_space_byte < required_byte) {
225  "too little free space on the file system hosting the cache,"
226  " %" PRId64 " MB available",
227  free_space_byte / (1024 * 1024));
228  }
229 }
230 
231 
233  const string &cache_workspace,
234  const uint64_t limit,
235  const uint64_t cleanup_threshold,
236  const bool rebuild_database)
237 {
238  if (cleanup_threshold >= limit) {
239  LogCvmfs(kLogQuota, kLogDebug, "invalid parameters: limit %" PRIu64 ", "
240  "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
241  return NULL;
242  }
243 
244  PosixQuotaManager *quota_manager =
245  new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
246 
247  // Initialize cache catalog
248  if (!quota_manager->InitDatabase(rebuild_database)) {
249  delete quota_manager;
250  return NULL;
251  }
252  quota_manager->CheckFreeSpace();
253  MakePipe(quota_manager->pipe_lru_);
254 
255  quota_manager->protocol_revision_ = kProtocolRevision;
256  quota_manager->initialized_ = true;
257  return quota_manager;
258 }
259 
260 
265  const std::string &exe_path,
266  const std::string &cache_workspace,
267  const uint64_t limit,
268  const uint64_t cleanup_threshold,
269  bool foreground)
270 {
271  string cache_dir;
272  string workspace_dir;
273  ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
274 
275  pid_t new_cachemgr_pid;
276 
277  // Create lock file: only one fuse client at a time
278  const int fd_lockfile = LockFile(workspace_dir + "/lock_cachemgr");
279  if (fd_lockfile < 0) {
280  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
281  (workspace_dir + "/lock_cachemgr").c_str(), errno);
282  return NULL;
283  }
284 
285  PosixQuotaManager *quota_mgr =
286  new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
287  quota_mgr->shared_ = true;
288  quota_mgr->spawned_ = true;
289 
290  // Try to connect to pipe
291  const string fifo_path = workspace_dir + "/cachemgr";
292  LogCvmfs(kLogQuota, kLogDebug, "trying to connect to existing pipe");
293  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
294  if (quota_mgr->pipe_lru_[1] >= 0) {
295  const int fd_lockfile_rw = open((workspace_dir + "/lock_cachemgr").c_str(), O_RDWR, 0600);
296  unsigned lockfile_magicnumber = 0;
297  const ssize_t result_mn = SafeRead(fd_lockfile_rw, &lockfile_magicnumber, sizeof(lockfile_magicnumber));
298  const ssize_t result = SafeRead(fd_lockfile_rw, &new_cachemgr_pid, sizeof(new_cachemgr_pid));
299  close(fd_lockfile_rw);
300 
301  if ((lockfile_magicnumber != kLockFileMagicNumber) || (result < 0) || (result_mn < 0)
302  || (static_cast<size_t>(result) < sizeof(new_cachemgr_pid))) {
303  if (result != 0) {
305  "could not read cache manager pid from lockfile");
306  UnlockFile(fd_lockfile);
307  delete quota_mgr;
308  return NULL;
309  } else {
310  // support reload from old versions of the cache manager
311  // lock file is empty in this case, try a plain ReadHalfPipe to get pid
312  quota_mgr->SetCacheMgrPid(quota_mgr->GetPid());
313  }
314  } else {
315  quota_mgr->SetCacheMgrPid(new_cachemgr_pid);
316  }
317 
318 
319  LogCvmfs(kLogQuota, kLogDebug, "connected to existing cache manager pipe");
320  quota_mgr->initialized_ = true;
321  Nonblock2Block(quota_mgr->pipe_lru_[1]);
322  UnlockFile(fd_lockfile);
323  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
325  "received limit %" PRIu64 ", threshold %" PRIu64,
326  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
327  if (FileExists(workspace_dir + "/cachemgr.protocol")) {
328  quota_mgr->protocol_revision_ = quota_mgr->GetProtocolRevision();
329  LogCvmfs(kLogQuota, kLogDebug, "connected protocol revision %u",
330  quota_mgr->protocol_revision_);
331  } else {
332  LogCvmfs(kLogQuota, kLogDebug, "connected to ancient cache manager");
333  }
334  return quota_mgr;
335  }
336  const int connect_error = errno;
337 
338  // Lock file: let existing cache manager finish first
339  const int fd_lockfile_fifo = LockFile(workspace_dir + "/lock_cachemgr.fifo");
340  if (fd_lockfile_fifo < 0) {
341  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
342  (workspace_dir + "/lock_cachemgr.fifo").c_str(), errno);
343  UnlockFile(fd_lockfile);
344  delete quota_mgr;
345  return NULL;
346  }
347  UnlockFile(fd_lockfile_fifo);
348 
349  if (connect_error == ENXIO) {
350  LogCvmfs(kLogQuota, kLogDebug, "left-over FIFO found, unlinking");
351  unlink(fifo_path.c_str());
352  }
353 
354  // Creating a new FIFO for the cache manager (to be bound later)
355  int retval = mkfifo(fifo_path.c_str(), 0600);
356  if (retval != 0) {
357  LogCvmfs(kLogQuota, kLogDebug, "failed to create cache manager FIFO (%d)",
358  errno);
359  UnlockFile(fd_lockfile);
360  delete quota_mgr;
361  return NULL;
362  }
363 
364  // Create new cache manager
365  int pipe_boot[2];
366  int pipe_handshake[2];
367  MakePipe(pipe_boot);
368  MakePipe(pipe_handshake);
369 
370  vector<string> command_line;
371  command_line.push_back(exe_path);
372  command_line.push_back("__cachemgr__");
373  command_line.push_back(cache_workspace);
374  command_line.push_back(StringifyInt(pipe_boot[1]));
375  command_line.push_back(StringifyInt(pipe_handshake[0]));
376  command_line.push_back(StringifyInt(limit));
377  command_line.push_back(StringifyInt(cleanup_threshold));
378  // do not propagate foreground in order to reliably get pid from exec
379  // instead, daemonize right here
380  command_line.push_back(StringifyInt(true)); //foreground
381  command_line.push_back(StringifyInt(GetLogSyslogLevel()));
382  command_line.push_back(StringifyInt(GetLogSyslogFacility()));
383  command_line.push_back(GetLogDebugFile() + ":" + GetLogMicroSyslog());
384 
385  set<int> preserve_filedes;
386  preserve_filedes.insert(0);
387  preserve_filedes.insert(1);
388  preserve_filedes.insert(2);
389  preserve_filedes.insert(pipe_boot[1]);
390  preserve_filedes.insert(pipe_handshake[0]);
391 
392  if (foreground) {
393  retval = ManagedExec(command_line, preserve_filedes, map<int, int>(),
394  /*drop_credentials*/ false,
395  /*clear_env*/ false,
396  /*double_fork*/ true,
397  &new_cachemgr_pid);
398  } else {
399  retval = ExecAsDaemon(command_line, &new_cachemgr_pid);
400  }
401  if (!retval) {
402  UnlockFile(fd_lockfile);
403  ClosePipe(pipe_boot);
404  ClosePipe(pipe_handshake);
405  delete quota_mgr;
406  LogCvmfs(kLogQuota, kLogDebug, "failed to start cache manager");
407  return NULL;
408  }
409  LogCvmfs(kLogQuota, kLogDebug, "new cache manager pid: %d", new_cachemgr_pid);
410  quota_mgr->SetCacheMgrPid(new_cachemgr_pid);
411  const int fd_lockfile_rw = open((workspace_dir + "/lock_cachemgr").c_str(), O_RDWR | O_TRUNC, 0600);
412  const unsigned magic_number = PosixQuotaManager::kLockFileMagicNumber;
413  const bool result_mn = SafeWrite(fd_lockfile_rw, &magic_number, sizeof(magic_number));
414  const bool result = SafeWrite(fd_lockfile_rw, &new_cachemgr_pid, sizeof(new_cachemgr_pid));
415  if (!result || !result_mn) {
416  PANIC(kLogSyslogErr, "could not write cache manager pid to lockfile");
417  }
418 
419  close(fd_lockfile_rw);
420  // Wait for cache manager to be ready
421  close(pipe_boot[1]);
422  close(pipe_handshake[0]);
423  char buf;
424  if (read(pipe_boot[0], &buf, 1) != 1) {
425  UnlockFile(fd_lockfile);
426  close(pipe_boot[0]);
427  close(pipe_handshake[1]);
428  delete quota_mgr;
430  "cache manager did not start");
431  return NULL;
432  }
433  close(pipe_boot[0]);
434 
435  // Connect write end
436  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
437  if (quota_mgr->pipe_lru_[1] < 0) {
439  "failed to connect to newly created FIFO (%d)", errno);
440  close(pipe_handshake[1]);
441  UnlockFile(fd_lockfile);
442  delete quota_mgr;
443  return NULL;
444  }
445 
446  // Finalize handshake
447  buf = 'C';
448  if (write(pipe_handshake[1], &buf, 1) != 1) {
449  UnlockFile(fd_lockfile);
450  close(pipe_handshake[1]);
451  LogCvmfs(kLogQuota, kLogDebug, "could not finalize handshake");
452  delete quota_mgr;
453  return NULL;
454  }
455  close(pipe_handshake[1]);
456 
457  Nonblock2Block(quota_mgr->pipe_lru_[1]);
458  LogCvmfs(kLogQuota, kLogDebug, "connected to a new cache manager");
459  quota_mgr->protocol_revision_ = kProtocolRevision;
460 
461  UnlockFile(fd_lockfile);
462 
463  quota_mgr->initialized_ = true;
464  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
465  LogCvmfs(kLogQuota, kLogDebug, "received limit %" PRIu64 ", "
466  "threshold %" PRIu64,
467  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
468  return quota_mgr;
469 }
470 
471 
472 bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) {
473  if (gauge_ <= leave_size)
474  return true;
475 
476  // TODO(jblomer) transaction
478  "clean up cache until at most %lu KB is used", leave_size/1024);
479  LogCvmfs(kLogQuota, kLogDebug, "gauge %" PRIu64, gauge_);
480  cleanup_recorder_.Tick();
481 
482  bool result;
483  vector<string> trash;
484 
485  // Note that volatile files start counting from the smallest int64 number:
486  // the absolute sequence number with the first bit set in two's complement.
487  // So -1 can be a marker that will never appear in the database.
488  int64_t max_acseq = -1;
489  do {
490  sqlite3_reset(stmt_lru_);
491  sqlite3_bind_int64(stmt_lru_, 1, (max_acseq == -1) ?
492  std::numeric_limits<int64_t>::min() : (max_acseq + 1));
493 
494  std::vector<EvictCandidate> candidates;
495  candidates.reserve(kEvictBatchSize);
496  string hash_str;
497  unsigned i = 0;
498  while (sqlite3_step(stmt_lru_) == SQLITE_ROW) {
499  hash_str = reinterpret_cast<const char *>(
500  sqlite3_column_text(stmt_lru_, 0));
501  LogCvmfs(kLogQuota, kLogDebug, "add %s to candidates for eviction",
502  hash_str.c_str());
503  candidates.push_back(EvictCandidate(
505  sqlite3_column_int64(stmt_lru_, 1),
506  sqlite3_column_int64(stmt_lru_, 2)));
507  i++;
508  }
509  if (candidates.empty()) {
510  LogCvmfs(kLogQuota, kLogDebug, "no more entries to evict");
511  break;
512  }
513 
514  const unsigned N = candidates.size();
515  for (i = 0; i < N; ++i) {
516  // That's a critical condition. We must not delete a not yet inserted
517  // pinned file as it is already reserved (but will be inserted later).
518  // Instead, set the pin bit in the db to not run into an endless loop
519  if (pinned_chunks_.find(candidates[i].hash) != pinned_chunks_.end()) {
520  hash_str = candidates[i].hash.ToString();
521  LogCvmfs(kLogQuota, kLogDebug, "skip %s for eviction",
522  hash_str.c_str());
523  sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
524  SQLITE_STATIC);
525  result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
526  sqlite3_reset(stmt_block_);
527  assert(result);
528  continue;
529  }
530 
531  trash.push_back(cache_dir_ + "/" +
532  candidates[i].hash.MakePathWithoutSuffix());
533  gauge_ -= candidates[i].size;
534  max_acseq = candidates[i].acseq;
535  LogCvmfs(kLogQuota, kLogDebug, "lru cleanup %s, new gauge %" PRIu64,
536  candidates[i].hash.ToString().c_str(), gauge_);
537 
538  if (gauge_ <= leave_size)
539  break;
540  }
541  } while (gauge_ > leave_size);
542 
543  if (max_acseq != -1) {
544  sqlite3_bind_int64(stmt_rm_batch_, 1, max_acseq);
545  result = (sqlite3_step(stmt_rm_batch_) == SQLITE_DONE);
546  assert(result);
547  sqlite3_reset(stmt_rm_batch_);
548 
549  result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
550  sqlite3_reset(stmt_unblock_);
551  assert(result);
552  }
553 
554  if (!EmptyTrash(trash))
555  return false;
556 
557  if (gauge_ > leave_size) {
559  "request to clean until %" PRIu64 ", "
560  "but effective gauge is %" PRIu64, leave_size, gauge_);
561  return false;
562  }
563  return true;
564 }
565 
566 bool PosixQuotaManager::EmptyTrash(const std::vector<std::string> &trash) {
567  if (trash.empty())
568  return true;
569 
570  if (async_delete_) {
571  // Double fork avoids zombie, forked removal process must not flush file
572  // buffers
573  pid_t pid;
574  int statloc;
575  if ((pid = fork()) == 0) {
576  // TODO(jblomer): eviciting files in the cache should perhaps become a
577  // thread. This would also allow to block the chunks and prevent the
578  // race with re-insertion. Then again, a thread can block umount.
579 #ifndef DEBUGMSG
580  CloseAllFildes(std::set<int>());
581 #endif
582  if (fork() == 0) {
583  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
584  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
585  unlink(trash[i].c_str());
586  }
587  _exit(0);
588  }
589  _exit(0);
590  } else {
591  if (pid > 0)
592  waitpid(pid, &statloc, 0);
593  else
594  return false;
595  }
596  } else { // !async_delete_
597  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
598  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
599  unlink(trash[i].c_str());
600  }
601  }
602  return true;
603 }
604 
605 
607  const shash::Any &hash,
608  const uint64_t size,
609  const string &description,
610  const CommandType command_type)
611 {
612  const string hash_str = hash.ToString();
613  LogCvmfs(kLogQuota, kLogDebug, "insert into lru %s, path %s, method %d",
614  hash_str.c_str(), description.c_str(), command_type);
615  const unsigned desc_length = (description.length() > kMaxDescription) ?
616  kMaxDescription : description.length();
617 
618  LruCommand *cmd =
619  reinterpret_cast<LruCommand *>(alloca(sizeof(LruCommand) + desc_length));
620  new (cmd) LruCommand;
621  cmd->command_type = command_type;
622  cmd->SetSize(size);
623  cmd->StoreHash(hash);
624  cmd->desc_length = desc_length;
625  memcpy(reinterpret_cast<char *>(cmd)+sizeof(LruCommand),
626  &description[0], desc_length);
627  WritePipe(pipe_lru_[1], cmd, sizeof(LruCommand) + desc_length);
628 }
629 
630 
631 vector<string> PosixQuotaManager::DoList(const CommandType list_command) {
632  vector<string> result;
633 
634  int pipe_list[2];
635  MakeReturnPipe(pipe_list);
636  char description_buffer[kMaxDescription];
637 
638  LruCommand cmd;
639  cmd.command_type = list_command;
640  cmd.return_pipe = pipe_list[1];
641  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
642 
643  int length;
644  do {
645  ManagedReadHalfPipe(pipe_list[0], &length, sizeof(length));
646  if (length > 0) {
647  ReadPipe(pipe_list[0], description_buffer, length);
648  result.push_back(string(description_buffer, length));
649  }
650  } while (length >= 0);
651 
652  CloseReturnPipe(pipe_list);
653  return result;
654 }
655 
656 
658  if (limit_ != (uint64_t)(-1))
659  return limit_;
660 
661  // Unrestricted cache, look at free space on cache dir fs
662  struct statfs info;
663  if (statfs(".", &info) == 0) {
664  return info.f_bavail * info.f_bsize;
665  } else {
667  "failed to query file system info of cache (%d)", errno);
668  return limit_;
669  }
670 }
671 
672 
673 void PosixQuotaManager::GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
674 {
675  int pipe_limits[2];
676  MakeReturnPipe(pipe_limits);
677 
678  LruCommand cmd;
679  cmd.command_type = kLimits;
680  cmd.return_pipe = pipe_limits[1];
681  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
682  ManagedReadHalfPipe(pipe_limits[0], limit, sizeof(*limit));
683  ReadPipe(pipe_limits[0], cleanup_threshold, sizeof(*cleanup_threshold));
684  CloseReturnPipe(pipe_limits);
685 }
686 
687 
693  return limit_ - cleanup_threshold_;
694 }
695 
696 
698  if (!shared_ || !spawned_) {
699  return getpid();
700  }
701  if (cachemgr_pid_) {
702  return cachemgr_pid_;
703  }
704 
705  pid_t result;
706  int pipe_pid[2];
707  MakeReturnPipe(pipe_pid);
708 
709  LruCommand cmd;
710  cmd.command_type = kPid;
711  cmd.return_pipe = pipe_pid[1];
712  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
713  ReadHalfPipe(pipe_pid[0], &result, sizeof(result));
714  CloseReturnPipe(pipe_pid);
715  return result;
716 }
717 
718 
720  int pipe_revision[2];
721  MakeReturnPipe(pipe_revision);
722 
723  LruCommand cmd;
724  cmd.command_type = kGetProtocolRevision;
725  cmd.return_pipe = pipe_revision[1];
726  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
727 
728  uint32_t revision;
729  ManagedReadHalfPipe(pipe_revision[0], &revision, sizeof(revision));
730  CloseReturnPipe(pipe_revision);
731  return revision;
732 }
733 
734 
738 void PosixQuotaManager::GetSharedStatus(uint64_t *gauge, uint64_t *pinned) {
739  int pipe_status[2];
740  MakeReturnPipe(pipe_status);
741 
742  LruCommand cmd;
743  cmd.command_type = kStatus;
744  cmd.return_pipe = pipe_status[1];
745  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
746  ManagedReadHalfPipe(pipe_status[0], gauge, sizeof(*gauge));
747  ReadPipe(pipe_status[0], pinned, sizeof(*pinned));
748  CloseReturnPipe(pipe_status);
749 }
750 
751 bool PosixQuotaManager::SetSharedLimit(uint64_t limit) {
752  int pipe_set_limit[2];
753  bool result;
754  MakeReturnPipe(pipe_set_limit);
755 
756  LruCommand cmd;
757  cmd.command_type = kSetLimit;
758  cmd.size = limit;
759  cmd.return_pipe = pipe_set_limit[1];
760  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
761  ReadHalfPipe(pipe_set_limit[0], &result, sizeof(result));
762  CloseReturnPipe(pipe_set_limit);
763  return result;
764 }
765 
766 
768  if (!spawned_) {
769  limit_ = size;
770  cleanup_threshold_ = size/2;
771  LogCvmfs(kLogQuota, kLogDebug | kLogSyslog, "Quota limit set to %lu / threshold %lu", limit_, cleanup_threshold_ );
772  return true;
773  }
774  return SetSharedLimit(size);
775 }
776 
778  if (!spawned_) return gauge_;
779  uint64_t gauge, size_pinned;
780  GetSharedStatus(&gauge, &size_pinned);
781  return gauge;
782 }
783 
784 
786  if (!spawned_) return pinned_;
787  uint64_t gauge, size_pinned;
788  GetSharedStatus(&gauge, &size_pinned);
789  return size_pinned;
790 }
791 
792 
793 uint64_t PosixQuotaManager::GetCleanupRate(uint64_t period_s) {
794  if (!spawned_ || (protocol_revision_ < 2)) return 0;
795  uint64_t cleanup_rate;
796 
797  int pipe_cleanup_rate[2];
798  MakeReturnPipe(pipe_cleanup_rate);
799  LruCommand cmd;
800  cmd.command_type = kCleanupRate;
801  cmd.size = period_s;
802  cmd.return_pipe = pipe_cleanup_rate[1];
803  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
804  ManagedReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate, sizeof(cleanup_rate));
805  CloseReturnPipe(pipe_cleanup_rate);
806 
807  return cleanup_rate;
808 }
809 
810 
811 bool PosixQuotaManager::InitDatabase(const bool rebuild_database) {
812  string sql;
813  sqlite3_stmt *stmt;
814 
815  fd_lock_cachedb_ = LockFile(workspace_dir_ + "/lock_cachedb");
816  if (fd_lock_cachedb_ < 0) {
817  LogCvmfs(kLogQuota, kLogDebug, "failed to create cachedb lock");
818  return false;
819  }
820 
821  bool retry = false;
822  const string db_file = cache_dir_ + "/cachedb";
823  if (rebuild_database) {
824  LogCvmfs(kLogQuota, kLogDebug, "rebuild database, unlinking existing (%s)",
825  db_file.c_str());
826  unlink(db_file.c_str());
827  unlink((db_file + "-journal").c_str());
828  }
829 
830  init_recover:
831  int err = sqlite3_open(db_file.c_str(), &database_);
832  if (err != SQLITE_OK) {
833  LogCvmfs(kLogQuota, kLogDebug, "could not open cache database (%d)", err);
834  goto init_database_fail;
835  }
836  // TODO(reneme): make this a `QuotaDatabase : public sqlite::Database`
837  sql = "PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
838  "PRAGMA auto_vacuum=1; "
839  "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
840  " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
841  "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
842  "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
843  " ON cache_catalog (acseq); "
844  "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
845  "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
846  "CREATE INDEX idx_fscache_actime ON fscache (actime); "
847  "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
848  " CONSTRAINT pk_properties PRIMARY KEY(key));";
849  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
850  if (err != SQLITE_OK) {
851  if (!retry) {
852  retry = true;
853  sqlite3_close(database_);
854  unlink(db_file.c_str());
855  unlink((db_file + "-journal").c_str());
857  "LRU database corrupted, re-building");
858  goto init_recover;
859  }
860  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
861  sql.c_str());
862  goto init_database_fail;
863  }
864 
865  // If this an old cache catalog,
866  // add and initialize new columns to cache_catalog
867  sql = "ALTER TABLE cache_catalog ADD type INTEGER; "
868  "ALTER TABLE cache_catalog ADD pinned INTEGER";
869  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
870  if (err == SQLITE_OK) {
871  sql = "UPDATE cache_catalog SET type=" + StringifyInt(kFileRegular) + ";";
872  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
873  if (err != SQLITE_OK) {
875  "could not init cache database (failed: %s)", sql.c_str());
876  goto init_database_fail;
877  }
878  }
879 
880  // Set pinned back
881  sql = "UPDATE cache_catalog SET pinned=0;";
882  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
883  if (err != SQLITE_OK) {
884  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
885  sql.c_str());
886  goto init_database_fail;
887  }
888 
889  // Set schema version
890  sql = "INSERT OR REPLACE INTO properties (key, value) "
891  "VALUES ('schema', '1.0')";
892  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
893  if (err != SQLITE_OK) {
894  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
895  sql.c_str());
896  goto init_database_fail;
897  }
898 
899  // If cache catalog is empty, recreate from file system
900  sql = "SELECT count(*) FROM cache_catalog;";
901  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
902  if (sqlite3_step(stmt) == SQLITE_ROW) {
903  if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
905  "CernVM-FS: building lru cache database...");
906  if (!RebuildDatabase()) {
908  "could not build cache database from file system");
909  sqlite3_finalize(stmt);
910  goto init_database_fail;
911  }
912  }
913  sqlite3_finalize(stmt);
914  } else {
915  LogCvmfs(kLogQuota, kLogDebug, "could not select on cache catalog");
916  sqlite3_finalize(stmt);
917  goto init_database_fail;
918  }
919 
920  // How many bytes do we already have in cache?
921  sql = "SELECT sum(size) FROM cache_catalog;";
922  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
923  if (sqlite3_step(stmt) == SQLITE_ROW) {
924  gauge_ = sqlite3_column_int64(stmt, 0);
925  } else {
926  LogCvmfs(kLogQuota, kLogDebug, "could not determine cache size");
927  sqlite3_finalize(stmt);
928  goto init_database_fail;
929  }
930  sqlite3_finalize(stmt);
931 
932  // Highest seq-no?
933  sql = "SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
934  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
935  if (sqlite3_step(stmt) == SQLITE_ROW) {
936  seq_ = sqlite3_column_int64(stmt, 0)+1;
937  } else {
938  LogCvmfs(kLogQuota, kLogDebug, "could not determine highest seq-no");
939  sqlite3_finalize(stmt);
940  goto init_database_fail;
941  }
942  sqlite3_finalize(stmt);
943 
944  // Prepare touch, new, remove statements
945  sqlite3_prepare_v2(database_,
946  "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
947  "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
948  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=0 "
949  "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
950  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=2 "
951  "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
952  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=1 "
953  "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
954  sqlite3_prepare_v2(database_,
955  "INSERT OR REPLACE INTO cache_catalog "
956  "(sha1, size, acseq, path, type, pinned) "
957  "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
958  -1, &stmt_new_, NULL);
959  sqlite3_prepare_v2(database_,
960  "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
961  -1, &stmt_size_, NULL);
962  sqlite3_prepare_v2(database_, "DELETE FROM cache_catalog WHERE sha1=:sha1;",
963  -1, &stmt_rm_, NULL);
964  sqlite3_prepare_v2(database_,
965  "DELETE FROM cache_catalog WHERE acseq<=:a AND pinned<>2;",
966  -1, &stmt_rm_batch_, NULL);
967  sqlite3_prepare_v2(database_, (std::string(
968  "SELECT sha1, size, acseq FROM cache_catalog "
969  "WHERE pinned<>2 AND acseq>=:a "
970  "ORDER BY acseq ASC "
971  "LIMIT ") + StringifyInt(kEvictBatchSize) + ";").c_str(),
972  -1, &stmt_lru_, NULL);
973  sqlite3_prepare_v2(database_,
974  ("SELECT path FROM cache_catalog WHERE type=" +
975  StringifyInt(kFileRegular) +
976  ";").c_str(), -1, &stmt_list_, NULL);
977  sqlite3_prepare_v2(database_,
978  "SELECT path FROM cache_catalog WHERE pinned<>0;",
979  -1, &stmt_list_pinned_, NULL);
980  sqlite3_prepare_v2(database_,
981  "SELECT path FROM cache_catalog WHERE acseq < 0;",
982  -1, &stmt_list_volatile_, NULL);
983  sqlite3_prepare_v2(database_,
984  ("SELECT path FROM cache_catalog WHERE type=" +
985  StringifyInt(kFileCatalog) +
986  ";").c_str(), -1, &stmt_list_catalogs_, NULL);
987  return true;
988 
989  init_database_fail:
990  sqlite3_close(database_);
991  database_ = NULL;
992  UnlockFile(fd_lock_cachedb_);
993  return false;
994 }
995 
996 
1002  const shash::Any &any_hash,
1003  const uint64_t size,
1004  const string &description)
1005 {
1006  DoInsert(any_hash, size, description, kInsert);
1007 }
1008 
1009 
1016  const shash::Any &any_hash,
1017  const uint64_t size,
1018  const string &description)
1019 {
1020  DoInsert(any_hash, size, description, kInsertVolatile);
1021 }
1022 
1023 
1027 vector<string> PosixQuotaManager::List() {
1028  return DoList(kList);
1029 }
1030 
1031 
1036  return DoList(kListPinned);
1037 }
1038 
1039 
1044  return DoList(kListCatalogs);
1045 }
1046 
1047 
1052  return DoList(kListVolatile);
1053 }
1054 
1055 
// Main routine of the separate shared cache manager process.
//
// Command line, as consumed below:
//   argv[2]  "cache_dir[:workspace_dir]"
//   argv[3]  boot pipe fd
//   argv[4]  handshake pipe fd
//   argv[5]  quota limit
//   argv[6]  cleanup threshold
//   argv[7]  foreground flag
//   argv[8]  syslog level
//   argv[9]  syslog facility
//   argv[10] ':'-separated log files (debug log, micro-syslog)
//
// Sequence:
//   - daemonize unless in foreground, and start the watchdog;
//   - take the FIFO lock file and create the crash guard;
//   - open (or rebuild) the cache database;
//   - write the protocol revision file and open the "cachemgr" FIFO;
//   - send 'C' on the boot pipe, then wait for one byte on the handshake pipe;
//   - run MainCommandServer() until it returns, then clean up.
// Returns 0 after a regular shutdown and 1 on any setup failure.
//
// NOTE(review): argc is not checked before argv[2]..argv[10] are read.
1059 int PosixQuotaManager::MainCacheManager(int argc, char **argv) {
1060 
1061  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1062  int retval;
1063 
1064  PosixQuotaManager shared_manager(0, 0, "");
1065  shared_manager.shared_ = true;
1066  shared_manager.spawned_ = true;
1067  shared_manager.pinned_ = 0;
1068 
1069  // Process command line arguments
1070  ParseDirectories(string(argv[2]),
1071  &shared_manager.cache_dir_,
1072  &shared_manager.workspace_dir_);
1073  int pipe_boot = String2Int64(argv[3]);
1074  int pipe_handshake = String2Int64(argv[4]);
1075  shared_manager.limit_ = String2Int64(argv[5]);
1076  shared_manager.cleanup_threshold_ = String2Int64(argv[6]);
1077  int foreground = String2Int64(argv[7]);
1078  int syslog_level = String2Int64(argv[8]);
1079  int syslog_facility = String2Int64(argv[9]);
1080  vector<string> logfiles = SplitString(argv[10], ':');
1081 
1082  SetLogSyslogLevel(syslog_level);
1083  SetLogSyslogFacility(syslog_facility);
1084  if ((logfiles.size() > 0) && (logfiles[0] != ""))
1085  SetLogDebugFile(logfiles[0] + ".cachemgr");
1086  if (logfiles.size() > 1)
1087  SetLogMicroSyslog(logfiles[1]);
1088 
1089  if (!foreground)
1090  Daemonize();
1091 
1092  UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
1093  assert(watchdog.IsValid());
1094  watchdog->Spawn("./stacktrace.cachemgr");
1095 
1096  // Initialize pipe, open non-blocking as cvmfs is not yet connected
1097  const int fd_lockfile_fifo =
1098  LockFile(shared_manager.workspace_dir_ + "/lock_cachemgr.fifo");
1099  if (fd_lockfile_fifo < 0) {
1100  LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, "could not open lock file "
1101  "%s (%d)",
1102  (shared_manager.workspace_dir_ + "/lock_cachemgr.fifo").c_str(),
1103  errno);
1104  return 1;
1105  }
// The crash guard is unlinked only at the end of a regular shutdown (below).
// Finding it at startup therefore makes InitDatabase() rebuild the database.
1106  const string crash_guard = shared_manager.cache_dir_ + "/cachemgr.running";
1107  const bool rebuild = FileExists(crash_guard);
1108  retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1109  if (retval < 0) {
1111  "failed to create shared cache manager crash guard");
1112  UnlockFile(fd_lockfile_fifo);
1113  return 1;
1114  }
1115  close(retval);
1116 
1117  // Redirect SQlite temp directory to cache (global variable)
1118  const string tmp_dir = shared_manager.workspace_dir_;
1119  sqlite3_temp_directory =
1120  static_cast<char *>(sqlite3_malloc(tmp_dir.length() + 1));
1121  snprintf(sqlite3_temp_directory, tmp_dir.length() + 1, "%s", tmp_dir.c_str());
1122 
1123  // Cleanup leftover named pipes
1124  shared_manager.CleanupPipes();
1125 
1126  if (!shared_manager.InitDatabase(rebuild)) {
1127  UnlockFile(fd_lockfile_fifo);
1128  return 1;
1129  }
1130  shared_manager.CheckFreeSpace();
1131 
// NOTE(review): the error returns below happen after InitDatabase()
// succeeded. They neither call CloseDatabase() nor remove the crash guard,
// so the next start will rebuild the database.
1132  // Save protocol revision to file. If the file is not found, it indicates
1133  // to the client that the cache manager is from times before the protocol
1134  // was versioned.
1135  const string protocol_revision_path =
1136  shared_manager.workspace_dir_ + "/cachemgr.protocol";
1137  retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1138  if (retval < 0) {
1140  "failed to open protocol revision file (%d)", errno);
1141  UnlockFile(fd_lockfile_fifo);
1142  return 1;
1143  }
1144  const string revision = StringifyInt(kProtocolRevision);
1145  int written = write(retval, revision.data(), revision.length());
1146  close(retval);
1147  if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1149  "failed to write protocol revision (%d)", errno);
1150  UnlockFile(fd_lockfile_fifo);
1151  return 1;
1152  }
1153 
1154  const string fifo_path = shared_manager.workspace_dir_ + "/cachemgr";
1155  shared_manager.pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1156  if (shared_manager.pipe_lru_[0] < 0) {
1157  LogCvmfs(kLogQuota, kLogDebug, "failed to listen on FIFO %s (%d)",
1158  fifo_path.c_str(), errno);
1159  UnlockFile(fd_lockfile_fifo);
1160  return 1;
1161  }
1162  Nonblock2Block(shared_manager.pipe_lru_[0]);
1163  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager listening");
1164 
// Signal readiness with 'C' on the boot pipe, then block until one byte
// arrives on the handshake pipe (presumably from the starting client).
1165  char buf = 'C';
1166  WritePipe(pipe_boot, &buf, 1);
1167  close(pipe_boot);
1168 
1169  ReadPipe(pipe_handshake, &buf, 1);
1170  close(pipe_handshake);
1171  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager handshake done");
1172 
1173  // Ensure that broken pipes from clients do not kill the cache manager
1174  signal(SIGPIPE, SIG_IGN);
1175  // Don't let Ctrl-C ungracefully kill interactive session
1176  signal(SIGINT, SIG_IGN);
1177 
// Runs in this thread until the command FIFO stops delivering full commands.
1178  shared_manager.MainCommandServer(&shared_manager);
1179  unlink(fifo_path.c_str());
1180  unlink(protocol_revision_path.c_str());
1181  shared_manager.CloseDatabase();
1182  unlink(crash_guard.c_str());
1183  UnlockFile(fd_lockfile_fifo);
1184 
1185  if (sqlite3_temp_directory) {
1186  sqlite3_free(sqlite3_temp_directory);
1187  sqlite3_temp_directory = NULL;
1188  }
1189 
1190  return 0;
1191 }
1192 
1193 
// Command loop of the quota manager. `data` is the PosixQuotaManager.
// It is the pthread entry started by Spawn(), and MainCacheManager() also
// calls it directly in the shared cache manager process.
//
// It reads fixed-size LruCommand records from pipe_lru_[0]. It stops as soon
// as a read returns anything other than one full record: EOF, an error, or
// a short write such as the single byte sent by the destructor.
//
// Commands fall into three groups:
// - Answered at once on the command's return pipe: protocol revision,
//   cleanup rate, set limit, reserve, and back-channel (un)registration.
// - Buffered and applied in batches by ProcessCommandBunch(): touch,
//   insert(-volatile), pin(-regular) and unpin. Unpin also adjusts pinned_
//   immediately.
// - "Immediate" commands (cleanup, listings, remove, status, limits, pid):
//   they first flush the buffer, then answer on the return pipe.
//
// On exit the buffer is flushed and a kTouch is processed for every entry
// left in pinned_chunks_. Always returns NULL.
1195  PosixQuotaManager *quota_mgr = static_cast<PosixQuotaManager *>(data);
1196 
1197  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1198  sqlite3_soft_heap_limit(quota_mgr->kSqliteMemPerThread);
1199 
1200  LruCommand command_buffer[kCommandBufferSize];
1201  char description_buffer[kCommandBufferSize*kMaxDescription];
1202  unsigned num_commands = 0;
1203 
1204  while (read(quota_mgr->pipe_lru_[0], &command_buffer[num_commands],
1205  sizeof(command_buffer[0])) == sizeof(command_buffer[0]))
1206  {
1207  const CommandType command_type = command_buffer[num_commands].command_type;
1208  LogCvmfs(kLogQuota, kLogDebug, "received command %d", command_type);
1209  const uint64_t size = command_buffer[num_commands].GetSize();
1210 
1211  // Inserts and pins come with a description (usually a path)
1212  if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1213  (command_type == kPin) || (command_type == kPinRegular))
1214  {
// The description goes into this command's kMaxDescription-sized slot.
// NOTE(review): desc_length is not bounded by kMaxDescription here; verify
// that every sender enforces that limit.
1215  const int desc_length = command_buffer[num_commands].desc_length;
1216  ReadPipe(quota_mgr->pipe_lru_[0],
1217  &description_buffer[kMaxDescription*num_commands], desc_length);
1218  }
1219 
1220  // The protocol revision is returned immediately
1221  if (command_type == kGetProtocolRevision) {
1222  int return_pipe =
1223  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1224  if (return_pipe < 0)
1225  continue;
1226  WritePipe(return_pipe, &quota_mgr->kProtocolRevision,
1227  sizeof(quota_mgr->kProtocolRevision));
1228  quota_mgr->UnbindReturnPipe(return_pipe);
1229  continue;
1230  }
1231 
1232  // The cleanup rate is returned immediately
1233  if (command_type == kCleanupRate) {
1234  int return_pipe =
1235  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1236  if (return_pipe < 0)
1237  continue;
1238  uint64_t period_s = size; // use the size field to transmit the period
1239  uint64_t rate = quota_mgr->cleanup_recorder_.GetNoTicks(period_s);
1240  WritePipe(return_pipe, &rate, sizeof(rate));
1241  quota_mgr->UnbindReturnPipe(return_pipe);
1242  continue;
1243  }
1244 
// New limit. The cleanup threshold is reset to half of it.
// NOTE(review): "%lu" assumes uint64_t is unsigned long; PRIu64 would be
// portable.
1245  if (command_type == kSetLimit) {
1246  const int return_pipe =
1247  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1248  if (return_pipe < 0)
1249  continue;
1250  quota_mgr->limit_ = size; // use the size field to transmit the size
1251  quota_mgr->cleanup_threshold_ = size/2;
1252  LogCvmfs(kLogQuota, kLogDebug | kLogSyslog, "Quota limit set to %lu / threshold %lu", quota_mgr->limit_, quota_mgr->cleanup_threshold_ );
1253  bool ret = true;
1254  WritePipe(return_pipe, &ret, sizeof(ret));
1255  quota_mgr->UnbindReturnPipe(return_pipe);
1256  continue;
1257  }
1258 
1259  // Reservations are handled immediately and "out of band"
// Same check as the not-yet-spawned path of DoPin(). A hash that is already
// in pinned_chunks_ is accepted without re-checking the space.
1260  if (command_type == kReserve) {
1261  bool success = true;
1262  int return_pipe =
1263  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1264  if (return_pipe < 0)
1265  continue;
1266 
1267  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1268  const string hash_str(hash.ToString());
1269  LogCvmfs(kLogQuota, kLogDebug, "reserve %lu bytes for %s",
1270  size, hash_str.c_str());
1271 
1272  if (quota_mgr->pinned_chunks_.find(hash) ==
1273  quota_mgr->pinned_chunks_.end())
1274  {
1275  if ((quota_mgr->pinned_ + size) > quota_mgr->cleanup_threshold_) {
1277  "failed to insert %s (pinned), no space", hash_str.c_str());
1278  success = false;
1279  } else {
1280  quota_mgr->pinned_chunks_[hash] = size;
1281  quota_mgr->pinned_ += size;
1282  quota_mgr->CheckHighPinWatermark();
1283  }
1284  }
1285 
1286  WritePipe(return_pipe, &success, sizeof(success));
1287  quota_mgr->UnbindReturnPipe(return_pipe);
1288  continue;
1289  }
1290 
1291  // Back channels are also handled out of band
// The writer end stays open in back_channels_. The FIFO's name is unlinked
// right away because it is no longer needed.
1292  if (command_type == kRegisterBackChannel) {
1293  int return_pipe =
1294  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1295  if (return_pipe < 0)
1296  continue;
1297 
1298  quota_mgr->UnlinkReturnPipe(command_buffer[num_commands].return_pipe);
1299  Block2Nonblock(return_pipe); // back channels are opportunistic
1300  shash::Md5 hash;
1301  memcpy(hash.digest, command_buffer[num_commands].digest,
1303 
1304  quota_mgr->LockBackChannels();
1305  map<shash::Md5, int>::const_iterator iter =
1306  quota_mgr->back_channels_.find(hash);
1307  if (iter != quota_mgr->back_channels_.end()) {
1309  "closing left-over back channel %s", hash.ToString().c_str());
1310  close(iter->second);
1311  }
1312  quota_mgr->back_channels_[hash] = return_pipe;
1313  quota_mgr->UnlockBackChannels();
1314 
1315  char success = 'S';
1316  WritePipe(return_pipe, &success, sizeof(success));
1317  LogCvmfs(kLogQuota, kLogDebug, "register back channel %s on fd %d",
1318  hash.ToString().c_str(), return_pipe);
1319 
1320  continue;
1321  }
1322 
1323  if (command_type == kUnregisterBackChannel) {
1324  shash::Md5 hash;
1325  memcpy(hash.digest, command_buffer[num_commands].digest,
1327 
1328  quota_mgr->LockBackChannels();
1329  map<shash::Md5, int>::iterator iter =
1330  quota_mgr->back_channels_.find(hash);
1331  if (iter != quota_mgr->back_channels_.end()) {
1333  "closing back channel %s", hash.ToString().c_str());
1334  close(iter->second);
1335  quota_mgr->back_channels_.erase(iter);
1336  } else {
1338  "did not find back channel %s", hash.ToString().c_str());
1339  }
1340  quota_mgr->UnlockBackChannels();
1341 
1342  continue;
1343  }
1344 
1345  // Unpinnings are also handled immediately with respect to the pinned gauge
// kUnpin is not an immediate command. After this block the record is also
// buffered, so ProcessCommandBunch() updates the database later.
1346  if (command_type == kUnpin) {
1347  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1348  const string hash_str(hash.ToString());
1349 
1350  map<shash::Any, uint64_t>::iterator iter =
1351  quota_mgr->pinned_chunks_.find(hash);
1352  if (iter != quota_mgr->pinned_chunks_.end()) {
1353  quota_mgr->pinned_ -= iter->second;
1354  quota_mgr->pinned_chunks_.erase(iter);
1355  // It can happen that files get pinned that were removed from the cache
1356  // (see cache.cc). We fix this at this point, where we remove such
1357  // entries from the cache database.
1358  if (!FileExists(quota_mgr->cache_dir_ + "/" +
1359  hash.MakePathWithoutSuffix()))
1360  {
1362  "remove orphaned pinned hash %s from cache database",
1363  hash_str.c_str());
1364  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1365  hash_str.length(), SQLITE_STATIC);
1366  int retval;
1367  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1368  uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1369  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1370  hash_str.length(), SQLITE_STATIC);
1371  retval = sqlite3_step(quota_mgr->stmt_rm_);
1372  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1373  quota_mgr->gauge_ -= size;
1374  } else {
1376  "failed to delete %s (%d)", hash_str.c_str(), retval);
1377  }
1378  sqlite3_reset(quota_mgr->stmt_rm_);
1379  }
1380  sqlite3_reset(quota_mgr->stmt_size_);
1381  }
1382  } else {
1383  LogCvmfs(kLogQuota, kLogDebug, "this chunk was not pinned");
1384  }
1385  }
1386 
1387  // Immediate commands trigger flushing of the buffer
// Buffered commands advance num_commands. An immediate command stays at
// index num_commands after the flush, which is why it is read from there
// below.
1388  bool immediate_command = (command_type == kCleanup) ||
1389  (command_type == kList) || (command_type == kListPinned) ||
1390  (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1391  (command_type == kRemove) || (command_type == kStatus) ||
1392  (command_type == kLimits) || (command_type == kPid);
1393  if (!immediate_command) num_commands++;
1394 
1395  if ((num_commands == kCommandBufferSize) || immediate_command)
1396  {
1397  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1398  description_buffer);
1399  if (!immediate_command) num_commands = 0;
1400  }
1401 
1402  if (immediate_command) {
1403  // Process cleanup, listings
1404  int return_pipe =
1405  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1406  if (return_pipe < 0) {
1407  num_commands = 0;
1408  continue;
1409  }
1410 
1411  int retval;
1412  sqlite3_stmt *this_stmt_list = NULL;
1413  switch (command_type) {
// NOTE(review): in the kRemove case, pinned_ is reduced by the database
// size whenever the row is marked pinned. That happens even if the hash was
// not in pinned_chunks_ (the result of erase() is ignored). Verify that
// pinned_ cannot underflow this way.
1414  case kRemove: {
1415  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1416  const string hash_str = hash.ToString();
1417  LogCvmfs(kLogQuota, kLogDebug, "manually removing %s",
1418  hash_str.c_str());
1419  bool success = false;
1420 
1421  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1422  hash_str.length(), SQLITE_STATIC);
1423  int retval;
1424  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1425  uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1426  uint64_t is_pinned = sqlite3_column_int64(quota_mgr->stmt_size_, 1);
1427 
1428  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1429  hash_str.length(), SQLITE_STATIC);
1430  retval = sqlite3_step(quota_mgr->stmt_rm_);
1431  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1432  success = true;
1433  quota_mgr->gauge_ -= size;
1434  if (is_pinned) {
1435  quota_mgr->pinned_chunks_.erase(hash);
1436  quota_mgr->pinned_ -= size;
1437  }
1438  } else {
1440  "failed to delete %s (%d)", hash_str.c_str(), retval);
1441  }
1442  sqlite3_reset(quota_mgr->stmt_rm_);
1443  } else {
1444  // File does not exist
1445  success = true;
1446  }
1447  sqlite3_reset(quota_mgr->stmt_size_);
1448 
1449  WritePipe(return_pipe, &success, sizeof(success));
1450  break; }
1451  case kCleanup:
1452  retval = quota_mgr->DoCleanup(size);
1453  WritePipe(return_pipe, &retval, sizeof(retval));
1454  break;
// Deliberate fall-through: the first matching case picks its statement, and
// the shared code after kListVolatile streams the result.
1455  case kList:
1456  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_;
1457  case kListPinned:
1458  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_pinned_;
1459  case kListCatalogs:
1460  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_catalogs_;
1461  case kListVolatile:
1462  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_volatile_;
1463 
1464  // Pipe back the list, one by one
// Wire format: each path is an int length followed by its bytes; a length
// of -1 terminates the list.
1465  int length;
1466  while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1467  string path = "(NULL)";
1468  if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1469  path = string(
1470  reinterpret_cast<const char *>(
1471  sqlite3_column_text(this_stmt_list, 0)));
1472  }
1473  length = path.length();
1474  WritePipe(return_pipe, &length, sizeof(length));
1475  if (length > 0)
1476  WritePipe(return_pipe, &path[0], length);
1477  }
1478  length = -1;
1479  WritePipe(return_pipe, &length, sizeof(length));
1480  sqlite3_reset(this_stmt_list);
1481  break;
1482  case kStatus:
1483  WritePipe(return_pipe, &quota_mgr->gauge_, sizeof(quota_mgr->gauge_));
1484  WritePipe(return_pipe, &quota_mgr->pinned_,
1485  sizeof(quota_mgr->pinned_));
1486  break;
1487  case kLimits:
1488  WritePipe(return_pipe, &quota_mgr->limit_, sizeof(quota_mgr->limit_));
1489  WritePipe(return_pipe, &quota_mgr->cleanup_threshold_,
1490  sizeof(quota_mgr->cleanup_threshold_));
1491  break;
1492  case kPid: {
1493  pid_t pid = getpid();
1494  WritePipe(return_pipe, &pid, sizeof(pid));
1495  break;
1496  }
1497  default:
1498  PANIC(NULL); // other types are handled by the bunch processor
1499  }
1500  quota_mgr->UnbindReturnPipe(return_pipe);
1501  num_commands = 0;
1502  }
1503  }
1504 
1505  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager (%d)", errno);
1506  close(quota_mgr->pipe_lru_[0]);
1507  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1508  description_buffer);
1509 
1510  // Unpin
// NOTE(review): the comment above says "Unpin", but the command issued
// below is kTouch. Whether stmt_touch_ also clears the pinned column is not
// visible here; verify.
1511  command_buffer[0].command_type = kTouch;
1512  for (map<shash::Any, uint64_t>::const_iterator i =
1513  quota_mgr->pinned_chunks_.begin(),
1514  iEnd = quota_mgr->pinned_chunks_.end(); i != iEnd; ++i)
1515  {
1516  command_buffer[0].StoreHash(i->first);
1517  quota_mgr->ProcessCommandBunch(1, command_buffer, description_buffer);
1518  }
1519 
1520  return NULL;
1521 }
1522 
1523 
// Creates a channel on which the command server can answer a request.
//
// Non-shared mode: a plain anonymous pipe (MakePipe).
//
// Shared mode: creates the first free FIFO workspace_dir_/pipe<N>, counting
// N up from 0 while mkfifo() fails with EEXIST.
// - pipe[1] then holds the number N, not a file descriptor. The cache
//   manager opens the writer end itself via BindReturnPipe(N).
// - pipe[0] is the opened reader end, switched back to blocking mode.
// Any other mkfifo()/open() failure trips an assert.
1525  if (!shared_) {
1526  MakePipe(pipe);
1527  return;
1528  }
1529 
1530  // Create FIFO in cache directory, store path name (number) in pipe write end
1531  int i = 0;
1532  int retval;
1533  do {
1534  retval = mkfifo((workspace_dir_ + "/pipe" + StringifyInt(i)).c_str(), 0600);
1535  pipe[1] = i;
1536  i++;
1537  } while ((retval == -1) && (errno == EEXIST));
1538  assert(retval == 0);
1539 
1540  // Connect reader's end
// O_NONBLOCK lets open() of a FIFO's read end return without a writer.
1541  pipe[0] = open((workspace_dir_ + "/pipe" + StringifyInt(pipe[1])).c_str(),
1542  O_RDONLY | O_NONBLOCK);
1543  assert(pipe[0] >= 0);
1544  Nonblock2Block(pipe[0]);
1545 }
1546 
1547 
1549  const std::string cache_workspace,
1550  std::string *cache_dir,
1551  std::string *workspace_dir)
1552 {
// Splits cache_workspace at ':' into the cache directory and the workspace
// directory. A single token is used for both; any other token count panics.
1553  vector<string> dir_tokens(SplitString(cache_workspace, ':'));
1554  switch (dir_tokens.size()) {
1555  case 1:
1556  *cache_dir = *workspace_dir = dir_tokens[0];
1557  break;
1558  case 2:
1559  *cache_dir = dir_tokens[0];
1560  *workspace_dir = dir_tokens[1];
1561  break;
1562  default:
1563  PANIC(NULL);
1564  }
1565 }
1566 
1567 
1574  const shash::Any &hash,
1575  const uint64_t size,
1576  const string &description,
1577  const bool is_catalog)
1578 {
// Pins hash (size bytes, with description stored as its path).
// Returns false if adding size to pinned_ would exceed cleanup_threshold_,
// true otherwise. Catalog pins must have a non-zero size (asserted).
// Before Spawn() (spawned_ == false), bookkeeping and the database insert
// happen directly in the calling thread. Afterwards, a kReserve request is
// answered by the command server and, on success, a kPin/kPinRegular insert
// is queued.
// NOTE(review): a hash already in pinned_chunks_ is accepted without
// re-checking the threshold, both here and in the kReserve handler.
1579  assert((size > 0) || !is_catalog);
1580 
1581  const string hash_str = hash.ToString();
1582  LogCvmfs(kLogQuota, kLogDebug, "pin into lru %s, path %s",
1583  hash_str.c_str(), description.c_str());
1584 
1585  // Has to run when not yet spawned (cvmfs initialization)
1586  if (!spawned_) {
1587  // Code duplication here
1588  if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1589  if (pinned_ + size > cleanup_threshold_) {
1590  LogCvmfs(kLogQuota, kLogDebug, "failed to insert %s (pinned), no space",
1591  hash_str.c_str());
1592  return false;
1593  } else {
1594  pinned_chunks_[hash] = size;
1595  pinned_ += size;
1596  CheckHighPinWatermark();
1597  }
1598  }
1599  bool exists = Contains(hash_str);
1600  if (!exists && (gauge_ + size > limit_)) {
1601  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu",
1602  gauge_, size);
1603  int retval = DoCleanup(cleanup_threshold_);
1604  assert(retval != 0);
1605  }
// Insert or replace the row with pinned = 1 (column 6).
1606  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1607  SQLITE_STATIC);
1608  sqlite3_bind_int64(stmt_new_, 2, size);
1609  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1610  sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1611  SQLITE_STATIC);
1612  sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1613  sqlite3_bind_int64(stmt_new_, 6, 1);
1614  int retval = sqlite3_step(stmt_new_);
1615  assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1616  sqlite3_reset(stmt_new_);
1617  if (!exists) gauge_ += size;
1618  return true;
1619  }
1620 
// Spawned: the pinned-space check is done by the command server (kReserve).
1621  int pipe_reserve[2];
1622  MakeReturnPipe(pipe_reserve);
1623 
1624  LruCommand cmd;
1625  cmd.command_type = kReserve;
1626  cmd.SetSize(size);
1627  cmd.StoreHash(hash);
1628  cmd.return_pipe = pipe_reserve[1];
1629  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1630  bool result;
1631  ManagedReadHalfPipe(pipe_reserve[0], &result, sizeof(result));
1632  CloseReturnPipe(pipe_reserve);
1633 
1634  if (!result) return false;
1635  DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1636 
1637  return true;
1638 }
1639 
1640 
1642  const uint64_t limit,
1643  const uint64_t cleanup_threshold,
1644  const string &cache_workspace)
1645  : shared_(false)
1646  , spawned_(false)
1647  , limit_(limit)
1648  , cleanup_threshold_(cleanup_threshold)
1649  , gauge_(0)
1650  , pinned_(0)
1651  , seq_(0)
1652  , cache_dir_() // initialized in body
1653  , workspace_dir_() // initialized in body
1654  , fd_lock_cachedb_(-1)
1655  , async_delete_(true)
1656  , cachemgr_pid_(0)
1657  , database_(NULL)
1658  , stmt_touch_(NULL)
1659  , stmt_unpin_(NULL)
1660  , stmt_block_(NULL)
1661  , stmt_unblock_(NULL)
1662  , stmt_new_(NULL)
1663  , stmt_lru_(NULL)
1664  , stmt_size_(NULL)
1665  , stmt_rm_(NULL)
1666  , stmt_rm_batch_(NULL)
1667  , stmt_list_(NULL)
1668  , stmt_list_pinned_(NULL)
1669  , stmt_list_catalogs_(NULL)
1670  , stmt_list_volatile_(NULL)
1671  , initialized_(false)
1672 {
// Starts as a non-shared, not yet spawned manager with zero gauges and no
// prepared statements. cache_workspace is "cache_dir[:workspace_dir]", split
// by ParseDirectories(). The cleanup recorder, queried with GetNoTicks() for
// kCleanupRate requests, gets four resolution/capacity windows.
1673  ParseDirectories(cache_workspace, &cache_dir_, &workspace_dir_);
1674  pipe_lru_[0] = pipe_lru_[1] = -1;
1675  cleanup_recorder_.AddRecorder(1, 90); // last 1.5 min with second resolution
1676  // last 1.5 h with minute resolution
1677  cleanup_recorder_.AddRecorder(60, 90*60);
1678  // last 18 hours with 20 min resolution
1679  cleanup_recorder_.AddRecorder(20*60, 60*60*18);
1680  // last 4 days with hour resolution
1681  cleanup_recorder_.AddRecorder(60*60, 60*60*24*4);
1682 }
1683 
1684 
// Destructor. Does nothing unless initialization succeeded (initialized_).
// In shared mode only the write end of the command pipe is closed; the rest
// is left to the shared cache manager process.
// Otherwise a spawned command server is stopped, in this order:
//   - write a single byte (shorter than an LruCommand, which ends its read
//     loop);
//   - close the pipe;
//   - join the thread.
// Finally the database is closed.
1686  if (!initialized_) return;
1687 
1688  if (shared_) {
1689  // Most of cleanup is done elsewhen by shared cache manager
1690  close(pipe_lru_[1]);
1691  return;
1692  }
1693 
1694  if (spawned_) {
1695  char fin = 0;
1696  WritePipe(pipe_lru_[1], &fin, 1);
1697  close(pipe_lru_[1]);
1698  pthread_join(thread_lru_, NULL);
1699  } else {
1701  }
1702 
1703  CloseDatabase();
1704 }
1705 
1706 
1708  const unsigned num,
1709  const LruCommand *commands,
1710  const char *descriptions)
1711 {
1712  int retval = sqlite3_exec(database_, "BEGIN", NULL, NULL, NULL);
1713  assert(retval == SQLITE_OK);
1714 
1715  for (unsigned i = 0; i < num; ++i) {
1716  const shash::Any hash = commands[i].RetrieveHash();
1717  const string hash_str = hash.ToString();
1718  const unsigned size = commands[i].GetSize();
1719  LogCvmfs(kLogQuota, kLogDebug, "processing %s (%d)",
1720  hash_str.c_str(), commands[i].command_type);
1721 
1722  bool exists;
1723  switch (commands[i].command_type) {
1724  case kTouch:
1725  sqlite3_bind_int64(stmt_touch_, 1, seq_++);
1726  sqlite3_bind_text(stmt_touch_, 2, &hash_str[0], hash_str.length(),
1727  SQLITE_STATIC);
1728  retval = sqlite3_step(stmt_touch_);
1729  LogCvmfs(kLogQuota, kLogDebug, "touching %s (%ld): %d",
1730  hash_str.c_str(), seq_-1, retval);
1731  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1732  PANIC(kLogSyslogErr, "failed to update %s in cachedb, error %d",
1733  hash_str.c_str(), retval);
1734  }
1735  sqlite3_reset(stmt_touch_);
1736  break;
1737  case kUnpin:
1738  sqlite3_bind_text(stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1739  SQLITE_STATIC);
1740  retval = sqlite3_step(stmt_unpin_);
1741  LogCvmfs(kLogQuota, kLogDebug, "unpinning %s: %d",
1742  hash_str.c_str(), retval);
1743  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1744  PANIC(kLogSyslogErr, "failed to unpin %s in cachedb, error %d",
1745  hash_str.c_str(), retval);
1746  }
1747  sqlite3_reset(stmt_unpin_);
1748  break;
1749  case kPin:
1750  case kPinRegular:
1751  case kInsert:
1752  case kInsertVolatile:
1753  // It could already be in, check
1754  exists = Contains(hash_str);
1755 
1756  // Cleanup, move to trash and unlink
1757  if (!exists && (gauge_ + size > limit_)) {
1758  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %u",
1759  gauge_, size);
1760  retval = DoCleanup(cleanup_threshold_);
1761  assert(retval != 0);
1762  }
1763 
1764  // Insert or replace
1765  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1766  SQLITE_STATIC);
1767  sqlite3_bind_int64(stmt_new_, 2, size);
1768  if (commands[i].command_type == kInsertVolatile) {
1769  sqlite3_bind_int64(stmt_new_, 3, (seq_++) | kVolatileFlag);
1770  } else {
1771  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1772  }
1773  sqlite3_bind_text(stmt_new_, 4, &descriptions[i*kMaxDescription],
1774  commands[i].desc_length, SQLITE_STATIC);
1775  sqlite3_bind_int64(stmt_new_, 5, (commands[i].command_type == kPin) ?
1777  sqlite3_bind_int64(stmt_new_, 6,
1778  ((commands[i].command_type == kPin) ||
1779  (commands[i].command_type == kPinRegular)) ? 1 : 0);
1780  retval = sqlite3_step(stmt_new_);
1781  LogCvmfs(kLogQuota, kLogDebug, "insert or replace %s, method %d: %d",
1782  hash_str.c_str(), commands[i].command_type, retval);
1783  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1784  PANIC(kLogSyslogErr, "failed to insert %s in cachedb, error %d",
1785  hash_str.c_str(), retval);
1786  }
1787  sqlite3_reset(stmt_new_);
1788 
1789  if (!exists) gauge_ += size;
1790  break;
1791  default:
1792  // other types should have been taken care of by event loop
1793  PANIC(NULL);
1794  }
1795  }
1796 
1797  retval = sqlite3_exec(database_, "COMMIT", NULL, NULL, NULL);
1798  if (retval != SQLITE_OK) {
1799  PANIC(kLogSyslogErr, "failed to commit to cachedb, error %d", retval);
1800  }
1801 }
1802 
1803 
// Rebuilds the cache database from the contents of the cache directory.
//   1. Clear cache_catalog and fscache.
//   2. Scan the 256 sub-directories 00..ff of cache_dir_. Record every
//      non-empty regular file in fscache; delete empty files.
//   3. Copy the rows into cache_catalog ordered by atime, with increasing
//      sequence numbers, type kFileRegular and pinned = 0.
// Updates gauge_ and, on success, seq_.
// Returns true on success and false on the first failure. The goto target
// releases any remaining statements and the open directory handle on every
// path.
// NOTE(review): the return values of the sqlite3_prepare_v2() calls are not
// checked.
1805  bool result = false;
1806  string sql;
1807  sqlite3_stmt *stmt_select = NULL;
1808  sqlite3_stmt *stmt_insert = NULL;
1809  int sqlerr;
1810  int seq = 0;
1811  char hex[4];
1812  struct stat info;
1813  platform_dirent64 *d;
1814  DIR *dirp = NULL;
1815  string path;
1816 
1817  LogCvmfs(kLogQuota, kLogSyslog | kLogDebug, "re-building cache database");
1818 
1819  // Empty cache catalog and fscache
1820  sql = "DELETE FROM cache_catalog; DELETE FROM fscache;";
1821  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1822  if (sqlerr != SQLITE_OK) {
1823  LogCvmfs(kLogQuota, kLogDebug, "could not clear cache database");
1824  goto build_return;
1825  }
1826 
1827  gauge_ = 0;
1828 
1829  // Insert files from cache sub-directories 00 - ff
1830  // TODO(jblomer): fs_traversal
1831  sqlite3_prepare_v2(database_, "INSERT INTO fscache (sha1, size, actime) "
1832  "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1833 
1834  for (int i = 0; i <= 0xff; i++) {
1835  snprintf(hex, sizeof(hex), "%02x", i);
1836  path = cache_dir_ + "/" + string(hex);
1837  if ((dirp = opendir(path.c_str())) == NULL) {
1839  "failed to open directory %s (tmpwatch interfering?)",
1840  path.c_str());
1841  goto build_return;
1842  }
1843  while ((d = platform_readdir(dirp)) != NULL) {
1844  string file_path = path + "/" + string(d->d_name);
1845  if (stat(file_path.c_str(), &info) == 0) {
1846  if (!S_ISREG(info.st_mode))
1847  continue;
1848  if (info.st_size == 0) {
1850  "removing empty file %s during automatic cache db rebuild",
1851  file_path.c_str());
1852  unlink(file_path.c_str());
1853  continue;
1854  }
1855 
// The hash string is the two-hex-digit directory name plus the file name.
1856  string hash = string(hex) + string(d->d_name);
1857  sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1858  SQLITE_STATIC);
1859  sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1860  sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1861  if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1862  LogCvmfs(kLogQuota, kLogDebug, "could not insert into temp table");
1863  goto build_return;
1864  }
1865  sqlite3_reset(stmt_insert);
1866 
1867  gauge_ += info.st_size;
1868  } else {
1869  LogCvmfs(kLogQuota, kLogDebug, "could not stat %s", file_path.c_str());
1870  }
1871  }
1872  closedir(dirp);
1873  dirp = NULL;
1874  }
1875  sqlite3_finalize(stmt_insert);
1876  stmt_insert = NULL;
1877 
1878  // Transfer from temp table in cache catalog
// Ordering by atime gives recently used files the higher sequence numbers.
1879  sqlite3_prepare_v2(database_,
1880  "SELECT sha1, size FROM fscache ORDER BY actime;",
1881  -1, &stmt_select, NULL);
1882  sqlite3_prepare_v2(database_,
1883  "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1884  "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1885  -1, &stmt_insert, NULL);
1886  while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1887  const string hash = string(
1888  reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1889  sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1890  sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1891  sqlite3_bind_int64(stmt_insert, 3, seq++);
1892  // Might also be a catalog (information is lost)
1893  sqlite3_bind_int64(stmt_insert, 4, kFileRegular);
1894 
1895  int retval = sqlite3_step(stmt_insert);
1896  if (retval != SQLITE_DONE) {
1897  // If the file system hosting the cache is full, we'll likely notice here
1899  "could not insert into cache catalog (%d - %s)",
1900  retval, sqlite3_errstr(retval));
1901  goto build_return;
1902  }
1903  sqlite3_reset(stmt_insert);
1904  }
1905 
1906  // Delete temporary table
1907  sql = "DELETE FROM fscache;";
1908  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1909  if (sqlerr != SQLITE_OK) {
1910  LogCvmfs(kLogQuota, kLogDebug, "could not clear temporary table (%d)",
1911  sqlerr);
1912  goto build_return;
1913  }
1914 
1915  seq_ = seq;
1916  result = true;
1918  "rebuilding finished, sequence %" PRIu64 ", gauge %" PRIu64,
1919  seq_, gauge_);
1920 
1921  build_return:
1922  if (stmt_insert) sqlite3_finalize(stmt_insert);
1923  if (stmt_select) sqlite3_finalize(stmt_select);
1924  if (dirp) closedir(dirp);
1925  return result;
1926 }
1927 
1928 
1934  int back_channel[2],
1935  const string &channel_id)
1936 {
// Registers a back channel identified by the MD5 of channel_id.
// With protocol revision >= 1, a return pipe is created and announced to the
// cache manager. The manager keeps the writer end in back_channels_ and
// answers 'S'; any other answer is only logged.
// Older cache managers get a plain dummy pipe, so the caller always receives
// valid descriptors.
1937  if (protocol_revision_ >= 1) {
1938  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1939  MakeReturnPipe(back_channel);
1940 
1941  LruCommand cmd;
1943  cmd.return_pipe = back_channel[1];
1944  // Not StoreHash(). This is an MD5 hash.
1945  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1946  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1947 
1948  char success;
1949  ManagedReadHalfPipe(back_channel[0], &success, sizeof(success));
1950  // At this point, the named FIFO is unlinked, so don't use CloseReturnPipe
1951  if (success != 'S') {
1953  "failed to register quota back channel (%c)", success);
1954  }
1955  } else {
1956  // Dummy pipe to return valid file descriptors
1957  MakePipe(back_channel);
1958  }
1959 }
1960 
1961 
// Removes hash from the cache database via a kRemove request, waiting for
// the command server's reply, and then unlinks the cached file.
// NOTE(review): hash_str is unused, and the reply in `success` is ignored;
// the file is unlinked regardless of the outcome.
1966  string hash_str = hash.ToString();
1967 
1968  int pipe_remove[2];
1969  MakeReturnPipe(pipe_remove);
1970 
1971  LruCommand cmd;
1972  cmd.command_type = kRemove;
1973  cmd.return_pipe = pipe_remove[1];
1974  cmd.StoreHash(hash);
1975  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1976 
1977  bool success;
1978  ManagedReadHalfPipe(pipe_remove[0], &success, sizeof(success));
1979  CloseReturnPipe(pipe_remove);
1980 
1981  unlink((cache_dir_ + "/" + hash.MakePathWithoutSuffix()).c_str());
1982 }
1983 
1984 
// Starts the command server thread (MainCommandServer) once; later calls are
// no-ops. Failing to create the thread is fatal.
1986  if (spawned_)
1987  return;
1988 
1989  if (pthread_create(&thread_lru_, NULL, MainCommandServer,
1990  static_cast<void *>(this)) != 0)
1991  {
1992  PANIC(kLogDebug, "could not create lru thread");
1993  }
1994 
1995  spawned_ = true;
1996 }
1997 
1998 
// Queues a kTouch for hash; no reply is awaited. The command server buffers
// the request and, in its next batch, binds a new sequence number to
// stmt_touch_ for this hash.
2003  LruCommand cmd;
2004  cmd.command_type = kTouch;
2005  cmd.StoreHash(hash);
2006  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
2007 }
2008 
2009 
// Counterpart of BindReturnPipe(). In shared mode it closes the writer end
// that BindReturnPipe() opened; in non-shared mode nothing is closed here.
2011  if (shared_)
2012  close(pipe_wronly);
2013 }
2014 
2015 
// Shared mode only: removes the named FIFO workspace_dir_/pipe<pipe_wronly>
// (as created by MakeReturnPipe()).
2017  if (shared_)
2018  unlink((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str());
2019 }
2020 
2021 
// Queues a kUnpin for hash; no reply is awaited. The command server releases
// the pinned_ bookkeeping immediately and runs stmt_unpin_ in its next batch.
2023  LogCvmfs(kLogQuota, kLogDebug, "Unpin %s", hash.ToString().c_str());
2024 
2025  LruCommand cmd;
2026  cmd.command_type = kUnpin;
2027  cmd.StoreHash(hash);
2028  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
2029 }
2030 
2031 
2033  int back_channel[2],
2034  const string &channel_id)
2035 {
// Counterpart of RegisterBackChannel().
// With protocol revision >= 1, a request keyed by the MD5 of channel_id asks
// the cache manager to close and forget its writer end; only the local
// reader end is closed here.
// Older managers were given a dummy pipe, which is closed completely.
2036  if (protocol_revision_ >= 1) {
2037  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
2038 
2039  LruCommand cmd;
2041  // Not StoreHash(). This is an MD5 hash.
2042  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
2043  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
2044 
2045  // Writer's end will be closed by cache manager, FIFO is already unlinked
2046  close(back_channel[0]);
2047  } else {
2048  ClosePipe(back_channel);
2049  }
2050 }
2051 
2052 void PosixQuotaManager::ManagedReadHalfPipe(int fd, void *buf, size_t nbyte) {
2053  const unsigned timeout_ms = cachemgr_pid_ ? 1000 : 0;
2054  bool result = false;
2055  do {
2056  result = ReadHalfPipe(fd, buf, nbyte, timeout_ms);
2057  // try only as long as the cachemgr is still alive
2058  } while (!result && getpgid(cachemgr_pid_) >= 0);
2059  if (!result) {
2060  PANIC(kLogStderr, "Error: quota manager could not read from cachemanager pipe");
2061  }
2062 
2063 }
virtual uint32_t GetProtocolRevision()
Definition: quota_posix.cc:719
void SetLogSyslogFacility(const int local_facility)
Definition: logging.cc:183
sqlite3_stmt * stmt_new_
Definition: quota_posix.h:361
sqlite3_stmt * stmt_list_pinned_
Definition: quota_posix.h:367
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
Definition: statistics.cc:267
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
Definition: logging.cc:151
std::vector< std::string > DoList(const CommandType list_command)
Definition: quota_posix.cc:631
struct stat64 platform_stat64
virtual pid_t GetPid()
Definition: quota_posix.cc:697
virtual uint64_t GetCleanupRate(uint64_t period_s)
Definition: quota_posix.cc:793
sqlite3_stmt * stmt_list_
Definition: quota_posix.h:366
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
Definition: quota_posix.cc:673
virtual uint64_t GetSize()
Definition: quota_posix.cc:777
sqlite3_stmt * stmt_rm_
Definition: quota_posix.h:364
virtual bool Cleanup(const uint64_t leave_size)
Definition: quota_posix.cc:125
static const unsigned kSqliteMemPerThread
Definition: quota_posix.h:201
#define PANIC(...)
Definition: exception.h:29
virtual uint64_t GetMaxFileSize()
Definition: quota_posix.cc:692
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
uint64_t cleanup_threshold_
Definition: quota_posix.h:287
static const uint32_t kProtocolRevision
Definition: quota.h:45
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
Definition: posix.cc:1920
void Daemonize()
Definition: posix.cc:1702
bool DoCleanup(const uint64_t leave_size)
Definition: quota_posix.cc:472
uint64_t GetSize() const
Definition: quota_posix.h:160
bool SafeWrite(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:2060
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
Definition: quota_posix.cc:811
int BindReturnPipe(int pipe_wronly)
Definition: quota_posix.cc:64
std::string cache_dir_
Definition: quota_posix.h:308
bool Contains(const std::string &hash_str)
Definition: quota_posix.cc:191
perf::MultiRecorder cleanup_recorder_
Definition: quota_posix.h:354
int platform_stat(const char *path, platform_stat64 *buf)
void SetLogMicroSyslog(const std::string &filename)
Definition: logging.cc:272
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
sqlite3 * database_
Definition: quota_posix.h:356
unsigned char digest[digest_size_]
Definition: hash.h:124
virtual bool SetLimit(uint64_t limit)
Definition: quota_posix.cc:767
virtual std::vector< std::string > ListCatalogs()
void CheckHighPinWatermark()
Definition: quota_posix.cc:82
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
Definition: quota_posix.cc:738
bool FileExists(const std::string &path)
Definition: posix.cc:802
int GetLogSyslogLevel()
Definition: logging.cc:168
static Watchdog * Create(FnOnCrash on_crash)
Definition: monitor.cc:70
int64_t String2Int64(const string &value)
Definition: string.cc:240
unsigned GetDigestSize() const
Definition: hash.h:168
std::string workspace_dir_
Definition: quota_posix.h:315
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
pthread_t thread_lru_
Definition: quota_posix.h:330
bool ReadHalfPipe(int fd, void *buf, size_t nbyte, unsigned timeout_ms)
Definition: posix.cc:525
#define GetLogDebugFile()
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
Definition: posix.cc:2119
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:308
void Nonblock2Block(int filedes)
Definition: posix.cc:647
virtual uint64_t GetSizePinned()
Definition: quota_posix.cc:785
void ManagedReadHalfPipe(int fd, void *buf, size_t nbyte)
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
virtual uint64_t GetCapacity()
Definition: quota_posix.cc:657
void SetSize(const uint64_t new_size)
Definition: quota_posix.h:154
void LockBackChannels()
Definition: quota.h:97
shash::Any RetrieveHash() const
Definition: quota_posix.h:174
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
Definition: posix.cc:982
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
Definition: quota_posix.cc:232
sqlite3_stmt * stmt_touch_
Definition: quota_posix.h:357
string StringifyInt(const int64_t value)
Definition: string.cc:78
std::string GetLogMicroSyslog()
Definition: logging.cc:308
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
Definition: quota_posix.cc:606
sqlite3_stmt * stmt_list_volatile_
Definition: quota_posix.h:369
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
Definition: quota.h:110
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
static int MainCacheManager(int argc, char **argv)
bool EmptyTrash(const std::vector< std::string > &trash)
Definition: quota_posix.cc:566
std::map< shash::Md5, int > back_channels_
Definition: quota.h:95
static const unsigned kMaxDescription
Definition: quota_posix.h:218
bool SetSharedLimit(uint64_t limit)
Definition: quota_posix.cc:751
bool CloseAllFildes(const std::set< int > &preserve_fildes)
Definition: posix.cc:1887
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
static const unsigned kLockFileMagicNumber
Definition: quota_posix.h:196
unsigned char digest[shash::kMaxDigestSize]
Definition: quota_posix.h:138
std::string MakePathWithoutSuffix() const
Definition: hash.h:335
sqlite3_stmt * stmt_list_catalogs_
Definition: quota_posix.h:368
bool ExecAsDaemon(const std::vector< std::string > &command_line, pid_t *child_pid)
Definition: posix.cc:1639
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
Definition: quota_posix.cc:264
uint64_t GetNoTicks(uint32_t retrospect_s) const
Definition: statistics.cc:272
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
platform_dirent64 * platform_readdir(DIR *dirp)
void SetCacheMgrPid(pid_t pid_)
Definition: quota_posix.h:89
virtual ~PosixQuotaManager()
void Spawn(const std::string &crash_dump_path)
Definition: monitor.cc:510
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
Definition: posix.cc:658
std::map< shash::Any, uint64_t > pinned_chunks_
Definition: quota_posix.h:320
sqlite3_stmt * stmt_unpin_
Definition: quota_posix.h:358
void CloseReturnPipe(int pipe[2])
Definition: quota_posix.cc:181
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
virtual void Spawn()
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:562
const unsigned kDigestSizes[]
Definition: hash.h:69
void UnlockBackChannels()
Definition: quota.h:101
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
Definition: quota_posix.h:166
static const uint64_t kVolatileFlag
Definition: quota_posix.h:231
void UnlockFile(const int filedes)
Definition: posix.cc:1006
sqlite3_stmt * stmt_size_
Definition: quota_posix.h:363
struct dirent64 platform_dirent64
int GetLogSyslogFacility()
Definition: logging.cc:214
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528