CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_posix.cc
Go to the documentation of this file.
1 
27 #define __STDC_FORMAT_MACROS
28 
29 #include "cvmfs_config.h"
30 #include "cache_posix.h"
31 
32 #include <dirent.h>
33 #include <errno.h>
34 #include <fcntl.h>
35 #include <inttypes.h>
36 #include <pthread.h>
37 #include <sys/stat.h>
38 #include <sys/types.h>
39 #ifndef __APPLE__
40 #include <sys/statfs.h>
41 #endif
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <vector>
51 
52 #include "crypto/hash.h"
53 #include "crypto/signature.h"
54 #include "directory_entry.h"
55 #include "manifest.h"
56 #include "manifest_fetch.h"
57 #include "network/download.h"
58 #include "quota.h"
59 #include "shortstring.h"
60 #include "statistics.h"
61 #include "util/atomic.h"
62 #include "util/logging.h"
63 #include "util/platform.h"
64 #include "util/posix.h"
65 #include "util/smalloc.h"
66 
67 using namespace std; // NOLINT
68 
69 namespace {
70 
79 class CallGuard {
80  public:
82  int32_t global_drainout = atomic_read32(&global_drainout_);
83  drainout_ = (global_drainout != 0);
84  if (!drainout_)
85  atomic_inc32(&num_inflight_calls_);
86  }
88  if (!drainout_)
89  atomic_dec32(&num_inflight_calls_);
90  }
91  static void Drainout() {
92  atomic_cas32(&global_drainout_, 0, 1);
93  while (atomic_read32(&num_inflight_calls_) != 0)
94  SafeSleepMs(50);
95  }
96  private:
97  bool drainout_;
100 };
101 atomic_int32 CallGuard::num_inflight_calls_ = 0;
102 atomic_int32 CallGuard::global_drainout_ = 0;
103 
104 } // anonymous namespace
105 
106 
107 //------------------------------------------------------------------------------
108 
109 
110 const uint64_t PosixCacheManager::kBigFile = 25 * 1024 * 1024; // 25M
111 
112 
114  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
115  LogCvmfs(kLogCache, kLogDebug, "abort %s", transaction->tmp_path.c_str());
116  close(transaction->fd);
117  int result = unlink(transaction->tmp_path.c_str());
118  transaction->~Transaction();
119  atomic_dec32(&no_inflight_txns_);
120  if (result == -1)
121  return -errno;
122  return 0;
123 }
124 
125 
132  if (quota_mgr == NULL)
133  return false;
134  delete quota_mgr_;
135  quota_mgr_ = quota_mgr;
136  return true;
137 }
138 
139 
141  int retval = do_refcount_ ? fd_mgr_->Close(fd) : close(fd);
142  if (retval != 0)
143  return -errno;
144  return 0;
145 }
146 
147 
149  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
150  int result;
151  LogCvmfs(kLogCache, kLogDebug, "commit %s %s",
152  transaction->final_path.c_str(), transaction->tmp_path.c_str());
153 
154  result = Flush(transaction);
155  close(transaction->fd);
156  if (result < 0) {
157  unlink(transaction->tmp_path.c_str());
158  transaction->~Transaction();
159  atomic_dec32(&no_inflight_txns_);
160  return result;
161  }
162 
163  // To support debugging, move files into quarantine on file size mismatch
164  if (transaction->size != transaction->expected_size) {
165  // Allow size to be zero if alien cache, because hadoop-fuse-dfs returns
166  // size zero for a while
167  if ( (transaction->expected_size != kSizeUnknown) &&
168  (reports_correct_filesize_ || (transaction->size != 0)) )
169  {
171  "size check failure for %s, expected %lu, got %lu",
172  transaction->id.ToString().c_str(),
173  transaction->expected_size, transaction->size);
174  CopyPath2Path(transaction->tmp_path,
175  cache_path_ + "/quarantaine/" + transaction->id.ToString());
176  unlink(transaction->tmp_path.c_str());
177  transaction->~Transaction();
178  atomic_dec32(&no_inflight_txns_);
179  return -EIO;
180  }
181  }
182 
183  if ((transaction->label.flags & kLabelPinned) ||
184  (transaction->label.flags & kLabelCatalog))
185  {
186  bool retval = quota_mgr_->Pin(
187  transaction->id, transaction->size, transaction->label.GetDescription(),
188  (transaction->label.flags & kLabelCatalog));
189  if (!retval) {
190  LogCvmfs(kLogCache, kLogDebug, "commit failed: cannot pin %s",
191  transaction->id.ToString().c_str());
192  unlink(transaction->tmp_path.c_str());
193  transaction->~Transaction();
194  atomic_dec32(&no_inflight_txns_);
195  return -ENOSPC;
196  }
197  }
198 
199  // Move the temporary file into its final location
200  if (alien_cache_) {
201  int retval = chmod(transaction->tmp_path.c_str(), 0660);
202  assert(retval == 0);
203  }
204  result =
205  Rename(transaction->tmp_path.c_str(), transaction->final_path.c_str());
206  if (result < 0) {
207  LogCvmfs(kLogCache, kLogDebug, "commit failed: %s", strerror(errno));
208  unlink(transaction->tmp_path.c_str());
209  if ((transaction->label.flags & kLabelPinned) ||
210  (transaction->label.flags & kLabelCatalog))
211  {
212  quota_mgr_->Remove(transaction->id);
213  }
214  } else {
215  // Success, inform quota manager
216  if (transaction->label.flags & kLabelVolatile) {
217  quota_mgr_->InsertVolatile(transaction->id, transaction->size,
218  transaction->label.GetDescription());
219  } else if (!transaction->label.IsCatalog() &&
220  !transaction->label.IsPinned())
221  {
222  quota_mgr_->Insert(transaction->id, transaction->size,
223  transaction->label.GetDescription());
224  }
225  }
226  transaction->~Transaction();
227  atomic_dec32(&no_inflight_txns_);
228  return result;
229 }
230 
231 bool PosixCacheManager::InitCacheDirectory(const string &cache_path) {
232  FileSystemInfo fs_info = GetFileSystemInfo(cache_path);
233 
234  if (fs_info.type == kFsTypeTmpfs) {
235  is_tmpfs_ = true;
236  }
237 
238  if (alien_cache_) {
239  if (!MakeCacheDirectories(cache_path, 0770)) {
240  return false;
241  }
243  "Cache directory structure created.");
244  switch (fs_info.type) {
245  case kFsTypeNFS:
246  rename_workaround_ = kRenameLink;
248  "Alien cache is on NFS.");
249  break;
250  case kFsTypeBeeGFS:
251  rename_workaround_ = kRenameSamedir;
253  "Alien cache is on BeeGFS.");
254  break;
255  default:
256  break;
257  }
258  } else {
259  if (!MakeCacheDirectories(cache_path, 0700))
260  return false;
261  }
262 
263  // TODO(jblomer): we might not need to look anymore for cvmfs 2.0 relicts
264  if (FileExists(cache_path + "/cvmfscatalog.cache")) {
266  "Not mounting on cvmfs 2.0.X cache");
267  return false;
268  }
269  return true;
270 }
271 
273  const string &cache_path,
274  const bool alien_cache,
275  const RenameWorkarounds rename_workaround,
276  const bool do_refcount)
277 {
278  UniquePtr<PosixCacheManager> cache_manager(
279  new PosixCacheManager(cache_path, alien_cache, do_refcount));
280  assert(cache_manager.IsValid());
281 
282  cache_manager->rename_workaround_ = rename_workaround;
283 
284  bool result_ = cache_manager->InitCacheDirectory(cache_path);
285  if (!result_) {
286  return NULL;
287  }
288 
289  return cache_manager.Release();
290 }
291 
292 
294  const Label &label,
295  const int flags,
296  void *txn)
297 {
298  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
299  transaction->label = label;
300 }
301 
302 
304  string msg;
305  if (do_refcount_) {
306  msg = "Refcounting Posix cache manager"
307  "(cache directory: " + cache_path_ + ")\n";
308  } else {
309  msg = "Posix cache manager (cache directory: " + cache_path_ + ")\n";
310  }
311  return msg;
312 }
313 
314 
320  if (do_refcount_) {
321  SavedState *state = new SavedState();
322  state->fd_mgr = fd_mgr_->Clone();
323  return state;
324  }
325  char *c = reinterpret_cast<char *>(smalloc(1));
326  *c = kMagicNoRefcount;
327  return c;
328 }
329 
330 
332  assert(data);
333  if (do_refcount_) {
334  SavedState *state = reinterpret_cast<SavedState *>(data);
335  if (state->magic_number == kMagicRefcount) {
336  LogCvmfs(kLogCache, kLogDebug, "Restoring refcount cache manager from "
337  "refcounted posix cache manager");
338 
339  fd_mgr_->AssignFrom(state->fd_mgr.weak_ref());
340  } else {
341  LogCvmfs(kLogCache, kLogDebug, "Restoring refcount cache manager from "
342  "non-refcounted posix cache manager");
343  }
344  return -1;
345  }
346 
347  char *c = reinterpret_cast<char *>(data);
348  assert(*c == kMagicNoRefcount || *c == kMagicRefcount);
349  if (*c == kMagicRefcount) {
350  SavedState *state = reinterpret_cast<SavedState *>(data);
351  LogCvmfs(kLogCache, kLogDebug, "Restoring non-refcount cache manager from "
352  "refcounted posix cache manager - this "
353  " is not possible, keep refcounting.");
354  fd_mgr_->AssignFrom(state->fd_mgr.weak_ref());
355  do_refcount_ = true;
356  }
357  return -1;
358 }
359 
360 
362  assert(data);
363  SavedState *state = reinterpret_cast<SavedState *>(data);
364  if (state->magic_number == kMagicRefcount) {
365  delete state;
366  } else {
367  // If not refcounted, the state is the dummy SavedState
368  // of the regular posix cache manager
369  free(data);
370  }
371  return true;
372 }
373 
374 
375 
377  int new_fd = do_refcount_ ? fd_mgr_->Dup(fd) : dup(fd);
378  if (new_fd < 0)
379  return -errno;
380  return new_fd;
381 }
382 
383 
385  if (transaction->buf_pos == 0)
386  return 0;
387  int written =
388  write(transaction->fd, transaction->buffer, transaction->buf_pos);
389  if (written < 0)
390  return -errno;
391  if (static_cast<unsigned>(written) != transaction->buf_pos) {
392  transaction->buf_pos -= written;
393  return -EIO;
394  }
395  transaction->buf_pos = 0;
396  return 0;
397 }
398 
399 
401  return cache_path_ + "/" + id.MakePathWithoutSuffix();
402 }
403 
404 
405 int64_t PosixCacheManager::GetSize(int fd) {
406  platform_stat64 info;
407  int retval = platform_fstat(fd, &info);
408  if (retval != 0)
409  return -errno;
410  return info.st_size;
411 }
412 
413 
415  const string path = GetPathInCache(object.id);
416  int result;
417  if (do_refcount_) {
418  result = fd_mgr_->Open(object.id, path);
419  } else {
420  result = open(path.c_str(), O_RDONLY);
421  }
422  if (result >= 0) {
423  LogCvmfs(kLogCache, kLogDebug, "hit %s", path.c_str());
424  // platform_disable_kcache(result);
425  quota_mgr_->Touch(object.id);
426  } else {
427  result = -errno;
428  LogCvmfs(kLogCache, kLogDebug, "miss %s (%d)", path.c_str(), result);
429  }
430  return result;
431 }
432 
433 
435  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
436  int retval = Flush(transaction);
437  if (retval < 0)
438  return retval;
439  int fd_rdonly;
440 
441  if (do_refcount_) {
442  fd_rdonly = fd_mgr_->Open(transaction->id, transaction->tmp_path.c_str());
443  } else {
444  fd_rdonly = open(transaction->tmp_path.c_str(), O_RDONLY);
445  }
446  if (fd_rdonly == -1)
447  return -errno;
448  return fd_rdonly;
449 }
450 
451 
453  int fd,
454  void *buf,
455  uint64_t size,
456  uint64_t offset)
457 {
458  int64_t result;
459  do {
460  errno = 0;
461  result = pread(fd, buf, size, offset);
462  } while ((result == -1) && (errno == EINTR));
463  if (result < 0)
464  return -errno;
465  return result;
466 }
467 
468 
469 int PosixCacheManager::Rename(const char *oldpath, const char *newpath) {
470  int result;
471  if (rename_workaround_ != kRenameLink) {
472  result = rename(oldpath, newpath);
473  if (result < 0)
474  return -errno;
475  return 0;
476  }
477 
478  result = link(oldpath, newpath);
479  if (result < 0) {
480  if (errno == EEXIST) {
481  LogCvmfs(kLogCache, kLogDebug, "%s already existed, ignoring", newpath);
482  } else {
483  return -errno;
484  }
485  }
486  result = unlink(oldpath);
487  if (result < 0)
488  return -errno;
489  return 0;
490 }
491 
492 
500  unsigned char *buf[4096];
501  int nbytes;
502  uint64_t pos = 0;
503  if (is_tmpfs()) {
504  return 0;
505  }
506  do {
507  nbytes = Pread(fd, buf, 4096, pos);
508  pos += nbytes;
509  } while (nbytes == 4096);
510  LogCvmfs(kLogCache, kLogDebug, "read-ahead %d, %" PRIu64, fd, pos);
511  if (nbytes < 0)
512  return nbytes;
513  return 0;
514 }
515 
516 
517 int PosixCacheManager::Reset(void *txn) {
518  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
519  transaction->buf_pos = 0;
520  transaction->size = 0;
521  int retval = lseek(transaction->fd, 0, SEEK_SET);
522  if (retval < 0)
523  return -errno;
524  retval = ftruncate(transaction->fd, 0);
525  if (retval < 0)
526  return -errno;
527  return 0;
528 }
529 
530 
532  const shash::Any &id,
533  uint64_t size,
534  void *txn)
535 {
536  atomic_inc32(&no_inflight_txns_);
537  if (cache_mode_ == kCacheReadOnly) {
538  atomic_dec32(&no_inflight_txns_);
539  return -EROFS;
540  }
541 
542  if (size != kSizeUnknown) {
543  if (size > quota_mgr_->GetMaxFileSize()) {
544  LogCvmfs(kLogCache, kLogDebug, "file too big for lru cache (%" PRIu64 " "
545  "requested but only %" PRIu64 " bytes free)",
546  size, quota_mgr_->GetMaxFileSize());
547  atomic_dec32(&no_inflight_txns_);
548  return -ENOSPC;
549  }
550 
551  // For large files, ensure enough free cache space before writing the chunk
552  if (size > kBigFile) {
553  uint64_t cache_size = quota_mgr_->GetSize();
554  uint64_t cache_capacity = quota_mgr_->GetCapacity();
555  assert(cache_capacity >= size);
556  if ((cache_size + size) > cache_capacity) {
557  uint64_t leave_size =
558  std::min(cache_capacity / 2, cache_capacity - size);
559  quota_mgr_->Cleanup(leave_size);
560  }
561  }
562  }
563 
564  string path_in_cache = GetPathInCache(id);
565  Transaction *transaction = new (txn) Transaction(id, path_in_cache);
566 
567  char *template_path = NULL;
568  unsigned temp_path_len = 0;
569  if (rename_workaround_ == kRenameSamedir) {
570  temp_path_len = path_in_cache.length() + 6;
571  template_path = reinterpret_cast<char *>(alloca(temp_path_len + 1));
572  memcpy(template_path, path_in_cache.data(), path_in_cache.length());
573  memset(template_path + path_in_cache.length(), 'X', 6);
574  } else {
575  temp_path_len = txn_template_path_.length();
576  template_path = reinterpret_cast<char *>(alloca(temp_path_len + 1));
577  memcpy(template_path, &txn_template_path_[0], temp_path_len);
578  }
579  template_path[temp_path_len] = '\0';
580 
581  transaction->fd = mkstemp(template_path);
582  if (transaction->fd == -1) {
583  transaction->~Transaction();
584  atomic_dec32(&no_inflight_txns_);
585  return -errno;
586  }
587 
588  LogCvmfs(kLogCache, kLogDebug, "start transaction on %s has result %d",
589  template_path, transaction->fd);
590  transaction->tmp_path = template_path;
591  transaction->expected_size = size;
592  return transaction->fd;
593 }
594 
595 
597 {
598  return manifest::Manifest::ReadBreadcrumb(fqrn, cache_path_);
599 }
600 
601 
603  return manifest.ExportBreadcrumb(cache_path_, 0600);
604 }
605 
606 
608  manifest::Breadcrumb breadcrumb) {
609  return breadcrumb.Export(fqrn, cache_path_, 0600);
610 }
611 
612 
614  cache_mode_ = kCacheReadOnly;
615  while (atomic_read32(&no_inflight_txns_) != 0)
616  SafeSleepMs(50);
617 
618  QuotaManager *old_manager = quota_mgr_;
619  quota_mgr_ = new NoopQuotaManager();
620  delete old_manager;
621 }
622 
623 
624 int64_t PosixCacheManager::Write(const void *buf, uint64_t size, void *txn) {
625  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
626 
627  if (transaction->expected_size != kSizeUnknown) {
628  if (transaction->size + size > transaction->expected_size) {
630  "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
631  transaction->size + size, transaction->expected_size);
632  return -EFBIG;
633  }
634  }
635 
636  uint64_t written = 0;
637  const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
638  while (written < size) {
639  if (transaction->buf_pos == sizeof(transaction->buffer)) {
640  int retval = Flush(transaction);
641  if (retval != 0) {
642  transaction->size += written;
643  return retval;
644  }
645  }
646  uint64_t remaining = size - written;
647  uint64_t space_in_buffer =
648  sizeof(transaction->buffer) - transaction->buf_pos;
649  uint64_t batch_size = std::min(remaining, space_in_buffer);
650  memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
651  transaction->buf_pos += batch_size;
652  written += batch_size;
653  read_pos += batch_size;
654  }
655  transaction->size += written;
656  return written;
657 }
bool MakeCacheDirectories(const std::string &path, const mode_t mode)
Definition: posix.cc:893
RenameWorkarounds rename_workaround_
Definition: cache_posix.h:165
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
Definition: cache_posix.cc:624
const manifest::Manifest * manifest() const
Definition: repository.h:125
struct stat64 platform_stat64
virtual int Open(const LabeledObject &object)
Definition: cache_posix.cc:414
std::string GetDescription() const
Definition: cache.h:110
int Flush(Transaction *transaction)
Definition: cache_posix.cc:384
void TearDown2ReadOnly()
Definition: cache_posix.cc:613
virtual int CommitTxn(void *txn)
Definition: cache_posix.cc:148
T * weak_ref() const
Definition: pointer.h:42
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
EFileSystemTypes type
Definition: posix.h:55
virtual int AbortTxn(void *txn)
Definition: cache_posix.cc:113
virtual void * DoSaveState()
Definition: cache_posix.cc:319
virtual int64_t GetSize(int fd)
Definition: cache_posix.cc:405
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
Definition: cache_posix.cc:131
assert((mem||(size==0))&&"Out Of Memory")
virtual void CtrlTxn(const Label &label, const int flags, void *txn)
Definition: cache_posix.cc:293
bool CopyPath2Path(const string &src, const string &dest)
Definition: compression.cc:63
virtual int Dup(int fd)
Definition: cache_posix.cc:376
int Rename(const char *oldpath, const char *newpath)
Definition: cache_posix.cc:469
std::string GetPathInCache(const shash::Any &id)
Definition: cache_posix.cc:400
bool InitCacheDirectory(const string &cache_path)
Definition: cache_posix.cc:231
bool IsPinned() const
Definition: cache.h:103
virtual int OpenFromTxn(void *txn)
Definition: cache_posix.cc:434
virtual int Readahead(int fd)
Definition: cache_posix.cc:499
bool FileExists(const std::string &path)
Definition: posix.cc:802
int32_t atomic_int32
Definition: atomic.h:17
unsigned char buffer[4096]
Definition: cache_posix.h:131
bool ExportBreadcrumb(const std::string &directory, const int mode) const
Definition: manifest.cc:246
void Transaction()
FileSystemInfo GetFileSystemInfo(const std::string &path)
Definition: posix.cc:180
static const uint64_t kBigFile
Definition: cache_posix.h:67
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
Definition: cache_posix.cc:531
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
Definition: cache_posix.cc:596
bool IsCatalog() const
Definition: cache.h:102
bool Export(const std::string &fqrn, const std::string &directory, const int mode) const
Definition: manifest.cc:48
virtual int Reset(void *txn)
Definition: cache_posix.cc:517
virtual bool DoFreeState(void *data)
Definition: cache_posix.cc:361
static Breadcrumb ReadBreadcrumb(const std::string &repo_name, const std::string &directory)
Definition: manifest.cc:256
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2049
static PosixCacheManager * Create(const std::string &cache_path, const bool alien_cache, const RenameWorkarounds rename_workaround=kRenameNormal, const bool do_refcount=false)
Definition: cache_posix.cc:272
virtual int DoRestoreState(void *data)
Definition: cache_posix.cc:331
virtual std::string Describe()
Definition: cache_posix.cc:303
int platform_fstat(int filedes, platform_stat64 *buf)
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
Definition: cache_posix.cc:602
static void size_t size
Definition: smalloc.h:54
virtual int Close(int fd)
Definition: cache_posix.cc:140
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
Definition: cache_posix.cc:452
UniquePtr< FdRefcountMgr > fd_mgr
Definition: cache_posix.h:183
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528