CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_posix.cc
Go to the documentation of this file.
1 
27 #define __STDC_FORMAT_MACROS
28 
29 
30 #include "cache_posix.h"
31 
32 #include <dirent.h>
33 #include <errno.h>
34 #include <fcntl.h>
35 #include <inttypes.h>
36 #include <pthread.h>
37 #include <sys/stat.h>
38 #include <sys/types.h>
39 #ifndef __APPLE__
40 #include <sys/statfs.h>
41 #endif
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <vector>
51 
52 #include "crypto/hash.h"
53 #include "crypto/signature.h"
54 #include "directory_entry.h"
55 #include "manifest.h"
56 #include "manifest_fetch.h"
57 #include "network/download.h"
58 #include "quota.h"
59 #include "shortstring.h"
60 #include "statistics.h"
61 #include "util/atomic.h"
62 #include "util/logging.h"
63 #include "util/platform.h"
64 #include "util/posix.h"
65 #include "util/smalloc.h"
66 
67 using namespace std; // NOLINT
68 
69 namespace {
70 
79 class CallGuard {
80  public:
82  const int32_t global_drainout = atomic_read32(&global_drainout_);
83  drainout_ = (global_drainout != 0);
84  if (!drainout_)
85  atomic_inc32(&num_inflight_calls_);
86  }
88  if (!drainout_)
89  atomic_dec32(&num_inflight_calls_);
90  }
91  static void Drainout() {
92  atomic_cas32(&global_drainout_, 0, 1);
93  while (atomic_read32(&num_inflight_calls_) != 0)
94  SafeSleepMs(50);
95  }
96 
97  private:
98  bool drainout_;
101 };
102 atomic_int32 CallGuard::num_inflight_calls_ = 0;
103 atomic_int32 CallGuard::global_drainout_ = 0;
104 
105 } // anonymous namespace
106 
107 
108 //------------------------------------------------------------------------------
109 
110 
111 const uint64_t PosixCacheManager::kBigFile = 25 * 1024 * 1024; // 25M
112 
113 
115  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
116  LogCvmfs(kLogCache, kLogDebug, "abort %s", transaction->tmp_path.c_str());
117  close(transaction->fd);
118  const int result = unlink(transaction->tmp_path.c_str());
119  transaction->~Transaction();
120  atomic_dec32(&no_inflight_txns_);
121  if (result == -1)
122  return -errno;
123  return 0;
124 }
125 
126 
133  if (quota_mgr == NULL)
134  return false;
135  delete quota_mgr_;
136  quota_mgr_ = quota_mgr;
137  return true;
138 }
139 
140 
142  const int retval = do_refcount_ ? fd_mgr_->Close(fd) : close(fd);
143  if (retval != 0)
144  return -errno;
145  return 0;
146 }
147 
148 
150  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
151  int result;
152  LogCvmfs(kLogCache, kLogDebug, "commit %s %s",
153  transaction->final_path.c_str(), transaction->tmp_path.c_str());
154 
155  result = Flush(transaction);
156  close(transaction->fd);
157  if (result < 0) {
158  unlink(transaction->tmp_path.c_str());
159  transaction->~Transaction();
160  atomic_dec32(&no_inflight_txns_);
161  return result;
162  }
163 
164  // To support debugging, move files into quarantine on file size mismatch
165  if (transaction->size != transaction->expected_size) {
166  // Allow size to be zero if alien cache, because hadoop-fuse-dfs returns
167  // size zero for a while
168  if ((transaction->expected_size != kSizeUnknown)
169  && (reports_correct_filesize_ || (transaction->size != 0))) {
171  "size check failure for %s, expected %lu, got %lu",
172  transaction->id.ToString().c_str(), transaction->expected_size,
173  transaction->size);
174  CopyPath2Path(transaction->tmp_path,
175  cache_path_ + "/quarantaine/" + transaction->id.ToString());
176  unlink(transaction->tmp_path.c_str());
177  transaction->~Transaction();
178  atomic_dec32(&no_inflight_txns_);
179  return -EIO;
180  }
181  }
182 
183  if ((transaction->label.flags & kLabelPinned)
184  || (transaction->label.flags & kLabelCatalog)) {
185  const bool retval = quota_mgr_->Pin(
186  transaction->id, transaction->size, transaction->label.GetDescription(),
187  (transaction->label.flags & kLabelCatalog));
188  if (!retval) {
189  LogCvmfs(kLogCache, kLogDebug, "commit failed: cannot pin %s",
190  transaction->id.ToString().c_str());
191  unlink(transaction->tmp_path.c_str());
192  transaction->~Transaction();
193  atomic_dec32(&no_inflight_txns_);
194  return -ENOSPC;
195  }
196  }
197 
198  // Move the temporary file into its final location
199  if (alien_cache_) {
200  const int retval = chmod(transaction->tmp_path.c_str(), 0660);
201  assert(retval == 0);
202  }
203  result = Rename(transaction->tmp_path.c_str(),
204  transaction->final_path.c_str());
205  if (result < 0) {
206  LogCvmfs(kLogCache, kLogDebug, "commit failed: %s", strerror(errno));
207  unlink(transaction->tmp_path.c_str());
208  if ((transaction->label.flags & kLabelPinned)
209  || (transaction->label.flags & kLabelCatalog)) {
210  quota_mgr_->Remove(transaction->id);
211  }
212  } else {
213  // Success, inform quota manager
214  if (transaction->label.flags & kLabelVolatile) {
215  quota_mgr_->InsertVolatile(transaction->id, transaction->size,
216  transaction->label.GetDescription());
217  } else if (!transaction->label.IsCatalog()
218  && !transaction->label.IsPinned()) {
219  quota_mgr_->Insert(transaction->id, transaction->size,
220  transaction->label.GetDescription());
221  }
222  }
223  transaction->~Transaction();
224  atomic_dec32(&no_inflight_txns_);
225  return result;
226 }
227 
228 bool PosixCacheManager::InitCacheDirectory(const string &cache_path) {
229  const FileSystemInfo fs_info = GetFileSystemInfo(cache_path);
230 
231  if (fs_info.type == kFsTypeTmpfs) {
232  is_tmpfs_ = true;
233  }
234 
235  if (alien_cache_) {
236  if (!MakeCacheDirectories(cache_path, 0770)) {
237  return false;
238  }
240  "Cache directory structure created.");
241  switch (fs_info.type) {
242  case kFsTypeNFS:
243  rename_workaround_ = kRenameLink;
244  LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "Alien cache is on NFS.");
245  break;
246  case kFsTypeBeeGFS:
247  rename_workaround_ = kRenameSamedir;
249  "Alien cache is on BeeGFS.");
250  break;
251  default:
252  break;
253  }
254  } else {
255  if (!MakeCacheDirectories(cache_path, 0700))
256  return false;
257  }
258 
259  // TODO(jblomer): we might not need to look anymore for cvmfs 2.0 relicts
260  if (FileExists(cache_path + "/cvmfscatalog.cache")) {
262  "Not mounting on cvmfs 2.0.X cache");
263  return false;
264  }
265  return true;
266 }
267 
269  const string &cache_path,
270  const bool alien_cache,
271  const RenameWorkarounds rename_workaround,
272  const bool do_refcount) {
273  UniquePtr<PosixCacheManager> cache_manager(
274  new PosixCacheManager(cache_path, alien_cache, do_refcount));
275  assert(cache_manager.IsValid());
276 
277  cache_manager->rename_workaround_ = rename_workaround;
278 
279  const bool result_ = cache_manager->InitCacheDirectory(cache_path);
280  if (!result_) {
281  return NULL;
282  }
283 
284  return cache_manager.Release();
285 }
286 
287 
289  const int flags,
290  void *txn) {
291  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
292  transaction->label = label;
293 }
294 
295 
297  string msg;
298  if (do_refcount_) {
299  msg = "Refcounting Posix cache manager"
300  "(cache directory: "
301  + cache_path_ + ")\n";
302  } else {
303  msg = "Posix cache manager (cache directory: " + cache_path_ + ")\n";
304  }
305  return msg;
306 }
307 
308 
314  if (do_refcount_) {
315  SavedState *state = new SavedState();
316  state->fd_mgr = fd_mgr_->Clone();
317  return state;
318  }
319  char *c = reinterpret_cast<char *>(smalloc(1));
320  *c = kMagicNoRefcount;
321  return c;
322 }
323 
324 
326  assert(data);
327  if (do_refcount_) {
328  SavedState *state = reinterpret_cast<SavedState *>(data);
329  if (state->magic_number == kMagicRefcount) {
331  "Restoring refcount cache manager from "
332  "refcounted posix cache manager");
333 
334  fd_mgr_->AssignFrom(state->fd_mgr.weak_ref());
335  } else {
337  "Restoring refcount cache manager from "
338  "non-refcounted posix cache manager");
339  }
340  return -1;
341  }
342 
343  char *c = reinterpret_cast<char *>(data);
344  assert(*c == kMagicNoRefcount || *c == kMagicRefcount);
345  if (*c == kMagicRefcount) {
346  SavedState *state = reinterpret_cast<SavedState *>(data);
348  "Restoring non-refcount cache manager from "
349  "refcounted posix cache manager - this "
350  " is not possible, keep refcounting.");
351  fd_mgr_->AssignFrom(state->fd_mgr.weak_ref());
352  do_refcount_ = true;
353  }
354  return -1;
355 }
356 
357 
359  assert(data);
360  SavedState *state = reinterpret_cast<SavedState *>(data);
361  if (state->magic_number == kMagicRefcount) {
362  delete state;
363  } else {
364  // If not refcounted, the state is the dummy SavedState
365  // of the regular posix cache manager
366  free(data);
367  }
368  return true;
369 }
370 
371 
373  const int new_fd = do_refcount_ ? fd_mgr_->Dup(fd) : dup(fd);
374  if (new_fd < 0)
375  return -errno;
376  return new_fd;
377 }
378 
379 
381  if (transaction->buf_pos == 0)
382  return 0;
383  const int written =
384  write(transaction->fd, transaction->buffer, transaction->buf_pos);
385  if (written < 0)
386  return -errno;
387  if (static_cast<unsigned>(written) != transaction->buf_pos) {
388  transaction->buf_pos -= written;
389  return -EIO;
390  }
391  transaction->buf_pos = 0;
392  return 0;
393 }
394 
395 
397  return cache_path_ + "/" + id.MakePathWithoutSuffix();
398 }
399 
400 
401 int64_t PosixCacheManager::GetSize(int fd) {
402  platform_stat64 info;
403  const int retval = platform_fstat(fd, &info);
404  if (retval != 0)
405  return -errno;
406  return info.st_size;
407 }
408 
409 
411  const string path = GetPathInCache(object.id);
412  int result;
413  if (do_refcount_) {
414  result = fd_mgr_->Open(object.id, path);
415  } else {
416  result = open(path.c_str(), O_RDONLY);
417  }
418  if (result >= 0) {
419  LogCvmfs(kLogCache, kLogDebug, "hit %s", path.c_str());
420  // platform_disable_kcache(result);
421  quota_mgr_->Touch(object.id);
422  } else {
423  result = -errno;
424  LogCvmfs(kLogCache, kLogDebug, "miss %s (%d)", path.c_str(), result);
425  }
426  return result;
427 }
428 
429 
431  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
432  const int retval = Flush(transaction);
433  if (retval < 0)
434  return retval;
435  int fd_rdonly;
436 
437  if (do_refcount_) {
438  fd_rdonly = fd_mgr_->Open(transaction->id, transaction->tmp_path.c_str());
439  } else {
440  fd_rdonly = open(transaction->tmp_path.c_str(), O_RDONLY);
441  }
442  if (fd_rdonly == -1)
443  return -errno;
444  return fd_rdonly;
445 }
446 
447 
449  void *buf,
450  uint64_t size,
451  uint64_t offset) {
452  int64_t result;
453  do {
454  errno = 0;
455  result = pread(fd, buf, size, offset);
456  } while ((result == -1) && (errno == EINTR));
457  if (result < 0)
458  return -errno;
459  return result;
460 }
461 
462 
463 int PosixCacheManager::Rename(const char *oldpath, const char *newpath) {
464  int result;
465  if (rename_workaround_ != kRenameLink) {
466  result = rename(oldpath, newpath);
467  if (result < 0)
468  return -errno;
469  return 0;
470  }
471 
472  result = link(oldpath, newpath);
473  if (result < 0) {
474  if (errno == EEXIST) {
475  LogCvmfs(kLogCache, kLogDebug, "%s already existed, ignoring", newpath);
476  } else {
477  return -errno;
478  }
479  }
480  result = unlink(oldpath);
481  if (result < 0)
482  return -errno;
483  return 0;
484 }
485 
486 
494  unsigned char *buf[4096];
495  int nbytes;
496  uint64_t pos = 0;
497  if (is_tmpfs()) {
498  return 0;
499  }
500  do {
501  nbytes = Pread(fd, buf, 4096, pos);
502  pos += nbytes;
503  } while (nbytes == 4096);
504  LogCvmfs(kLogCache, kLogDebug, "read-ahead %d, %" PRIu64, fd, pos);
505  if (nbytes < 0)
506  return nbytes;
507  return 0;
508 }
509 
510 
511 int PosixCacheManager::Reset(void *txn) {
512  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
513  transaction->buf_pos = 0;
514  transaction->size = 0;
515  int retval = lseek(transaction->fd, 0, SEEK_SET);
516  if (retval < 0)
517  return -errno;
518  retval = ftruncate(transaction->fd, 0);
519  if (retval < 0)
520  return -errno;
521  return 0;
522 }
523 
524 
526  uint64_t size,
527  void *txn) {
528  atomic_inc32(&no_inflight_txns_);
529  if (cache_mode_ == kCacheReadOnly) {
530  atomic_dec32(&no_inflight_txns_);
531  return -EROFS;
532  }
533 
534  if (size != kSizeUnknown) {
535  if (size > quota_mgr_->GetMaxFileSize()) {
537  "file too big for lru cache (%" PRIu64 " "
538  "requested but only %" PRIu64 " bytes free)",
539  size, quota_mgr_->GetMaxFileSize());
540  atomic_dec32(&no_inflight_txns_);
541  return -ENOSPC;
542  }
543 
544  // For large files, ensure enough free cache space before writing the chunk
545  if (size > kBigFile) {
546  const uint64_t cache_size = quota_mgr_->GetSize();
547  const uint64_t cache_capacity = quota_mgr_->GetCapacity();
548  assert(cache_capacity >= size);
549  if ((cache_size + size) > cache_capacity) {
550  const uint64_t leave_size =
551  std::min(cache_capacity / 2, cache_capacity - size);
552  quota_mgr_->Cleanup(leave_size);
553  }
554  }
555  }
556 
557  string path_in_cache = GetPathInCache(id);
558  Transaction *transaction = new (txn) Transaction(id, path_in_cache);
559 
560  char *template_path = NULL;
561  unsigned temp_path_len = 0;
562  if (rename_workaround_ == kRenameSamedir) {
563  temp_path_len = path_in_cache.length() + 6;
564  template_path = reinterpret_cast<char *>(alloca(temp_path_len + 1));
565  memcpy(template_path, path_in_cache.data(), path_in_cache.length());
566  memset(template_path + path_in_cache.length(), 'X', 6);
567  } else {
568  temp_path_len = txn_template_path_.length();
569  template_path = reinterpret_cast<char *>(alloca(temp_path_len + 1));
570  memcpy(template_path, &txn_template_path_[0], temp_path_len);
571  }
572  template_path[temp_path_len] = '\0';
573 
574  transaction->fd = mkstemp(template_path);
575  if (transaction->fd == -1) {
576  transaction->~Transaction();
577  atomic_dec32(&no_inflight_txns_);
578  return -errno;
579  }
580 
581  LogCvmfs(kLogCache, kLogDebug, "start transaction on %s has result %d",
582  template_path, transaction->fd);
583  transaction->tmp_path = template_path;
584  transaction->expected_size = size;
585  return transaction->fd;
586 }
587 
588 
590  const std::string &fqrn) {
591  return manifest::Manifest::ReadBreadcrumb(fqrn, cache_path_);
592 }
593 
594 
596  return manifest.ExportBreadcrumb(cache_path_, 0600);
597 }
598 
599 
601  manifest::Breadcrumb breadcrumb) {
602  return breadcrumb.Export(fqrn, cache_path_, 0600);
603 }
604 
605 
607  cache_mode_ = kCacheReadOnly;
608  while (atomic_read32(&no_inflight_txns_) != 0)
609  SafeSleepMs(50);
610 
611  QuotaManager *old_manager = quota_mgr_;
612  quota_mgr_ = new NoopQuotaManager();
613  delete old_manager;
614 }
615 
616 
617 int64_t PosixCacheManager::Write(const void *buf, uint64_t size, void *txn) {
618  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
619 
620  if (transaction->expected_size != kSizeUnknown) {
621  if (transaction->size + size > transaction->expected_size) {
623  "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
624  transaction->size + size, transaction->expected_size);
625  return -EFBIG;
626  }
627  }
628 
629  uint64_t written = 0;
630  const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
631  while (written < size) {
632  if (transaction->buf_pos == sizeof(transaction->buffer)) {
633  const int retval = Flush(transaction);
634  if (retval != 0) {
635  transaction->size += written;
636  return retval;
637  }
638  }
639  const uint64_t remaining = size - written;
640  const uint64_t space_in_buffer =
641  sizeof(transaction->buffer) - transaction->buf_pos;
642  const uint64_t batch_size = std::min(remaining, space_in_buffer);
643  memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
644  transaction->buf_pos += batch_size;
645  written += batch_size;
646  read_pos += batch_size;
647  }
648  transaction->size += written;
649  return written;
650 }
bool MakeCacheDirectories(const std::string &path, const mode_t mode)
Definition: posix.cc:890
RenameWorkarounds rename_workaround_
Definition: cache_posix.h:161
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
Definition: cache_posix.cc:617
const manifest::Manifest * manifest() const
Definition: repository.h:125
struct stat64 platform_stat64
virtual int Open(const LabeledObject &object)
Definition: cache_posix.cc:410
std::string GetDescription() const
Definition: cache.h:110
int Flush(Transaction *transaction)
Definition: cache_posix.cc:380
void TearDown2ReadOnly()
Definition: cache_posix.cc:606
virtual int CommitTxn(void *txn)
Definition: cache_posix.cc:149
T * weak_ref() const
Definition: pointer.h:46
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
EFileSystemTypes type
Definition: posix.h:55
virtual int AbortTxn(void *txn)
Definition: cache_posix.cc:114
virtual void * DoSaveState()
Definition: cache_posix.cc:313
virtual int64_t GetSize(int fd)
Definition: cache_posix.cc:401
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
Definition: cache_posix.cc:132
assert((mem||(size==0))&&"Out Of Memory")
virtual void CtrlTxn(const Label &label, const int flags, void *txn)
Definition: cache_posix.cc:288
bool CopyPath2Path(const string &src, const string &dest)
Definition: compression.cc:66
virtual int Dup(int fd)
Definition: cache_posix.cc:372
int Rename(const char *oldpath, const char *newpath)
Definition: cache_posix.cc:463
std::string GetPathInCache(const shash::Any &id)
Definition: cache_posix.cc:396
bool InitCacheDirectory(const string &cache_path)
Definition: cache_posix.cc:228
bool IsPinned() const
Definition: cache.h:103
virtual int OpenFromTxn(void *txn)
Definition: cache_posix.cc:430
virtual int Readahead(int fd)
Definition: cache_posix.cc:493
bool FileExists(const std::string &path)
Definition: posix.cc:803
int32_t atomic_int32
Definition: atomic.h:17
unsigned char buffer[4096]
Definition: cache_posix.h:128
bool ExportBreadcrumb(const std::string &directory, const int mode) const
Definition: manifest.cc:239
void Transaction()
FileSystemInfo GetFileSystemInfo(const std::string &path)
Definition: posix.cc:179
static PosixCacheManager * Create(const std::string &cache_path, const bool alien_cache, const RenameWorkarounds rename_workaround=kRenameNormal, const bool do_refcount=true)
Definition: cache_posix.cc:268
static const uint64_t kBigFile
Definition: cache_posix.h:67
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
Definition: cache_posix.cc:525
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
Definition: cache_posix.cc:589
bool IsCatalog() const
Definition: cache.h:102
bool Export(const std::string &fqrn, const std::string &directory, const int mode) const
Definition: manifest.cc:48
virtual int Reset(void *txn)
Definition: cache_posix.cc:511
virtual bool DoFreeState(void *data)
Definition: cache_posix.cc:358
static Breadcrumb ReadBreadcrumb(const std::string &repo_name, const std::string &directory)
Definition: manifest.cc:249
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2025
virtual int DoRestoreState(void *data)
Definition: cache_posix.cc:325
virtual std::string Describe()
Definition: cache_posix.cc:296
int platform_fstat(int filedes, platform_stat64 *buf)
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
Definition: cache_posix.cc:595
static void size_t size
Definition: smalloc.h:54
virtual int Close(int fd)
Definition: cache_posix.cc:141
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
Definition: cache_posix.cc:448
UniquePtr< FdRefcountMgr > fd_mgr
Definition: cache_posix.h:179
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545