CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
quota_posix.cc
Go to the documentation of this file.
1 
15 #define __STDC_LIMIT_MACROS
16 #define __STDC_FORMAT_MACROS
17 
18 #include "cvmfs_config.h"
19 #include "quota_posix.h"
20 
21 #include <dirent.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <stdint.h>
28 #include <sys/dir.h>
29 #include <sys/stat.h>
30 #ifndef __APPLE__
31 #include <sys/statfs.h>
32 #endif
33 #include <sys/statvfs.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 #include <cassert>
39 #include <cstdio>
40 #include <cstdlib>
41 #include <cstring>
42 
43 #include <map>
44 #include <set>
45 #include <string>
46 #include <vector>
47 
48 #include "crypto/hash.h"
49 #include "duplex_sqlite3.h"
50 #include "monitor.h"
51 #include "statistics.h"
52 #include "util/concurrency.h"
53 #include "util/exception.h"
54 #include "util/logging.h"
55 #include "util/platform.h"
56 #include "util/pointer.h"
57 #include "util/posix.h"
58 #include "util/smalloc.h"
59 #include "util/string.h"
60 
61 using namespace std; // NOLINT
62 
63 
// Opens the writer's end of a per-command return pipe.
// Without shared_, pipe_wronly is already a usable descriptor and is returned
// unchanged. With shared_, pipe_wronly is only the numeric suffix of the named
// FIFO "<workspace_dir_>/pipe<N>", which is opened O_WRONLY | O_NONBLOCK and
// then switched to blocking mode.
// Returns the descriptor, or the negative result of open() on failure (the
// failure is logged with errno).
64 int PosixQuotaManager::BindReturnPipe(int pipe_wronly) {
65  if (!shared_)
66  return pipe_wronly;
67 
68  // Connect writer's end
69  int result =
70  open((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str(),
71  O_WRONLY | O_NONBLOCK);
72  if (result >= 0) {
73  Nonblock2Block(result);
74  } else {
76  "failed to bind return pipe (%d)", errno);
77  }
78  return result;
79 }
80 
81 
// Warns when too much of the cache is pinned.
// If cleanup_threshold_ is non-zero and pinned_ exceeds kHighPinWatermark
// percent of it (hence the /100), this logs the sizes in MB and broadcasts "R"
// on the backchannels, asking clients to release pinned catalogs.
83  const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
84  if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
86  "high watermark of pinned files (%" PRIu64 "M > %" PRIu64 "M)",
87  pinned_/(1024*1024), watermark/(1024*1024));
88  BroadcastBackchannels("R"); // clients: please release pinned catalogs
89  }
90 }
91 
92 
// Removes left-over named pipes from workspace_dir_.
// Every directory entry that stats as a FIFO and whose name starts with "pipe"
// is unlinked. A message is logged once, before the first removal.
// Entries that cannot be stat'ed are skipped.
// NOTE(review): opendir() failure is only caught by assert(); in an NDEBUG
// build a NULL dirp would be passed to platform_readdir().
94  DIR *dirp = opendir(workspace_dir_.c_str());
95  assert(dirp != NULL);
96 
97  platform_dirent64 *dent;
98  bool found_leftovers = false;
99  while ((dent = platform_readdir(dirp)) != NULL) {
100  const string name = dent->d_name;
101  const string path = workspace_dir_ + "/" + name;
102  platform_stat64 info;
103  int retval = platform_stat(path.c_str(), &info);
104  if (retval != 0)
105  continue;
106  if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) == "pipe")) {
107  if (!found_leftovers) {
109  "removing left-over FIFOs from cache directory");
110  }
111  found_leftovers = true;
112  unlink(path.c_str());
113  }
114  }
115  closedir(dirp);
116 }
117 
118 
125 bool PosixQuotaManager::Cleanup(const uint64_t leave_size) {
126  if (!spawned_)
127  return DoCleanup(leave_size);
128 
129  bool result;
130  int pipe_cleanup[2];
131  MakeReturnPipe(pipe_cleanup);
132 
133  LruCommand cmd;
134  cmd.command_type = kCleanup;
135  cmd.size = leave_size;
136  cmd.return_pipe = pipe_cleanup[1];
137 
138  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
139  ManagedReadHalfPipe(pipe_cleanup[0], &result, sizeof(result));
140  CloseReturnPipe(pipe_cleanup);
141 
142  return result;
143 }
144 
145 
// Releases all database resources.
// Finalizes every non-NULL prepared statement, closes database_, releases the
// cachedb lock (fd_lock_cachedb_), resets the handles to NULL and clears
// pinned_chunks_.
147  if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
148  if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
149  if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
150  if (stmt_list_) sqlite3_finalize(stmt_list_);
151  if (stmt_lru_) sqlite3_finalize(stmt_lru_);
152  if (stmt_rm_) sqlite3_finalize(stmt_rm_);
153  if (stmt_size_) sqlite3_finalize(stmt_size_);
154  if (stmt_touch_) sqlite3_finalize(stmt_touch_);
155  if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
156  if (stmt_block_) sqlite3_finalize(stmt_block_);
157  if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
158  if (stmt_new_) sqlite3_finalize(stmt_new_);
159  if (database_) sqlite3_close(database_);
160  UnlockFile(fd_lock_cachedb_);
161 
// NOTE(review): stmt_lru_ is finalized above but is not reset to NULL below,
// unlike every other statement. If this code runs a second time, it would
// finalize a dangling handle. Consider adding `stmt_lru_ = NULL;`.
162  stmt_list_catalogs_ = NULL;
163  stmt_list_pinned_ = NULL;
164  stmt_list_volatile_ = NULL;
165  stmt_list_ = NULL;
166  stmt_rm_ = NULL;
167  stmt_size_ = NULL;
168  stmt_touch_ = NULL;
169  stmt_unpin_ = NULL;
170  stmt_block_ = NULL;
171  stmt_unblock_ = NULL;
172  stmt_new_ = NULL;
173  database_ = NULL;
174 
175  pinned_chunks_.clear();
176 }
177 
178 
// Releases a return pipe previously created for a command.
// With shared_, the read end pipe[0] is closed and pipe[1] is handed to
// UnlinkReturnPipe(). In shared mode, BindReturnPipe treats such a value as the
// numeric suffix of a named FIFO rather than as an open descriptor.
// Otherwise both ends are closed with ClosePipe().
180  if (shared_) {
181  close(pipe[0]);
182  UnlinkReturnPipe(pipe[1]);
183  } else {
184  ClosePipe(pipe);
185  }
186 }
187 
188 
189 bool PosixQuotaManager::Contains(const string &hash_str) {
190  bool result = false;
191 
192  sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
193  SQLITE_STATIC);
194  if (sqlite3_step(stmt_size_) == SQLITE_ROW)
195  result = true;
196  sqlite3_reset(stmt_size_);
197  LogCvmfs(kLogQuota, kLogDebug, "contains %s returns %d",
198  hash_str.c_str(), result);
199 
200  return result;
201 }
202 
203 
// Warns if the file system cannot hold the rest of the quota.
// Compares the available bytes (f_bavail * f_bsize) of the file system holding
// "<cache_dir_>/cachedb" with the unused part of the quota (limit_ - gauge_)
// and logs a warning if less is available. Skipped when limit_ is 0 or the
// gauge has already reached the limit. This function only logs; it returns
// nothing and never aborts.
// NOTE(review): the statvfs() failure message prints cache_dir_, although the
// path actually queried is cache_dir_ + "/cachedb".
205  if ((limit_ == 0) || (gauge_ >= limit_))
206  return;
207 
208  struct statvfs vfs_info;
209  int retval = statvfs((cache_dir_ + "/cachedb").c_str(), &vfs_info);
210  if (retval != 0) {
212  "failed to query %s for free space (%d)",
213  cache_dir_.c_str(), errno);
214  return;
215  }
216  int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
217  LogCvmfs(kLogQuota, kLogDebug, "free space: %" PRId64 " MB",
218  free_space_byte / (1024 * 1024));
219 
220  int64_t required_byte = limit_ - gauge_;
221  if (free_space_byte < required_byte) {
223  "too little free space on the file system hosting the cache,"
224  " %" PRId64 " MB available",
225  free_space_byte / (1024 * 1024));
226  }
227 }
228 
229 
// Factory for an in-process quota manager; it does not set shared_ or spawned_
// (contrast CreateShared).
// Returns NULL if cleanup_threshold >= limit or if InitDatabase() fails.
// Otherwise it checks free space, creates the LRU command pipe with MakePipe()
// and returns a heap-allocated, initialized manager owned by the caller.
231  const string &cache_workspace,
232  const uint64_t limit,
233  const uint64_t cleanup_threshold,
234  const bool rebuild_database)
235 {
236  if (cleanup_threshold >= limit) {
237  LogCvmfs(kLogQuota, kLogDebug, "invalid parameters: limit %" PRIu64 ", "
238  "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
239  return NULL;
240  }
241 
242  PosixQuotaManager *quota_manager =
243  new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
244 
245  // Initialize cache catalog
246  if (!quota_manager->InitDatabase(rebuild_database)) {
247  delete quota_manager;
248  return NULL;
249  }
250  quota_manager->CheckFreeSpace();
251  MakePipe(quota_manager->pipe_lru_);
252 
253  quota_manager->protocol_revision_ = kProtocolRevision;
254  quota_manager->initialized_ = true;
255  return quota_manager;
256 }
257 
258 
// Connects to the shared cache manager for cache_workspace, spawning a new
// cache manager process if nobody listens on "<workspace>/cachemgr".
//
// The "<workspace>/lock_cachemgr" lock serializes clients. It is held until
// the connection is established. The lock file also stores
// kLockFileMagicNumber followed by the cache manager pid.
//
// Existing manager: the FIFO opens for writing. The pid is then taken from the
// lock file, or requested via GetPid() when the lock file is empty (older
// managers). Finally the limits are queried, and the protocol revision too if
// "<workspace>/cachemgr.protocol" exists.
//
// New manager: a fresh FIFO is created and exe_path is executed with
// "__cachemgr__" plus a boot pipe and a handshake pipe. The client connects
// after the manager writes one byte on the boot pipe, then completes the
// handshake. foreground selects ManagedExec (double fork) instead of
// ExecAsDaemon.
//
// Returns a heap-allocated manager, or NULL on failure. PANICs if the pid
// cannot be written to the lock file.
263  const std::string &exe_path,
264  const std::string &cache_workspace,
265  const uint64_t limit,
266  const uint64_t cleanup_threshold,
267  bool foreground)
268 {
269  string cache_dir;
270  string workspace_dir;
271  ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);
272 
273  pid_t new_cachemgr_pid;
274 
275  // Create lock file: only one fuse client at a time
276  const int fd_lockfile = LockFile(workspace_dir + "/lock_cachemgr");
277  if (fd_lockfile < 0) {
278  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
279  (workspace_dir + "/lock_cachemgr").c_str(), errno);
280  return NULL;
281  }
282 
283  PosixQuotaManager *quota_mgr =
284  new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
285  quota_mgr->shared_ = true;
286  quota_mgr->spawned_ = true;
287 
288  // Try to connect to pipe
289  const string fifo_path = workspace_dir + "/cachemgr";
290  LogCvmfs(kLogQuota, kLogDebug, "trying to connect to existing pipe");
291  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
292  if (quota_mgr->pipe_lru_[1] >= 0) {
// NOTE(review): the open() result below is not checked. Presumably SafeRead()
// on -1 fails, so the "could not read cache manager pid" path is taken.
// TODO confirm.
293  const int fd_lockfile_rw = open((workspace_dir + "/lock_cachemgr").c_str(), O_RDWR, 0600);
294  unsigned lockfile_magicnumber = 0;
295  const ssize_t result_mn = SafeRead(fd_lockfile_rw, &lockfile_magicnumber, sizeof(lockfile_magicnumber));
296  const ssize_t result = SafeRead(fd_lockfile_rw, &new_cachemgr_pid, sizeof(new_cachemgr_pid));
297  close(fd_lockfile_rw);
298 
299  if ((lockfile_magicnumber != kLockFileMagicNumber) || (result < 0) || (result_mn < 0)
300  || (static_cast<size_t>(result) < sizeof(new_cachemgr_pid))) {
301  if (result != 0) {
303  "could not read cache manager pid from lockfile");
304  UnlockFile(fd_lockfile);
305  delete quota_mgr;
306  return NULL;
307  } else {
308  // support reload from old versions of the cache manager
309  // lock file is empty in this case, try a plain ReadHalfPipe to get pid
// NOTE(review): GetPid() writes to pipe_lru_[1], which at this point is still
// in non-blocking mode (Nonblock2Block is only called further below).
310  quota_mgr->SetCacheMgrPid(quota_mgr->GetPid());
311  }
312  } else {
313  quota_mgr->SetCacheMgrPid(new_cachemgr_pid);
314  }
315 
316 
317  LogCvmfs(kLogQuota, kLogDebug, "connected to existing cache manager pipe");
318  quota_mgr->initialized_ = true;
319  Nonblock2Block(quota_mgr->pipe_lru_[1]);
320  UnlockFile(fd_lockfile);
321  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
323  "received limit %" PRIu64 ", threshold %" PRIu64,
324  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
325  if (FileExists(workspace_dir + "/cachemgr.protocol")) {
326  quota_mgr->protocol_revision_ = quota_mgr->GetProtocolRevision();
327  LogCvmfs(kLogQuota, kLogDebug, "connected protocol revision %u",
328  quota_mgr->protocol_revision_);
329  } else {
330  LogCvmfs(kLogQuota, kLogDebug, "connected to ancient cache manager");
331  }
332  return quota_mgr;
333  }
// errno from the failed open() of the FIFO. ENXIO (no reader on the FIFO)
// triggers removal of the stale FIFO below.
334  const int connect_error = errno;
335 
336  // Lock file: let existing cache manager finish first
337  const int fd_lockfile_fifo = LockFile(workspace_dir + "/lock_cachemgr.fifo");
338  if (fd_lockfile_fifo < 0) {
339  LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
340  (workspace_dir + "/lock_cachemgr.fifo").c_str(), errno);
341  UnlockFile(fd_lockfile);
342  delete quota_mgr;
343  return NULL;
344  }
345  UnlockFile(fd_lockfile_fifo);
346 
347  if (connect_error == ENXIO) {
348  LogCvmfs(kLogQuota, kLogDebug, "left-over FIFO found, unlinking");
349  unlink(fifo_path.c_str());
350  }
351 
352  // Creating a new FIFO for the cache manager (to be bound later)
353  int retval = mkfifo(fifo_path.c_str(), 0600);
354  if (retval != 0) {
355  LogCvmfs(kLogQuota, kLogDebug, "failed to create cache manager FIFO (%d)",
356  errno);
357  UnlockFile(fd_lockfile);
358  delete quota_mgr;
359  return NULL;
360  }
361 
362  // Create new cache manager
363  int pipe_boot[2];
364  int pipe_handshake[2];
365  MakePipe(pipe_boot);
366  MakePipe(pipe_handshake);
367 
// Argument order must match what MainCacheManager parses from argv[1..10].
368  vector<string> command_line;
369  command_line.push_back(exe_path);
370  command_line.push_back("__cachemgr__");
371  command_line.push_back(cache_workspace);
372  command_line.push_back(StringifyInt(pipe_boot[1]));
373  command_line.push_back(StringifyInt(pipe_handshake[0]));
374  command_line.push_back(StringifyInt(limit));
375  command_line.push_back(StringifyInt(cleanup_threshold));
376  // do not propagate foreground in order to reliably get pid from exec
377  // instead, daemonize right here
378  command_line.push_back(StringifyInt(true)); //foreground
379  command_line.push_back(StringifyInt(GetLogSyslogLevel()));
380  command_line.push_back(StringifyInt(GetLogSyslogFacility()));
381  command_line.push_back(GetLogDebugFile() + ":" + GetLogMicroSyslog());
382 
383  set<int> preserve_filedes;
384  preserve_filedes.insert(0);
385  preserve_filedes.insert(1);
386  preserve_filedes.insert(2);
387  preserve_filedes.insert(pipe_boot[1]);
388  preserve_filedes.insert(pipe_handshake[0]);
389 
390  if (foreground) {
391  retval = ManagedExec(command_line, preserve_filedes, map<int, int>(),
392  /*drop_credentials*/ false,
393  /*clear_env*/ false,
394  /*double_fork*/ true,
395  &new_cachemgr_pid);
396  } else {
397  retval = ExecAsDaemon(command_line, &new_cachemgr_pid);
398  }
399  if (!retval) {
400  UnlockFile(fd_lockfile);
401  ClosePipe(pipe_boot);
402  ClosePipe(pipe_handshake);
403  delete quota_mgr;
404  LogCvmfs(kLogQuota, kLogDebug, "failed to start cache manager");
405  return NULL;
406  }
407  LogCvmfs(kLogQuota, kLogDebug, "new cache manager pid: %d", new_cachemgr_pid);
408  quota_mgr->SetCacheMgrPid(new_cachemgr_pid);
// Persist magic number + pid so later clients can find this manager's pid.
// NOTE(review): the open() result is not checked. A failure surfaces as a
// SafeWrite() failure and hence PANIC, assuming SafeWrite rejects fd -1.
409  const int fd_lockfile_rw = open((workspace_dir + "/lock_cachemgr").c_str(), O_RDWR | O_TRUNC, 0600);
410  const unsigned magic_number = PosixQuotaManager::kLockFileMagicNumber;
411  const bool result_mn = SafeWrite(fd_lockfile_rw, &magic_number, sizeof(magic_number));
412  const bool result = SafeWrite(fd_lockfile_rw, &new_cachemgr_pid, sizeof(new_cachemgr_pid));
413  if (!result || !result_mn) {
414  PANIC(kLogSyslogErr, "could not write cache manager pid to lockfile");
415  }
416 
417  close(fd_lockfile_rw);
418  // Wait for cache manager to be ready
419  close(pipe_boot[1]);
420  close(pipe_handshake[0]);
421  char buf;
422  if (read(pipe_boot[0], &buf, 1) != 1) {
423  UnlockFile(fd_lockfile);
424  close(pipe_boot[0]);
425  close(pipe_handshake[1]);
426  delete quota_mgr;
428  "cache manager did not start");
429  return NULL;
430  }
431  close(pipe_boot[0]);
432 
433  // Connect write end
434  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
435  if (quota_mgr->pipe_lru_[1] < 0) {
437  "failed to connect to newly created FIFO (%d)", errno);
438  close(pipe_handshake[1]);
439  UnlockFile(fd_lockfile);
440  delete quota_mgr;
441  return NULL;
442  }
443 
444  // Finalize handshake
445  buf = 'C';
446  if (write(pipe_handshake[1], &buf, 1) != 1) {
447  UnlockFile(fd_lockfile);
448  close(pipe_handshake[1]);
449  LogCvmfs(kLogQuota, kLogDebug, "could not finalize handshake");
450  delete quota_mgr;
451  return NULL;
452  }
453  close(pipe_handshake[1]);
454 
455  Nonblock2Block(quota_mgr->pipe_lru_[1]);
456  LogCvmfs(kLogQuota, kLogDebug, "connected to a new cache manager");
457  quota_mgr->protocol_revision_ = kProtocolRevision;
458 
459  UnlockFile(fd_lockfile);
460 
461  quota_mgr->initialized_ = true;
462  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
463  LogCvmfs(kLogQuota, kLogDebug, "received limit %" PRIu64 ", "
464  "threshold %" PRIu64,
465  quota_mgr->limit_, quota_mgr->cleanup_threshold_);
466  return quota_mgr;
467 }
468 
469 
// Evicts least-recently-used entries until gauge_ <= leave_size (bytes).
//
// Candidates come from stmt_lru_ (the row with the smallest acseq among rows
// with pinned<>2). A candidate whose hash is in pinned_chunks_ is not deleted.
// Instead it is marked pinned=2 (stmt_block_) so the next iteration picks a
// different row. After the loop, stmt_unblock_ turns all pinned=2 marks back
// into pinned=1.
//
// Database rows and gauge_ are updated immediately. The files collected in
// `trash` are unlinked afterwards, either by a double-forked grandchild
// (async_delete_) or inline.
//
// Returns true if gauge_ ends at or below leave_size. Returns false otherwise,
// or on a database/fork error.
470 bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) {
471  if (gauge_ <= leave_size)
472  return true;
473 
474  // TODO(jblomer) transaction
// NOTE(review): "%lu" is used below for a uint64_t (elsewhere PRIu64 is used).
// This is a format mismatch on platforms where uint64_t is not unsigned long.
476  "clean up cache until at most %lu KB is used", leave_size/1024);
477  LogCvmfs(kLogQuota, kLogDebug, "gauge %" PRIu64, gauge_);
478  cleanup_recorder_.Tick();
479 
480  bool result;
481  string hash_str;
482  vector<string> trash;
483 
484  do {
485  sqlite3_reset(stmt_lru_);
486  if (sqlite3_step(stmt_lru_) != SQLITE_ROW) {
487  LogCvmfs(kLogQuota, kLogDebug, "could not get lru-entry");
488  break;
489  }
490 
491  hash_str = string(reinterpret_cast<const char *>(
492  sqlite3_column_text(stmt_lru_, 0)));
493  LogCvmfs(kLogQuota, kLogDebug, "removing %s", hash_str.c_str());
494  shash::Any hash = shash::MkFromHexPtr(shash::HexPtr(hash_str));
495 
496  // That's a critical condition. We must not delete a not yet inserted
497  // pinned file as it is already reserved (but will be inserted later).
498  // Instead, set the pin bit in the db to not run into an endless loop
499  if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
500  trash.push_back(cache_dir_ + "/" + hash.MakePathWithoutSuffix());
501  gauge_ -= sqlite3_column_int64(stmt_lru_, 1);
502  LogCvmfs(kLogQuota, kLogDebug, "lru cleanup %s, new gauge %" PRIu64,
503  hash_str.c_str(), gauge_);
504 
505  sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(),
506  SQLITE_STATIC);
507  result = (sqlite3_step(stmt_rm_) == SQLITE_DONE);
508  sqlite3_reset(stmt_rm_);
509 
// NOTE(review): `result` is always false inside this branch, so "(%d)" always
// logs 0. This early return also skips stmt_unblock_, leaving pinned=2 marks in
// place. It also skips unlinking the files already in `trash`, whose rows were
// deleted.
510  if (!result) {
512  "failed to find %s in cache database (%d). "
513  "Cache database is out of sync. "
514  "Restart cvmfs with clean cache.", hash_str.c_str(), result);
515  return false;
516  }
517  } else {
518  sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
519  SQLITE_STATIC);
520  result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
521  sqlite3_reset(stmt_block_);
522  assert(result);
523  }
524  } while (gauge_ > leave_size);
525 
526  result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
527  sqlite3_reset(stmt_unblock_);
528  assert(result);
529 
530  // Double fork avoids zombie, forked removal process must not flush file
531  // buffers
532  if (!trash.empty()) {
533  if (async_delete_) {
534  pid_t pid;
535  int statloc;
536  if ((pid = fork()) == 0) {
537  // TODO(jblomer): eviciting files in the cache should perhaps become a
538  // thread. This would also allow to block the chunks and prevent the
539  // race with re-insertion. Then again, a thread can block umount.
540 #ifndef DEBUGMSG
541  CloseAllFildes(std::set<int>());
542 #endif
543  if (fork() == 0) {
544  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
545  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
546  unlink(trash[i].c_str());
547  }
548  _exit(0);
549  }
550  _exit(0);
551  } else {
// NOTE(review): on fork() failure (pid < 0) this returns false after rows were
// already deleted and gauge_ reduced, so the files in `trash` stay on disk.
552  if (pid > 0)
553  waitpid(pid, &statloc, 0);
554  else
555  return false;
556  }
557  } else { // !async_delete_
558  for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
559  LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
560  unlink(trash[i].c_str());
561  }
562  }
563  }
564 
565  if (gauge_ > leave_size) {
567  "request to clean until %" PRIu64 ", "
568  "but effective gauge is %" PRIu64, leave_size, gauge_);
569  return false;
570  }
571  return true;
572 }
573 
574 
// Sends an insert-type command (command_type, e.g. kInsert or kInsertVolatile)
// for hash/size to the cache manager over pipe_lru_[1].
// The LruCommand header and up to kMaxDescription bytes of description are
// built in a single contiguous buffer on the stack (alloca) and written with
// one WritePipe call. Longer descriptions are silently truncated, and
// desc_length records the number of bytes actually sent.
576  const shash::Any &hash,
577  const uint64_t size,
578  const string &description,
579  const CommandType command_type)
580 {
581  const string hash_str = hash.ToString();
582  LogCvmfs(kLogQuota, kLogDebug, "insert into lru %s, path %s, method %d",
583  hash_str.c_str(), description.c_str(), command_type);
584  const unsigned desc_length = (description.length() > kMaxDescription) ?
585  kMaxDescription : description.length();
586 
587  LruCommand *cmd =
588  reinterpret_cast<LruCommand *>(alloca(sizeof(LruCommand) + desc_length));
589  new (cmd) LruCommand;
590  cmd->command_type = command_type;
591  cmd->SetSize(size);
592  cmd->StoreHash(hash);
593  cmd->desc_length = desc_length;
594  memcpy(reinterpret_cast<char *>(cmd)+sizeof(LruCommand),
595  &description[0], desc_length);
596  WritePipe(pipe_lru_[1], cmd, sizeof(LruCommand) + desc_length);
597 }
598 
599 
600 vector<string> PosixQuotaManager::DoList(const CommandType list_command) {
601  vector<string> result;
602 
603  int pipe_list[2];
604  MakeReturnPipe(pipe_list);
605  char description_buffer[kMaxDescription];
606 
607  LruCommand cmd;
608  cmd.command_type = list_command;
609  cmd.return_pipe = pipe_list[1];
610  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
611 
612  int length;
613  do {
614  ManagedReadHalfPipe(pipe_list[0], &length, sizeof(length));
615  if (length > 0) {
616  ReadPipe(pipe_list[0], description_buffer, length);
617  result.push_back(string(description_buffer, length));
618  }
619  } while (length >= 0);
620 
621  CloseReturnPipe(pipe_list);
622  return result;
623 }
624 
625 
// Returns limit_, unless the cache is unrestricted (limit_ == (uint64_t)-1).
// For an unrestricted cache it returns the available bytes
// (f_bavail * f_bsize) reported by statfs("."), or limit_ if statfs() fails.
// NOTE(review): statfs(".") queries the file system of the current working
// directory. The inline comment assumes that is the cache directory.
// TODO confirm that callers guarantee this.
627  if (limit_ != (uint64_t)(-1))
628  return limit_;
629 
630  // Unrestricted cache, look at free space on cache dir fs
631  struct statfs info;
632  if (statfs(".", &info) == 0) {
633  return info.f_bavail * info.f_bsize;
634  } else {
636  "failed to query file system info of cache (%d)", errno);
637  return limit_;
638  }
639 }
640 
641 
642 void PosixQuotaManager::GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
643 {
644  int pipe_limits[2];
645  MakeReturnPipe(pipe_limits);
646 
647  LruCommand cmd;
648  cmd.command_type = kLimits;
649  cmd.return_pipe = pipe_limits[1];
650  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
651  ManagedReadHalfPipe(pipe_limits[0], limit, sizeof(*limit));
652  ReadPipe(pipe_limits[0], cleanup_threshold, sizeof(*cleanup_threshold));
653  CloseReturnPipe(pipe_limits);
654 }
655 
656 
// Returns limit_ - cleanup_threshold_.
// NOTE(review): this is an unsigned subtraction. Create() rejects
// cleanup_threshold >= limit, but values received via GetLimits() are not
// validated before this point.
662  return limit_ - cleanup_threshold_;
663 }
664 
665 
// Returns getpid() unless both shared_ and spawned_ are set.
// Otherwise it returns the cached cachemgr_pid_ if that is non-zero, and
// failing that asks the cache manager via a kPid command.
// The reply is read with a plain ReadHalfPipe() rather than
// ManagedReadHalfPipe(). CreateShared relies on this when talking to old cache
// managers.
667  if (!shared_ || !spawned_) {
668  return getpid();
669  }
670  if (cachemgr_pid_) {
671  return cachemgr_pid_;
672  }
673 
674  pid_t result;
675  int pipe_pid[2];
676  MakeReturnPipe(pipe_pid);
677 
678  LruCommand cmd;
679  cmd.command_type = kPid;
680  cmd.return_pipe = pipe_pid[1];
681  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
682  ReadHalfPipe(pipe_pid[0], &result, sizeof(result));
683  CloseReturnPipe(pipe_pid);
684  return result;
685 }
686 
687 
// Asks the cache manager for its protocol revision with a kGetProtocolRevision
// command. Returns the uint32_t read back from the private return pipe.
689  int pipe_revision[2];
690  MakeReturnPipe(pipe_revision);
691 
692  LruCommand cmd;
693  cmd.command_type = kGetProtocolRevision;
694  cmd.return_pipe = pipe_revision[1];
695  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
696 
697  uint32_t revision;
698  ManagedReadHalfPipe(pipe_revision[0], &revision, sizeof(revision));
699  CloseReturnPipe(pipe_revision);
700  return revision;
701 }
702 
703 
707 void PosixQuotaManager::GetSharedStatus(uint64_t *gauge, uint64_t *pinned) {
708  int pipe_status[2];
709  MakeReturnPipe(pipe_status);
710 
711  LruCommand cmd;
712  cmd.command_type = kStatus;
713  cmd.return_pipe = pipe_status[1];
714  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
715  ManagedReadHalfPipe(pipe_status[0], gauge, sizeof(*gauge));
716  ReadPipe(pipe_status[0], pinned, sizeof(*pinned));
717  CloseReturnPipe(pipe_status);
718 }
719 
720 
// Returns the cache gauge. Without spawned_ this is the local gauge_;
// otherwise it is the gauge reported by the cache manager via
// GetSharedStatus().
722  if (!spawned_) return gauge_;
723  uint64_t gauge, size_pinned;
724  GetSharedStatus(&gauge, &size_pinned);
725  return gauge;
726 }
727 
728 
// Returns the pinned size. Without spawned_ this is the local pinned_;
// otherwise it is the value reported by the cache manager via
// GetSharedStatus().
730  if (!spawned_) return pinned_;
731  uint64_t gauge, size_pinned;
732  GetSharedStatus(&gauge, &size_pinned);
733  return size_pinned;
734 }
735 
736 
737 uint64_t PosixQuotaManager::GetCleanupRate(uint64_t period_s) {
738  if (!spawned_ || (protocol_revision_ < 2)) return 0;
739  uint64_t cleanup_rate;
740 
741  int pipe_cleanup_rate[2];
742  MakeReturnPipe(pipe_cleanup_rate);
743  LruCommand cmd;
744  cmd.command_type = kCleanupRate;
745  cmd.size = period_s;
746  cmd.return_pipe = pipe_cleanup_rate[1];
747  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
748  ManagedReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate, sizeof(cleanup_rate));
749  CloseReturnPipe(pipe_cleanup_rate);
750 
751  return cleanup_rate;
752 }
753 
754 
// Opens (or creates) the LRU cache database "<cache_dir_>/cachedb" and
// prepares all member statements. The "<workspace_dir_>/lock_cachedb" lock is
// acquired first.
//
// With rebuild_database, the database and its journal are deleted up front and
// the catalog is repopulated by RebuildDatabase(). The same repopulation also
// happens when cache_catalog is empty. If the schema-creation statements fail,
// the files are deleted and creation is retried exactly once.
//
// Further steps:
//   - old catalogs get the type/pinned columns added;
//   - all pinned flags are reset to 0;
//   - schema version '1.0' is recorded;
//   - gauge_ is set to the sum of sizes;
//   - seq_ is set to the highest acseq with bit 63 masked off, plus one.
//     Volatile entries have negative acseq, which is why the list query uses
//     "acseq < 0".
//
// Returns true on success. On failure it closes the database, releases the
// lock and returns false.
755 bool PosixQuotaManager::InitDatabase(const bool rebuild_database) {
756  string sql;
757  sqlite3_stmt *stmt;
758 
759  fd_lock_cachedb_ = LockFile(workspace_dir_ + "/lock_cachedb");
760  if (fd_lock_cachedb_ < 0) {
761  LogCvmfs(kLogQuota, kLogDebug, "failed to create cachedb lock");
762  return false;
763  }
764 
765  bool retry = false;
766  const string db_file = cache_dir_ + "/cachedb";
767  if (rebuild_database) {
768  LogCvmfs(kLogQuota, kLogDebug, "rebuild database, unlinking existing (%s)",
769  db_file.c_str());
770  unlink(db_file.c_str());
771  unlink((db_file + "-journal").c_str());
772  }
773 
774  init_recover:
775  int err = sqlite3_open(db_file.c_str(), &database_);
776  if (err != SQLITE_OK) {
777  LogCvmfs(kLogQuota, kLogDebug, "could not open cache database (%d)", err);
778  goto init_database_fail;
779  }
780  // TODO(reneme): make this a `QuotaDatabase : public sqlite::Database`
781  sql = "PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
782  "PRAGMA auto_vacuum=1; "
783  "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
784  " acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
785  "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
786  "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
787  " ON cache_catalog (acseq); "
788  "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
789  "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
790  "CREATE INDEX idx_fscache_actime ON fscache (actime); "
791  "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
792  " CONSTRAINT pk_properties PRIMARY KEY(key));";
793  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
794  if (err != SQLITE_OK) {
795  if (!retry) {
796  retry = true;
797  sqlite3_close(database_);
798  unlink(db_file.c_str());
799  unlink((db_file + "-journal").c_str());
801  "LRU database corrupted, re-building");
802  goto init_recover;
803  }
804  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
805  sql.c_str());
806  goto init_database_fail;
807  }
808 
809  // If this an old cache catalog,
810  // add and initialize new columns to cache_catalog
811  sql = "ALTER TABLE cache_catalog ADD type INTEGER; "
812  "ALTER TABLE cache_catalog ADD pinned INTEGER";
813  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
814  if (err == SQLITE_OK) {
815  sql = "UPDATE cache_catalog SET type=" + StringifyInt(kFileRegular) + ";";
816  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
817  if (err != SQLITE_OK) {
819  "could not init cache database (failed: %s)", sql.c_str());
820  goto init_database_fail;
821  }
822  }
823 
824  // Set pinned back
825  sql = "UPDATE cache_catalog SET pinned=0;";
826  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
827  if (err != SQLITE_OK) {
828  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
829  sql.c_str());
830  goto init_database_fail;
831  }
832 
833  // Set schema version
834  sql = "INSERT OR REPLACE INTO properties (key, value) "
835  "VALUES ('schema', '1.0')";
836  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
837  if (err != SQLITE_OK) {
838  LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
839  sql.c_str());
840  goto init_database_fail;
841  }
842 
843  // If cache catalog is empty, recreate from file system
844  sql = "SELECT count(*) FROM cache_catalog;";
845  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
846  if (sqlite3_step(stmt) == SQLITE_ROW) {
847  if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
849  "CernVM-FS: building lru cache database...");
850  if (!RebuildDatabase()) {
852  "could not build cache database from file system");
853  sqlite3_finalize(stmt);
854  goto init_database_fail;
855  }
856  }
857  sqlite3_finalize(stmt);
858  } else {
859  LogCvmfs(kLogQuota, kLogDebug, "could not select on cache catalog");
860  sqlite3_finalize(stmt);
861  goto init_database_fail;
862  }
863 
864  // How many bytes do we already have in cache?
865  sql = "SELECT sum(size) FROM cache_catalog;";
866  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
867  if (sqlite3_step(stmt) == SQLITE_ROW) {
868  gauge_ = sqlite3_column_int64(stmt, 0);
869  } else {
870  LogCvmfs(kLogQuota, kLogDebug, "could not determine cache size");
871  sqlite3_finalize(stmt);
872  goto init_database_fail;
873  }
874  sqlite3_finalize(stmt);
875 
876  // Highest seq-no?
877  sql = "SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
878  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
879  if (sqlite3_step(stmt) == SQLITE_ROW) {
880  seq_ = sqlite3_column_int64(stmt, 0)+1;
881  } else {
882  LogCvmfs(kLogQuota, kLogDebug, "could not determine highest seq-no");
883  sqlite3_finalize(stmt);
884  goto init_database_fail;
885  }
886  sqlite3_finalize(stmt);
887 
// NOTE(review): the return codes of the sqlite3_prepare_v2() calls below are
// not checked. A failed prepare leaves the statement handle NULL, but the
// function still returns true.
888  // Prepare touch, new, remove statements
889  sqlite3_prepare_v2(database_,
890  "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
891  "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
892  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=0 "
893  "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
894  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=2 "
895  "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
896  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=1 "
897  "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
898  sqlite3_prepare_v2(database_,
899  "INSERT OR REPLACE INTO cache_catalog "
900  "(sha1, size, acseq, path, type, pinned) "
901  "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
902  -1, &stmt_new_, NULL);
903  sqlite3_prepare_v2(database_,
904  "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
905  -1, &stmt_size_, NULL);
906  sqlite3_prepare_v2(database_, "DELETE FROM cache_catalog WHERE sha1=:sha1;",
907  -1, &stmt_rm_, NULL);
908  sqlite3_prepare_v2(database_,
909  "SELECT sha1, size FROM cache_catalog WHERE "
910  "acseq=(SELECT min(acseq) "
911  "FROM cache_catalog WHERE pinned<>2);",
912  -1, &stmt_lru_, NULL);
913  sqlite3_prepare_v2(database_,
914  ("SELECT path FROM cache_catalog WHERE type=" +
915  StringifyInt(kFileRegular) +
916  ";").c_str(), -1, &stmt_list_, NULL);
917  sqlite3_prepare_v2(database_,
918  "SELECT path FROM cache_catalog WHERE pinned<>0;",
919  -1, &stmt_list_pinned_, NULL);
920  sqlite3_prepare_v2(database_,
921  "SELECT path FROM cache_catalog WHERE acseq < 0;",
922  -1, &stmt_list_volatile_, NULL);
923  sqlite3_prepare_v2(database_,
924  ("SELECT path FROM cache_catalog WHERE type=" +
925  StringifyInt(kFileCatalog) +
926  ";").c_str(), -1, &stmt_list_catalogs_, NULL);
927  return true;
928 
929  init_database_fail:
930  sqlite3_close(database_);
931  database_ = NULL;
932  UnlockFile(fd_lock_cachedb_);
933  return false;
934 }
935 
936 
// Registers a regular cache entry: forwards to DoInsert() with kInsert.
942  const shash::Any &any_hash,
943  const uint64_t size,
944  const string &description)
945 {
946  DoInsert(any_hash, size, description, kInsert);
947 }
948 
949 
// Registers a volatile cache entry: forwards to DoInsert() with
// kInsertVolatile.
956  const shash::Any &any_hash,
957  const uint64_t size,
958  const string &description)
959 {
960  DoInsert(any_hash, size, description, kInsertVolatile);
961 }
962 
963 
967 vector<string> PosixQuotaManager::List() {
968  return DoList(kList);
969 }
970 
971 
// Returns the entry list produced by the kListPinned command.
976  return DoList(kListPinned);
977 }
978 
979 
// Returns the entry list produced by the kListCatalogs command.
984  return DoList(kListCatalogs);
985 }
986 
987 
// Returns the entry list produced by the kListVolatile command.
992  return DoList(kListVolatile);
993 }
994 
995 
999 int PosixQuotaManager::MainCacheManager(int argc, char **argv) {
1000 
1001  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1002  int retval;
1003 
1004  PosixQuotaManager shared_manager(0, 0, "");
1005  shared_manager.shared_ = true;
1006  shared_manager.spawned_ = true;
1007  shared_manager.pinned_ = 0;
1008 
1009  // Process command line arguments
1010  ParseDirectories(string(argv[2]),
1011  &shared_manager.cache_dir_,
1012  &shared_manager.workspace_dir_);
1013  int pipe_boot = String2Int64(argv[3]);
1014  int pipe_handshake = String2Int64(argv[4]);
1015  shared_manager.limit_ = String2Int64(argv[5]);
1016  shared_manager.cleanup_threshold_ = String2Int64(argv[6]);
1017  int foreground = String2Int64(argv[7]);
1018  int syslog_level = String2Int64(argv[8]);
1019  int syslog_facility = String2Int64(argv[9]);
1020  vector<string> logfiles = SplitString(argv[10], ':');
1021 
1022  SetLogSyslogLevel(syslog_level);
1023  SetLogSyslogFacility(syslog_facility);
1024  if ((logfiles.size() > 0) && (logfiles[0] != ""))
1025  SetLogDebugFile(logfiles[0] + ".cachemgr");
1026  if (logfiles.size() > 1)
1027  SetLogMicroSyslog(logfiles[1]);
1028 
1029  if (!foreground)
1030  Daemonize();
1031 
1032  UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
1033  assert(watchdog.IsValid());
1034  watchdog->Spawn("./stacktrace.cachemgr");
1035 
1036  // Initialize pipe, open non-blocking as cvmfs is not yet connected
1037  const int fd_lockfile_fifo =
1038  LockFile(shared_manager.workspace_dir_ + "/lock_cachemgr.fifo");
1039  if (fd_lockfile_fifo < 0) {
1040  LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, "could not open lock file "
1041  "%s (%d)",
1042  (shared_manager.workspace_dir_ + "/lock_cachemgr.fifo").c_str(),
1043  errno);
1044  return 1;
1045  }
1046  const string crash_guard = shared_manager.cache_dir_ + "/cachemgr.running";
1047  const bool rebuild = FileExists(crash_guard);
1048  retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1049  if (retval < 0) {
1051  "failed to create shared cache manager crash guard");
1052  UnlockFile(fd_lockfile_fifo);
1053  return 1;
1054  }
1055  close(retval);
1056 
1057  // Redirect SQlite temp directory to cache (global variable)
1058  const string tmp_dir = shared_manager.workspace_dir_;
1059  sqlite3_temp_directory =
1060  static_cast<char *>(sqlite3_malloc(tmp_dir.length() + 1));
1061  snprintf(sqlite3_temp_directory, tmp_dir.length() + 1, "%s", tmp_dir.c_str());
1062 
1063  // Cleanup leftover named pipes
1064  shared_manager.CleanupPipes();
1065 
1066  if (!shared_manager.InitDatabase(rebuild)) {
1067  UnlockFile(fd_lockfile_fifo);
1068  return 1;
1069  }
1070  shared_manager.CheckFreeSpace();
1071 
1072  // Save protocol revision to file. If the file is not found, it indicates
1073  // to the client that the cache manager is from times before the protocol
1074  // was versioned.
1075  const string protocol_revision_path =
1076  shared_manager.workspace_dir_ + "/cachemgr.protocol";
1077  retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1078  if (retval < 0) {
1080  "failed to open protocol revision file (%d)", errno);
1081  UnlockFile(fd_lockfile_fifo);
1082  return 1;
1083  }
1084  const string revision = StringifyInt(kProtocolRevision);
1085  int written = write(retval, revision.data(), revision.length());
1086  close(retval);
1087  if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1089  "failed to write protocol revision (%d)", errno);
1090  UnlockFile(fd_lockfile_fifo);
1091  return 1;
1092  }
1093 
1094  const string fifo_path = shared_manager.workspace_dir_ + "/cachemgr";
1095  shared_manager.pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1096  if (shared_manager.pipe_lru_[0] < 0) {
1097  LogCvmfs(kLogQuota, kLogDebug, "failed to listen on FIFO %s (%d)",
1098  fifo_path.c_str(), errno);
1099  UnlockFile(fd_lockfile_fifo);
1100  return 1;
1101  }
1102  Nonblock2Block(shared_manager.pipe_lru_[0]);
1103  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager listening");
1104 
1105  char buf = 'C';
1106  WritePipe(pipe_boot, &buf, 1);
1107  close(pipe_boot);
1108 
1109  ReadPipe(pipe_handshake, &buf, 1);
1110  close(pipe_handshake);
1111  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager handshake done");
1112 
1113  // Ensure that broken pipes from clients do not kill the cache manager
1114  signal(SIGPIPE, SIG_IGN);
1115  // Don't let Ctrl-C ungracefully kill interactive session
1116  signal(SIGINT, SIG_IGN);
1117 
1118  shared_manager.MainCommandServer(&shared_manager);
1119  unlink(fifo_path.c_str());
1120  unlink(protocol_revision_path.c_str());
1121  shared_manager.CloseDatabase();
1122  unlink(crash_guard.c_str());
1123  UnlockFile(fd_lockfile_fifo);
1124 
1125  if (sqlite3_temp_directory) {
1126  sqlite3_free(sqlite3_temp_directory);
1127  sqlite3_temp_directory = NULL;
1128  }
1129 
1130  return 0;
1131 }
1132 
1133 
1135  PosixQuotaManager *quota_mgr = static_cast<PosixQuotaManager *>(data);
1136 
1137  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1138  sqlite3_soft_heap_limit(quota_mgr->kSqliteMemPerThread);
1139 
1140  LruCommand command_buffer[kCommandBufferSize];
1141  char description_buffer[kCommandBufferSize*kMaxDescription];
1142  unsigned num_commands = 0;
1143 
1144  while (read(quota_mgr->pipe_lru_[0], &command_buffer[num_commands],
1145  sizeof(command_buffer[0])) == sizeof(command_buffer[0]))
1146  {
1147  const CommandType command_type = command_buffer[num_commands].command_type;
1148  LogCvmfs(kLogQuota, kLogDebug, "received command %d", command_type);
1149  const uint64_t size = command_buffer[num_commands].GetSize();
1150 
1151  // Inserts and pins come with a description (usually a path)
1152  if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1153  (command_type == kPin) || (command_type == kPinRegular))
1154  {
1155  const int desc_length = command_buffer[num_commands].desc_length;
1156  ReadPipe(quota_mgr->pipe_lru_[0],
1157  &description_buffer[kMaxDescription*num_commands], desc_length);
1158  }
1159 
1160  // The protocol revision is returned immediately
1161  if (command_type == kGetProtocolRevision) {
1162  int return_pipe =
1163  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1164  if (return_pipe < 0)
1165  continue;
1166  WritePipe(return_pipe, &quota_mgr->kProtocolRevision,
1167  sizeof(quota_mgr->kProtocolRevision));
1168  quota_mgr->UnbindReturnPipe(return_pipe);
1169  continue;
1170  }
1171 
1172  // The cleanup rate is returned immediately
1173  if (command_type == kCleanupRate) {
1174  int return_pipe =
1175  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1176  if (return_pipe < 0)
1177  continue;
1178  uint64_t period_s = size; // use the size field to transmit the period
1179  uint64_t rate = quota_mgr->cleanup_recorder_.GetNoTicks(period_s);
1180  WritePipe(return_pipe, &rate, sizeof(rate));
1181  quota_mgr->UnbindReturnPipe(return_pipe);
1182  continue;
1183  }
1184 
1185  // Reservations are handled immediately and "out of band"
1186  if (command_type == kReserve) {
1187  bool success = true;
1188  int return_pipe =
1189  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1190  if (return_pipe < 0)
1191  continue;
1192 
1193  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1194  const string hash_str(hash.ToString());
1195  LogCvmfs(kLogQuota, kLogDebug, "reserve %lu bytes for %s",
1196  size, hash_str.c_str());
1197 
1198  if (quota_mgr->pinned_chunks_.find(hash) ==
1199  quota_mgr->pinned_chunks_.end())
1200  {
1201  if ((quota_mgr->pinned_ + size) > quota_mgr->cleanup_threshold_) {
1203  "failed to insert %s (pinned), no space", hash_str.c_str());
1204  success = false;
1205  } else {
1206  quota_mgr->pinned_chunks_[hash] = size;
1207  quota_mgr->pinned_ += size;
1208  quota_mgr->CheckHighPinWatermark();
1209  }
1210  }
1211 
1212  WritePipe(return_pipe, &success, sizeof(success));
1213  quota_mgr->UnbindReturnPipe(return_pipe);
1214  continue;
1215  }
1216 
1217  // Back channels are also handled out of band
1218  if (command_type == kRegisterBackChannel) {
1219  int return_pipe =
1220  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1221  if (return_pipe < 0)
1222  continue;
1223 
1224  quota_mgr->UnlinkReturnPipe(command_buffer[num_commands].return_pipe);
1225  Block2Nonblock(return_pipe); // back channels are opportunistic
1226  shash::Md5 hash;
1227  memcpy(hash.digest, command_buffer[num_commands].digest,
1229 
1230  quota_mgr->LockBackChannels();
1231  map<shash::Md5, int>::const_iterator iter =
1232  quota_mgr->back_channels_.find(hash);
1233  if (iter != quota_mgr->back_channels_.end()) {
1235  "closing left-over back channel %s", hash.ToString().c_str());
1236  close(iter->second);
1237  }
1238  quota_mgr->back_channels_[hash] = return_pipe;
1239  quota_mgr->UnlockBackChannels();
1240 
1241  char success = 'S';
1242  WritePipe(return_pipe, &success, sizeof(success));
1243  LogCvmfs(kLogQuota, kLogDebug, "register back channel %s on fd %d",
1244  hash.ToString().c_str(), return_pipe);
1245 
1246  continue;
1247  }
1248 
1249  if (command_type == kUnregisterBackChannel) {
1250  shash::Md5 hash;
1251  memcpy(hash.digest, command_buffer[num_commands].digest,
1253 
1254  quota_mgr->LockBackChannels();
1255  map<shash::Md5, int>::iterator iter =
1256  quota_mgr->back_channels_.find(hash);
1257  if (iter != quota_mgr->back_channels_.end()) {
1259  "closing back channel %s", hash.ToString().c_str());
1260  close(iter->second);
1261  quota_mgr->back_channels_.erase(iter);
1262  } else {
1264  "did not find back channel %s", hash.ToString().c_str());
1265  }
1266  quota_mgr->UnlockBackChannels();
1267 
1268  continue;
1269  }
1270 
1271  // Unpinnings are also handled immediately with respect to the pinned gauge
1272  if (command_type == kUnpin) {
1273  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1274  const string hash_str(hash.ToString());
1275 
1276  map<shash::Any, uint64_t>::iterator iter =
1277  quota_mgr->pinned_chunks_.find(hash);
1278  if (iter != quota_mgr->pinned_chunks_.end()) {
1279  quota_mgr->pinned_ -= iter->second;
1280  quota_mgr->pinned_chunks_.erase(iter);
1281  // It can happen that files get pinned that were removed from the cache
1282  // (see cache.cc). We fix this at this point, where we remove such
1283  // entries from the cache database.
1284  if (!FileExists(quota_mgr->cache_dir_ + "/" +
1285  hash.MakePathWithoutSuffix()))
1286  {
1288  "remove orphaned pinned hash %s from cache database",
1289  hash_str.c_str());
1290  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1291  hash_str.length(), SQLITE_STATIC);
1292  int retval;
1293  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1294  uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1295  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1296  hash_str.length(), SQLITE_STATIC);
1297  retval = sqlite3_step(quota_mgr->stmt_rm_);
1298  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1299  quota_mgr->gauge_ -= size;
1300  } else {
1302  "failed to delete %s (%d)", hash_str.c_str(), retval);
1303  }
1304  sqlite3_reset(quota_mgr->stmt_rm_);
1305  }
1306  sqlite3_reset(quota_mgr->stmt_size_);
1307  }
1308  } else {
1309  LogCvmfs(kLogQuota, kLogDebug, "this chunk was not pinned");
1310  }
1311  }
1312 
1313  // Immediate commands trigger flushing of the buffer
1314  bool immediate_command = (command_type == kCleanup) ||
1315  (command_type == kList) || (command_type == kListPinned) ||
1316  (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1317  (command_type == kRemove) || (command_type == kStatus) ||
1318  (command_type == kLimits) || (command_type == kPid);
1319  if (!immediate_command) num_commands++;
1320 
1321  if ((num_commands == kCommandBufferSize) || immediate_command)
1322  {
1323  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1324  description_buffer);
1325  if (!immediate_command) num_commands = 0;
1326  }
1327 
1328  if (immediate_command) {
1329  // Process cleanup, listings
1330  int return_pipe =
1331  quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1332  if (return_pipe < 0) {
1333  num_commands = 0;
1334  continue;
1335  }
1336 
1337  int retval;
1338  sqlite3_stmt *this_stmt_list = NULL;
1339  switch (command_type) {
1340  case kRemove: {
1341  const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1342  const string hash_str = hash.ToString();
1343  LogCvmfs(kLogQuota, kLogDebug, "manually removing %s",
1344  hash_str.c_str());
1345  bool success = false;
1346 
1347  sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1348  hash_str.length(), SQLITE_STATIC);
1349  int retval;
1350  if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1351  uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1352  uint64_t is_pinned = sqlite3_column_int64(quota_mgr->stmt_size_, 1);
1353 
1354  sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1355  hash_str.length(), SQLITE_STATIC);
1356  retval = sqlite3_step(quota_mgr->stmt_rm_);
1357  if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1358  success = true;
1359  quota_mgr->gauge_ -= size;
1360  if (is_pinned) {
1361  quota_mgr->pinned_chunks_.erase(hash);
1362  quota_mgr->pinned_ -= size;
1363  }
1364  } else {
1366  "failed to delete %s (%d)", hash_str.c_str(), retval);
1367  }
1368  sqlite3_reset(quota_mgr->stmt_rm_);
1369  } else {
1370  // File does not exist
1371  success = true;
1372  }
1373  sqlite3_reset(quota_mgr->stmt_size_);
1374 
1375  WritePipe(return_pipe, &success, sizeof(success));
1376  break; }
1377  case kCleanup:
1378  retval = quota_mgr->DoCleanup(size);
1379  WritePipe(return_pipe, &retval, sizeof(retval));
1380  break;
1381  case kList:
1382  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_;
1383  case kListPinned:
1384  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_pinned_;
1385  case kListCatalogs:
1386  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_catalogs_;
1387  case kListVolatile:
1388  if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_volatile_;
1389 
1390  // Pipe back the list, one by one
1391  int length;
1392  while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1393  string path = "(NULL)";
1394  if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1395  path = string(
1396  reinterpret_cast<const char *>(
1397  sqlite3_column_text(this_stmt_list, 0)));
1398  }
1399  length = path.length();
1400  WritePipe(return_pipe, &length, sizeof(length));
1401  if (length > 0)
1402  WritePipe(return_pipe, &path[0], length);
1403  }
1404  length = -1;
1405  WritePipe(return_pipe, &length, sizeof(length));
1406  sqlite3_reset(this_stmt_list);
1407  break;
1408  case kStatus:
1409  WritePipe(return_pipe, &quota_mgr->gauge_, sizeof(quota_mgr->gauge_));
1410  WritePipe(return_pipe, &quota_mgr->pinned_,
1411  sizeof(quota_mgr->pinned_));
1412  break;
1413  case kLimits:
1414  WritePipe(return_pipe, &quota_mgr->limit_, sizeof(quota_mgr->limit_));
1415  WritePipe(return_pipe, &quota_mgr->cleanup_threshold_,
1416  sizeof(quota_mgr->cleanup_threshold_));
1417  break;
1418  case kPid: {
1419  pid_t pid = getpid();
1420  WritePipe(return_pipe, &pid, sizeof(pid));
1421  break;
1422  }
1423  default:
1424  PANIC(NULL); // other types are handled by the bunch processor
1425  }
1426  quota_mgr->UnbindReturnPipe(return_pipe);
1427  num_commands = 0;
1428  }
1429  }
1430 
1431  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager (%d)", errno);
1432  close(quota_mgr->pipe_lru_[0]);
1433  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1434  description_buffer);
1435 
1436  // Unpin
1437  command_buffer[0].command_type = kTouch;
1438  for (map<shash::Any, uint64_t>::const_iterator i =
1439  quota_mgr->pinned_chunks_.begin(),
1440  iEnd = quota_mgr->pinned_chunks_.end(); i != iEnd; ++i)
1441  {
1442  command_buffer[0].StoreHash(i->first);
1443  quota_mgr->ProcessCommandBunch(1, command_buffer, description_buffer);
1444  }
1445 
1446  return NULL;
1447 }
1448 
1449 
1451  if (!shared_) {
1452  MakePipe(pipe);
1453  return;
1454  }
1455 
1456  // Create FIFO in cache directory, store path name (number) in pipe write end
1457  int i = 0;
1458  int retval;
1459  do {
1460  retval = mkfifo((workspace_dir_ + "/pipe" + StringifyInt(i)).c_str(), 0600);
1461  pipe[1] = i;
1462  i++;
1463  } while ((retval == -1) && (errno == EEXIST));
1464  assert(retval == 0);
1465 
1466  // Connect reader's end
1467  pipe[0] = open((workspace_dir_ + "/pipe" + StringifyInt(pipe[1])).c_str(),
1468  O_RDONLY | O_NONBLOCK);
1469  assert(pipe[0] >= 0);
1470  Nonblock2Block(pipe[0]);
1471 }
1472 
1473 
1475  const std::string cache_workspace,
1476  std::string *cache_dir,
1477  std::string *workspace_dir)
1478 {
1479  vector<string> dir_tokens(SplitString(cache_workspace, ':'));
1480  switch (dir_tokens.size()) {
1481  case 1:
1482  *cache_dir = *workspace_dir = dir_tokens[0];
1483  break;
1484  case 2:
1485  *cache_dir = dir_tokens[0];
1486  *workspace_dir = dir_tokens[1];
1487  break;
1488  default:
1489  PANIC(NULL);
1490  }
1491 }
1492 
1493 
1500  const shash::Any &hash,
1501  const uint64_t size,
1502  const string &description,
1503  const bool is_catalog)
1504 {
1505  assert((size > 0) || !is_catalog);
1506 
1507  const string hash_str = hash.ToString();
1508  LogCvmfs(kLogQuota, kLogDebug, "pin into lru %s, path %s",
1509  hash_str.c_str(), description.c_str());
1510 
1511  // Has to run when not yet spawned (cvmfs initialization)
1512  if (!spawned_) {
1513  // Code duplication here
1514  if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1515  if (pinned_ + size > cleanup_threshold_) {
1516  LogCvmfs(kLogQuota, kLogDebug, "failed to insert %s (pinned), no space",
1517  hash_str.c_str());
1518  return false;
1519  } else {
1520  pinned_chunks_[hash] = size;
1521  pinned_ += size;
1522  CheckHighPinWatermark();
1523  }
1524  }
1525  bool exists = Contains(hash_str);
1526  if (!exists && (gauge_ + size > limit_)) {
1527  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu",
1528  gauge_, size);
1529  int retval = DoCleanup(cleanup_threshold_);
1530  assert(retval != 0);
1531  }
1532  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1533  SQLITE_STATIC);
1534  sqlite3_bind_int64(stmt_new_, 2, size);
1535  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1536  sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1537  SQLITE_STATIC);
1538  sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1539  sqlite3_bind_int64(stmt_new_, 6, 1);
1540  int retval = sqlite3_step(stmt_new_);
1541  assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1542  sqlite3_reset(stmt_new_);
1543  if (!exists) gauge_ += size;
1544  return true;
1545  }
1546 
1547  int pipe_reserve[2];
1548  MakeReturnPipe(pipe_reserve);
1549 
1550  LruCommand cmd;
1551  cmd.command_type = kReserve;
1552  cmd.SetSize(size);
1553  cmd.StoreHash(hash);
1554  cmd.return_pipe = pipe_reserve[1];
1555  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1556  bool result;
1557  ManagedReadHalfPipe(pipe_reserve[0], &result, sizeof(result));
1558  CloseReturnPipe(pipe_reserve);
1559 
1560  if (!result) return false;
1561  DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1562 
1563  return true;
1564 }
1565 
1566 
1568  const uint64_t limit,
1569  const uint64_t cleanup_threshold,
1570  const string &cache_workspace)
1571  : shared_(false)
1572  , spawned_(false)
1573  , limit_(limit)
1574  , cleanup_threshold_(cleanup_threshold)
1575  , gauge_(0)
1576  , pinned_(0)
1577  , seq_(0)
1578  , cache_dir_() // initialized in body
1579  , workspace_dir_() // initialized in body
1580  , fd_lock_cachedb_(-1)
1581  , async_delete_(true)
1582  , cachemgr_pid_(0)
1583  , database_(NULL)
1584  , stmt_touch_(NULL)
1585  , stmt_unpin_(NULL)
1586  , stmt_block_(NULL)
1587  , stmt_unblock_(NULL)
1588  , stmt_new_(NULL)
1589  , stmt_lru_(NULL)
1590  , stmt_size_(NULL)
1591  , stmt_rm_(NULL)
1592  , stmt_list_(NULL)
1593  , stmt_list_pinned_(NULL)
1594  , stmt_list_catalogs_(NULL)
1595  , stmt_list_volatile_(NULL)
1596  , initialized_(false)
1597 {
1598  ParseDirectories(cache_workspace, &cache_dir_, &workspace_dir_);
1599  pipe_lru_[0] = pipe_lru_[1] = -1;
1600  cleanup_recorder_.AddRecorder(1, 90); // last 1.5 min with second resolution
1601  // last 1.5 h with minute resolution
1602  cleanup_recorder_.AddRecorder(60, 90*60);
1603  // last 18 hours with 20 min resolution
1604  cleanup_recorder_.AddRecorder(20*60, 60*60*18);
1605  // last 4 days with hour resolution
1606  cleanup_recorder_.AddRecorder(60*60, 60*60*24*4);
1607 }
1608 
1609 
1611  if (!initialized_) return;
1612 
1613  if (shared_) {
1614  // Most of cleanup is done elsewhen by shared cache manager
1615  close(pipe_lru_[1]);
1616  return;
1617  }
1618 
1619  if (spawned_) {
1620  char fin = 0;
1621  WritePipe(pipe_lru_[1], &fin, 1);
1622  close(pipe_lru_[1]);
1623  pthread_join(thread_lru_, NULL);
1624  } else {
1626  }
1627 
1628  CloseDatabase();
1629 }
1630 
1631 
1633  const unsigned num,
1634  const LruCommand *commands,
1635  const char *descriptions)
1636 {
1637  int retval = sqlite3_exec(database_, "BEGIN", NULL, NULL, NULL);
1638  assert(retval == SQLITE_OK);
1639 
1640  for (unsigned i = 0; i < num; ++i) {
1641  const shash::Any hash = commands[i].RetrieveHash();
1642  const string hash_str = hash.ToString();
1643  const unsigned size = commands[i].GetSize();
1644  LogCvmfs(kLogQuota, kLogDebug, "processing %s (%d)",
1645  hash_str.c_str(), commands[i].command_type);
1646 
1647  bool exists;
1648  switch (commands[i].command_type) {
1649  case kTouch:
1650  sqlite3_bind_int64(stmt_touch_, 1, seq_++);
1651  sqlite3_bind_text(stmt_touch_, 2, &hash_str[0], hash_str.length(),
1652  SQLITE_STATIC);
1653  retval = sqlite3_step(stmt_touch_);
1654  LogCvmfs(kLogQuota, kLogDebug, "touching %s (%ld): %d",
1655  hash_str.c_str(), seq_-1, retval);
1656  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1657  PANIC(kLogSyslogErr, "failed to update %s in cachedb, error %d",
1658  hash_str.c_str(), retval);
1659  }
1660  sqlite3_reset(stmt_touch_);
1661  break;
1662  case kUnpin:
1663  sqlite3_bind_text(stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1664  SQLITE_STATIC);
1665  retval = sqlite3_step(stmt_unpin_);
1666  LogCvmfs(kLogQuota, kLogDebug, "unpinning %s: %d",
1667  hash_str.c_str(), retval);
1668  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1669  PANIC(kLogSyslogErr, "failed to unpin %s in cachedb, error %d",
1670  hash_str.c_str(), retval);
1671  }
1672  sqlite3_reset(stmt_unpin_);
1673  break;
1674  case kPin:
1675  case kPinRegular:
1676  case kInsert:
1677  case kInsertVolatile:
1678  // It could already be in, check
1679  exists = Contains(hash_str);
1680 
1681  // Cleanup, move to trash and unlink
1682  if (!exists && (gauge_ + size > limit_)) {
1683  LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %u",
1684  gauge_, size);
1685  retval = DoCleanup(cleanup_threshold_);
1686  assert(retval != 0);
1687  }
1688 
1689  // Insert or replace
1690  sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1691  SQLITE_STATIC);
1692  sqlite3_bind_int64(stmt_new_, 2, size);
1693  if (commands[i].command_type == kInsertVolatile) {
1694  sqlite3_bind_int64(stmt_new_, 3, (seq_++) | kVolatileFlag);
1695  } else {
1696  sqlite3_bind_int64(stmt_new_, 3, seq_++);
1697  }
1698  sqlite3_bind_text(stmt_new_, 4, &descriptions[i*kMaxDescription],
1699  commands[i].desc_length, SQLITE_STATIC);
1700  sqlite3_bind_int64(stmt_new_, 5, (commands[i].command_type == kPin) ?
1702  sqlite3_bind_int64(stmt_new_, 6,
1703  ((commands[i].command_type == kPin) ||
1704  (commands[i].command_type == kPinRegular)) ? 1 : 0);
1705  retval = sqlite3_step(stmt_new_);
1706  LogCvmfs(kLogQuota, kLogDebug, "insert or replace %s, method %d: %d",
1707  hash_str.c_str(), commands[i].command_type, retval);
1708  if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1709  PANIC(kLogSyslogErr, "failed to insert %s in cachedb, error %d",
1710  hash_str.c_str(), retval);
1711  }
1712  sqlite3_reset(stmt_new_);
1713 
1714  if (!exists) gauge_ += size;
1715  break;
1716  default:
1717  // other types should have been taken care of by event loop
1718  PANIC(NULL);
1719  }
1720  }
1721 
1722  retval = sqlite3_exec(database_, "COMMIT", NULL, NULL, NULL);
1723  if (retval != SQLITE_OK) {
1724  PANIC(kLogSyslogErr, "failed to commit to cachedb, error %d", retval);
1725  }
1726 }
1727 
1728 
1730  bool result = false;
1731  string sql;
1732  sqlite3_stmt *stmt_select = NULL;
1733  sqlite3_stmt *stmt_insert = NULL;
1734  int sqlerr;
1735  int seq = 0;
1736  char hex[4];
1737  struct stat info;
1738  platform_dirent64 *d;
1739  DIR *dirp = NULL;
1740  string path;
1741 
1742  LogCvmfs(kLogQuota, kLogSyslog | kLogDebug, "re-building cache database");
1743 
1744  // Empty cache catalog and fscache
1745  sql = "DELETE FROM cache_catalog; DELETE FROM fscache;";
1746  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1747  if (sqlerr != SQLITE_OK) {
1748  LogCvmfs(kLogQuota, kLogDebug, "could not clear cache database");
1749  goto build_return;
1750  }
1751 
1752  gauge_ = 0;
1753 
1754  // Insert files from cache sub-directories 00 - ff
1755  // TODO(jblomer): fs_traversal
1756  sqlite3_prepare_v2(database_, "INSERT INTO fscache (sha1, size, actime) "
1757  "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1758 
1759  for (int i = 0; i <= 0xff; i++) {
1760  snprintf(hex, sizeof(hex), "%02x", i);
1761  path = cache_dir_ + "/" + string(hex);
1762  if ((dirp = opendir(path.c_str())) == NULL) {
1764  "failed to open directory %s (tmpwatch interfering?)",
1765  path.c_str());
1766  goto build_return;
1767  }
1768  while ((d = platform_readdir(dirp)) != NULL) {
1769  string file_path = path + "/" + string(d->d_name);
1770  if (stat(file_path.c_str(), &info) == 0) {
1771  if (!S_ISREG(info.st_mode))
1772  continue;
1773  if (info.st_size == 0) {
1775  "removing empty file %s during automatic cache db rebuild",
1776  file_path.c_str());
1777  unlink(file_path.c_str());
1778  continue;
1779  }
1780 
1781  string hash = string(hex) + string(d->d_name);
1782  sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1783  SQLITE_STATIC);
1784  sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1785  sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1786  if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1787  LogCvmfs(kLogQuota, kLogDebug, "could not insert into temp table");
1788  goto build_return;
1789  }
1790  sqlite3_reset(stmt_insert);
1791 
1792  gauge_ += info.st_size;
1793  } else {
1794  LogCvmfs(kLogQuota, kLogDebug, "could not stat %s", file_path.c_str());
1795  }
1796  }
1797  closedir(dirp);
1798  dirp = NULL;
1799  }
1800  sqlite3_finalize(stmt_insert);
1801  stmt_insert = NULL;
1802 
1803  // Transfer from temp table in cache catalog
1804  sqlite3_prepare_v2(database_,
1805  "SELECT sha1, size FROM fscache ORDER BY actime;",
1806  -1, &stmt_select, NULL);
1807  sqlite3_prepare_v2(database_,
1808  "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1809  "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1810  -1, &stmt_insert, NULL);
1811  while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1812  const string hash = string(
1813  reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1814  sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1815  sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1816  sqlite3_bind_int64(stmt_insert, 3, seq++);
1817  // Might also be a catalog (information is lost)
1818  sqlite3_bind_int64(stmt_insert, 4, kFileRegular);
1819 
1820  int retval = sqlite3_step(stmt_insert);
1821  if (retval != SQLITE_DONE) {
1822  // If the file system hosting the cache is full, we'll likely notice here
1824  "could not insert into cache catalog (%d - %s)",
1825  retval, sqlite3_errstr(retval));
1826  goto build_return;
1827  }
1828  sqlite3_reset(stmt_insert);
1829  }
1830 
1831  // Delete temporary table
1832  sql = "DELETE FROM fscache;";
1833  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1834  if (sqlerr != SQLITE_OK) {
1835  LogCvmfs(kLogQuota, kLogDebug, "could not clear temporary table (%d)",
1836  sqlerr);
1837  goto build_return;
1838  }
1839 
1840  seq_ = seq;
1841  result = true;
1843  "rebuilding finished, sequence %" PRIu64 ", gauge %" PRIu64,
1844  seq_, gauge_);
1845 
1846  build_return:
1847  if (stmt_insert) sqlite3_finalize(stmt_insert);
1848  if (stmt_select) sqlite3_finalize(stmt_select);
1849  if (dirp) closedir(dirp);
1850  return result;
1851 }
1852 
1853 
1859  int back_channel[2],
1860  const string &channel_id)
1861 {
1862  if (protocol_revision_ >= 1) {
1863  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1864  MakeReturnPipe(back_channel);
1865 
1866  LruCommand cmd;
1868  cmd.return_pipe = back_channel[1];
1869  // Not StoreHash(). This is an MD5 hash.
1870  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1871  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1872 
1873  char success;
1874  ManagedReadHalfPipe(back_channel[0], &success, sizeof(success));
1875  // At this point, the named FIFO is unlinked, so don't use CloseReturnPipe
1876  if (success != 'S') {
1878  "failed to register quota back channel (%c)", success);
1879  }
1880  } else {
1881  // Dummy pipe to return valid file descriptors
1882  MakePipe(back_channel);
1883  }
1884 }
1885 
1886 
1891  string hash_str = hash.ToString();
1892 
1893  int pipe_remove[2];
1894  MakeReturnPipe(pipe_remove);
1895 
1896  LruCommand cmd;
1897  cmd.command_type = kRemove;
1898  cmd.return_pipe = pipe_remove[1];
1899  cmd.StoreHash(hash);
1900  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1901 
1902  bool success;
1903  ManagedReadHalfPipe(pipe_remove[0], &success, sizeof(success));
1904  CloseReturnPipe(pipe_remove);
1905 
1906  unlink((cache_dir_ + "/" + hash.MakePathWithoutSuffix()).c_str());
1907 }
1908 
1909 
1911  if (spawned_)
1912  return;
1913 
1914  if (pthread_create(&thread_lru_, NULL, MainCommandServer,
1915  static_cast<void *>(this)) != 0)
1916  {
1917  PANIC(kLogDebug, "could not create lru thread");
1918  }
1919 
1920  spawned_ = true;
1921 }
1922 
1923 
1928  LruCommand cmd;
1929  cmd.command_type = kTouch;
1930  cmd.StoreHash(hash);
1931  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1932 }
1933 
1934 
1936  if (shared_)
1937  close(pipe_wronly);
1938 }
1939 
1940 
1942  if (shared_)
1943  unlink((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str());
1944 }
1945 
1946 
1948  LogCvmfs(kLogQuota, kLogDebug, "Unpin %s", hash.ToString().c_str());
1949 
1950  LruCommand cmd;
1951  cmd.command_type = kUnpin;
1952  cmd.StoreHash(hash);
1953  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1954 }
1955 
1956 
1958  int back_channel[2],
1959  const string &channel_id)
1960 {
1961  if (protocol_revision_ >= 1) {
1962  shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1963 
1964  LruCommand cmd;
1966  // Not StoreHash(). This is an MD5 hash.
1967  memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1968  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1969 
1970  // Writer's end will be closed by cache manager, FIFO is already unlinked
1971  close(back_channel[0]);
1972  } else {
1973  ClosePipe(back_channel);
1974  }
1975 }
1976 
1977 void PosixQuotaManager::ManagedReadHalfPipe(int fd, void *buf, size_t nbyte) {
1978  const unsigned timeout_ms = cachemgr_pid_ ? 1000 : 0;
1979  bool result = false;
1980  do {
1981  result = ReadHalfPipe(fd, buf, nbyte, timeout_ms);
1982  // try only as long as the cachemgr is still alive
1983  } while (!result && getpgid(cachemgr_pid_) >= 0);
1984  if (!result) {
1985  PANIC(kLogStderr, "Error: quota manager could not read from cachemanager pipe");
1986  }
1987 
1988 }
virtual uint32_t GetProtocolRevision()
Definition: quota_posix.cc:688
void SetLogSyslogFacility(const int local_facility)
Definition: logging.cc:183
sqlite3_stmt * stmt_new_
Definition: quota_posix.h:342
sqlite3_stmt * stmt_list_pinned_
Definition: quota_posix.h:347
void AddRecorder(uint32_t resolution_s, uint32_t capacity_s)
Definition: statistics.cc:267
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void SetLogSyslogLevel(const int level)
Definition: logging.cc:151
std::vector< std::string > DoList(const CommandType list_command)
Definition: quota_posix.cc:600
struct stat64 platform_stat64
virtual pid_t GetPid()
Definition: quota_posix.cc:666
virtual uint64_t GetCleanupRate(uint64_t period_s)
Definition: quota_posix.cc:737
sqlite3_stmt * stmt_list_
Definition: quota_posix.h:346
void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
Definition: quota_posix.cc:642
virtual uint64_t GetSize()
Definition: quota_posix.cc:721
sqlite3_stmt * stmt_rm_
Definition: quota_posix.h:345
virtual bool Cleanup(const uint64_t leave_size)
Definition: quota_posix.cc:125
static const unsigned kSqliteMemPerThread
Definition: quota_posix.h:189
#define PANIC(...)
Definition: exception.h:29
virtual uint64_t GetMaxFileSize()
Definition: quota_posix.cc:661
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
uint64_t cleanup_threshold_
Definition: quota_posix.h:268
static const uint32_t kProtocolRevision
Definition: quota.h:45
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
Definition: posix.cc:1920
void Daemonize()
Definition: posix.cc:1702
bool DoCleanup(const uint64_t leave_size)
Definition: quota_posix.cc:470
uint64_t GetSize() const
Definition: quota_posix.h:158
bool SafeWrite(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:2060
static void * MainCommandServer(void *data)
virtual void Insert(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: quota_posix.cc:941
assert((mem||(size==0))&&"Out Of Memory")
bool InitDatabase(const bool rebuild_database)
Definition: quota_posix.cc:755
int BindReturnPipe(int pipe_wronly)
Definition: quota_posix.cc:64
std::string cache_dir_
Definition: quota_posix.h:289
bool Contains(const std::string &hash_str)
Definition: quota_posix.cc:189
perf::MultiRecorder cleanup_recorder_
Definition: quota_posix.h:335
int platform_stat(const char *path, platform_stat64 *buf)
void SetLogMicroSyslog(const std::string &filename)
Definition: logging.cc:272
#define SetLogDebugFile(filename)
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
sqlite3 * database_
Definition: quota_posix.h:337
unsigned char digest[digest_size_]
Definition: hash.h:124
virtual std::vector< std::string > ListCatalogs()
Definition: quota_posix.cc:983
void CheckHighPinWatermark()
Definition: quota_posix.cc:82
void GetSharedStatus(uint64_t *gauge, uint64_t *pinned)
Definition: quota_posix.cc:707
bool FileExists(const std::string &path)
Definition: posix.cc:802
int GetLogSyslogLevel()
Definition: logging.cc:168
static Watchdog * Create(FnOnCrash on_crash)
Definition: monitor.cc:70
int64_t String2Int64(const string &value)
Definition: string.cc:222
unsigned GetDigestSize() const
Definition: hash.h:168
std::string workspace_dir_
Definition: quota_posix.h:296
void MakeReturnPipe(int pipe[2])
virtual std::vector< std::string > ListPinned()
Definition: quota_posix.cc:975
pthread_t thread_lru_
Definition: quota_posix.h:311
bool ReadHalfPipe(int fd, void *buf, size_t nbyte, unsigned timeout_ms)
Definition: posix.cc:525
#define GetLogDebugFile()
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
Definition: posix.cc:2119
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
void Nonblock2Block(int filedes)
Definition: posix.cc:647
virtual uint64_t GetSizePinned()
Definition: quota_posix.cc:729
void ManagedReadHalfPipe(int fd, void *buf, size_t nbyte)
virtual bool Pin(const shash::Any &hash, const uint64_t size, const std::string &description, const bool is_catalog)
virtual std::vector< std::string > ListVolatile()
Definition: quota_posix.cc:991
virtual uint64_t GetCapacity()
Definition: quota_posix.cc:626
void SetSize(const uint64_t new_size)
Definition: quota_posix.h:152
void LockBackChannels()
Definition: quota.h:96
shash::Any RetrieveHash() const
Definition: quota_posix.h:172
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
int LockFile(const std::string &path)
Definition: posix.cc:982
void UnlinkReturnPipe(int pipe_wronly)
virtual void Remove(const shash::Any &file)
static PosixQuotaManager * Create(const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, const bool rebuild_database)
Definition: quota_posix.cc:230
sqlite3_stmt * stmt_touch_
Definition: quota_posix.h:338
string StringifyInt(const int64_t value)
Definition: string.cc:78
std::string GetLogMicroSyslog()
Definition: logging.cc:308
void DoInsert(const shash::Any &hash, const uint64_t size, const std::string &description, const CommandType command_type)
Definition: quota_posix.cc:575
sqlite3_stmt * stmt_list_volatile_
Definition: quota_posix.h:349
void ProcessCommandBunch(const unsigned num, const LruCommand *commands, const char *descriptions)
uint32_t protocol_revision_
Definition: quota.h:109
virtual void Unpin(const shash::Any &hash)
PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold, const std::string &cache_workspace)
virtual std::vector< std::string > List()
Definition: quota_posix.cc:967
static int MainCacheManager(int argc, char **argv)
Definition: quota_posix.cc:999
std::map< shash::Md5, int > back_channels_
Definition: quota.h:94
static const unsigned kMaxDescription
Definition: quota_posix.h:201
bool CloseAllFildes(const std::set< int > &preserve_fildes)
Definition: posix.cc:1887
virtual void InsertVolatile(const shash::Any &hash, const uint64_t size, const std::string &description)
Definition: quota_posix.cc:955
static const unsigned kLockFileMagicNumber
Definition: quota_posix.h:184
unsigned char digest[shash::kMaxDigestSize]
Definition: quota_posix.h:136
std::string MakePathWithoutSuffix() const
Definition: hash.h:335
sqlite3_stmt * stmt_list_catalogs_
Definition: quota_posix.h:348
bool ExecAsDaemon(const std::vector< std::string > &command_line, pid_t *child_pid)
Definition: posix.cc:1639
static PosixQuotaManager * CreateShared(const std::string &exe_path, const std::string &cache_workspace, const uint64_t limit, const uint64_t cleanup_threshold, bool foreground)
Definition: quota_posix.cc:262
uint64_t GetNoTicks(uint32_t retrospect_s) const
Definition: statistics.cc:272
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
platform_dirent64 * platform_readdir(DIR *dirp)
void SetCacheMgrPid(pid_t pid_)
Definition: quota_posix.h:88
virtual ~PosixQuotaManager()
void Spawn(const std::string &crash_dump_path)
Definition: monitor.cc:510
static void ParseDirectories(const std::string cache_workspace, std::string *cache_dir, std::string *workspace_dir)
void Block2Nonblock(int filedes)
Definition: posix.cc:658
std::map< shash::Any, uint64_t > pinned_chunks_
Definition: quota_posix.h:301
sqlite3_stmt * stmt_unpin_
Definition: quota_posix.h:339
void CloseReturnPipe(int pipe[2])
Definition: quota_posix.cc:179
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
virtual void Spawn()
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:562
const unsigned kDigestSizes[]
Definition: hash.h:69
void UnlockBackChannels()
Definition: quota.h:100
void UnbindReturnPipe(int pipe_wronly)
virtual void Touch(const shash::Any &hash)
void StoreHash(const shash::Any &hash)
Definition: quota_posix.h:164
static const uint64_t kVolatileFlag
Definition: quota_posix.h:214
void UnlockFile(const int filedes)
Definition: posix.cc:1006
sqlite3_stmt * stmt_size_
Definition: quota_posix.h:344
struct dirent64 platform_dirent64
int GetLogSyslogFacility()
Definition: logging.cc:214
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528