CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_posix.cc
Go to the documentation of this file.
1 
27 #define __STDC_FORMAT_MACROS
28 
29 #include "cvmfs_config.h"
30 #include "cache_posix.h"
31 
32 #include <dirent.h>
33 #include <errno.h>
34 #include <fcntl.h>
35 #include <inttypes.h>
36 #include <pthread.h>
37 #include <sys/stat.h>
38 #include <sys/types.h>
39 #ifndef __APPLE__
40 #include <sys/statfs.h>
41 #endif
42 #include <unistd.h>
43 
44 #include <algorithm>
45 #include <cassert>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <cstring>
49 #include <map>
50 #include <vector>
51 
52 #include "atomic.h"
53 #include "directory_entry.h"
54 #include "download.h"
55 #include "hash.h"
56 #include "logging.h"
57 #include "manifest.h"
58 #include "manifest_fetch.h"
59 #include "platform.h"
60 #include "quota.h"
61 #include "shortstring.h"
62 #include "signature.h"
63 #include "smalloc.h"
64 #include "statistics.h"
65 #include "util/posix.h"
66 
67 using namespace std; // NOLINT
68 
69 namespace {
70 
79 class CallGuard {
80  public:
82  int32_t global_drainout = atomic_read32(&global_drainout_);
83  drainout_ = (global_drainout != 0);
84  if (!drainout_)
85  atomic_inc32(&num_inflight_calls_);
86  }
88  if (!drainout_)
89  atomic_dec32(&num_inflight_calls_);
90  }
91  static void Drainout() {
92  atomic_cas32(&global_drainout_, 0, 1);
93  while (atomic_read32(&num_inflight_calls_) != 0)
94  SafeSleepMs(50);
95  }
96  private:
97  bool drainout_;
100 };
101 atomic_int32 CallGuard::num_inflight_calls_ = 0;
102 atomic_int32 CallGuard::global_drainout_ = 0;
103 
104 } // anonymous namespace
105 
106 
107 //------------------------------------------------------------------------------
108 
109 
110 const uint64_t PosixCacheManager::kBigFile = 25 * 1024 * 1024; // 25M
111 
112 
114  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
115  LogCvmfs(kLogCache, kLogDebug, "abort %s", transaction->tmp_path.c_str());
116  close(transaction->fd);
117  int result = unlink(transaction->tmp_path.c_str());
118  transaction->~Transaction();
119  atomic_dec32(&no_inflight_txns_);
120  if (result == -1)
121  return -errno;
122  return 0;
123 }
124 
125 
132  if (quota_mgr == NULL)
133  return false;
134  delete quota_mgr_;
135  quota_mgr_ = quota_mgr;
136  return true;
137 }
138 
139 
141  int retval = close(fd);
142  if (retval != 0)
143  return -errno;
144  return 0;
145 }
146 
147 
149  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
150  int result;
151  LogCvmfs(kLogCache, kLogDebug, "commit %s %s",
152  transaction->final_path.c_str(), transaction->tmp_path.c_str());
153 
154  result = Flush(transaction);
155  close(transaction->fd);
156  if (result < 0) {
157  unlink(transaction->tmp_path.c_str());
158  transaction->~Transaction();
159  atomic_dec32(&no_inflight_txns_);
160  return result;
161  }
162 
163  // To support debugging, move files into quarantine on file size mismatch
164  if (transaction->size != transaction->expected_size) {
165  // Allow size to be zero if alien cache, because hadoop-fuse-dfs returns
166  // size zero for a while
167  if ( (transaction->expected_size != kSizeUnknown) &&
168  (reports_correct_filesize_ || (transaction->size != 0)) )
169  {
171  "size check failure for %s, expected %lu, got %lu",
172  transaction->id.ToString().c_str(),
173  transaction->expected_size, transaction->size);
174  CopyPath2Path(transaction->tmp_path,
175  cache_path_ + "/quarantaine/" + transaction->id.ToString());
176  unlink(transaction->tmp_path.c_str());
177  transaction->~Transaction();
178  atomic_dec32(&no_inflight_txns_);
179  return -EIO;
180  }
181  }
182 
183  if ((transaction->object_info.type == kTypePinned) ||
184  (transaction->object_info.type == kTypeCatalog))
185  {
186  bool retval = quota_mgr_->Pin(
187  transaction->id, transaction->size, transaction->object_info.description,
188  (transaction->object_info.type == kTypeCatalog));
189  if (!retval) {
190  LogCvmfs(kLogCache, kLogDebug, "commit failed: cannot pin %s",
191  transaction->id.ToString().c_str());
192  unlink(transaction->tmp_path.c_str());
193  transaction->~Transaction();
194  atomic_dec32(&no_inflight_txns_);
195  return -ENOSPC;
196  }
197  }
198 
199  // Move the temporary file into its final location
200  if (alien_cache_) {
201  int retval = chmod(transaction->tmp_path.c_str(), 0660);
202  assert(retval == 0);
203  }
204  result =
205  Rename(transaction->tmp_path.c_str(), transaction->final_path.c_str());
206  if (result < 0) {
207  LogCvmfs(kLogCache, kLogDebug, "commit failed: %s", strerror(errno));
208  unlink(transaction->tmp_path.c_str());
209  if ((transaction->object_info.type == kTypePinned) ||
210  (transaction->object_info.type == kTypeCatalog))
211  {
212  quota_mgr_->Remove(transaction->id);
213  }
214  } else {
215  // Success, inform quota manager
216  if (transaction->object_info.type == kTypeVolatile) {
217  quota_mgr_->InsertVolatile(transaction->id, transaction->size,
218  transaction->object_info.description);
219  } else if (transaction->object_info.type == kTypeRegular) {
220  quota_mgr_->Insert(transaction->id, transaction->size,
221  transaction->object_info.description);
222  }
223  }
224  transaction->~Transaction();
225  atomic_dec32(&no_inflight_txns_);
226  return result;
227 }
228 
229 
231  const string &cache_path,
232  const bool alien_cache,
233  const RenameWorkarounds rename_workaround)
234 {
235  UniquePtr<PosixCacheManager> cache_manager(
236  new PosixCacheManager(cache_path, alien_cache));
237  assert(cache_manager.IsValid());
238 
239  cache_manager->rename_workaround_ = rename_workaround;
240  if (cache_manager->alien_cache_) {
241  if (!MakeCacheDirectories(cache_path, 0770)) {
242  return NULL;
243  }
245  "Cache directory structure created.");
246  FileSystemInfo fs_info = GetFileSystemInfo(cache_path);
247  switch (fs_info.type) {
248  case kFsTypeNFS:
249  cache_manager->rename_workaround_ = kRenameLink;
251  "Alien cache is on NFS.");
252  break;
253  case kFsTypeBeeGFS:
254  cache_manager->rename_workaround_ = kRenameSamedir;
256  "Alien cache is on BeeGFS.");
257  break;
258  default:
259  break;
260  }
261  } else {
262  if (!MakeCacheDirectories(cache_path, 0700))
263  return NULL;
264  }
265 
266  // TODO(jblomer): we might not need to look anymore for cvmfs 2.0 relicts
267  if (FileExists(cache_path + "/cvmfscatalog.cache")) {
269  "Not mounting on cvmfs 2.0.X cache");
270  return NULL;
271  }
272 
273  return cache_manager.Release();
274 }
275 
276 
278  const ObjectInfo &object_info,
279  const int flags,
280  void *txn)
281 {
282  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
283  transaction->object_info = object_info;
284 }
285 
286 
288  return "Posix cache manager (cache directory: " + cache_path_ + ")\n";
289 }
290 
291 
297  char *c = reinterpret_cast<char *>(smalloc(1));
298  *c = '\0';
299  return c;
300 }
301 
302 
304  assert(data);
305  char *c = reinterpret_cast<char *>(data);
306  assert(*c == '\0');
307  return -1;
308 }
309 
310 
312  free(data);
313  return true;
314 }
315 
316 
317 
319  int new_fd = dup(fd);
320  if (new_fd < 0)
321  return -errno;
322  return new_fd;
323 }
324 
325 
327  if (transaction->buf_pos == 0)
328  return 0;
329  int written =
330  write(transaction->fd, transaction->buffer, transaction->buf_pos);
331  if (written < 0)
332  return -errno;
333  if (static_cast<unsigned>(written) != transaction->buf_pos) {
334  transaction->buf_pos -= written;
335  return -EIO;
336  }
337  transaction->buf_pos = 0;
338  return 0;
339 }
340 
341 
343  return cache_path_ + "/" + id.MakePathWithoutSuffix();
344 }
345 
346 
347 int64_t PosixCacheManager::GetSize(int fd) {
348  platform_stat64 info;
349  int retval = platform_fstat(fd, &info);
350  if (retval != 0)
351  return -errno;
352  return info.st_size;
353 }
354 
355 
357  const string path = GetPathInCache(object.id);
358  int result = open(path.c_str(), O_RDONLY);
359 
360  if (result >= 0) {
361  LogCvmfs(kLogCache, kLogDebug, "hit %s", path.c_str());
362  // platform_disable_kcache(result);
363  quota_mgr_->Touch(object.id);
364  } else {
365  result = -errno;
366  LogCvmfs(kLogCache, kLogDebug, "miss %s (%d)", path.c_str(), result);
367  }
368  return result;
369 }
370 
371 
373  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
374  int retval = Flush(transaction);
375  if (retval < 0)
376  return retval;
377  int fd_rdonly = open(transaction->tmp_path.c_str(), O_RDONLY);
378  if (fd_rdonly == -1)
379  return -errno;
380  return fd_rdonly;
381 }
382 
383 
385  int fd,
386  void *buf,
387  uint64_t size,
388  uint64_t offset)
389 {
390  int64_t result;
391  do {
392  errno = 0;
393  result = pread(fd, buf, size, offset);
394  } while ((result == -1) && (errno == EINTR));
395  if (result < 0)
396  return -errno;
397  return result;
398 }
399 
400 
401 int PosixCacheManager::Rename(const char *oldpath, const char *newpath) {
402  int result;
403  if (rename_workaround_ != kRenameLink) {
404  result = rename(oldpath, newpath);
405  if (result < 0)
406  return -errno;
407  return 0;
408  }
409 
410  result = link(oldpath, newpath);
411  if (result < 0) {
412  if (errno == EEXIST)
413  LogCvmfs(kLogCache, kLogDebug, "%s already existed, ignoring", newpath);
414  else
415  return -errno;
416  }
417  result = unlink(oldpath);
418  if (result < 0)
419  return -errno;
420  return 0;
421 }
422 
423 
429  unsigned char *buf[4096];
430  int nbytes;
431  uint64_t pos = 0;
432  do {
433  nbytes = Pread(fd, buf, 4096, pos);
434  pos += nbytes;
435  } while (nbytes == 4096);
436  LogCvmfs(kLogCache, kLogDebug, "read-ahead %d, %" PRIu64, fd, pos);
437  if (nbytes < 0)
438  return nbytes;
439  return 0;
440 }
441 
442 
443 int PosixCacheManager::Reset(void *txn) {
444  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
445  transaction->buf_pos = 0;
446  transaction->size = 0;
447  int retval = lseek(transaction->fd, 0, SEEK_SET);
448  if (retval < 0)
449  return -errno;
450  retval = ftruncate(transaction->fd, 0);
451  if (retval < 0)
452  return -errno;
453  return 0;
454 }
455 
456 
458  const shash::Any &id,
459  uint64_t size,
460  void *txn)
461 {
462  atomic_inc32(&no_inflight_txns_);
463  if (cache_mode_ == kCacheReadOnly) {
464  atomic_dec32(&no_inflight_txns_);
465  return -EROFS;
466  }
467 
468  if (size != kSizeUnknown) {
469  if (size > quota_mgr_->GetMaxFileSize()) {
470  LogCvmfs(kLogCache, kLogDebug, "file too big for lru cache (%" PRIu64 " "
471  "requested but only %" PRIu64 " bytes free)",
472  size, quota_mgr_->GetMaxFileSize());
473  atomic_dec32(&no_inflight_txns_);
474  return -ENOSPC;
475  }
476 
477  // For large files, ensure enough free cache space before writing the chunk
478  if (size > kBigFile) {
479  uint64_t cache_size = quota_mgr_->GetSize();
480  uint64_t cache_capacity = quota_mgr_->GetCapacity();
481  assert(cache_capacity >= size);
482  if ((cache_size + size) > cache_capacity) {
483  uint64_t leave_size =
484  std::min(cache_capacity / 2, cache_capacity - size);
485  quota_mgr_->Cleanup(leave_size);
486  }
487  }
488  }
489 
490  string path_in_cache = GetPathInCache(id);
491  Transaction *transaction = new (txn) Transaction(id, path_in_cache);
492 
493  char *template_path = NULL;
494  unsigned temp_path_len = 0;
495  if (rename_workaround_ == kRenameSamedir) {
496  temp_path_len = path_in_cache.length() + 6;
497  template_path = reinterpret_cast<char *>(alloca(temp_path_len + 1));
498  memcpy(template_path, path_in_cache.data(), path_in_cache.length());
499  memset(template_path + path_in_cache.length(), 'X', 6);
500  } else {
501  temp_path_len = txn_template_path_.length();
502  template_path = reinterpret_cast<char *>(alloca(temp_path_len + 1));
503  memcpy(template_path, &txn_template_path_[0], temp_path_len);
504  }
505  template_path[temp_path_len] = '\0';
506 
507  transaction->fd = mkstemp(template_path);
508  if (transaction->fd == -1) {
509  transaction->~Transaction();
510  atomic_dec32(&no_inflight_txns_);
511  return -errno;
512  }
513 
514  LogCvmfs(kLogCache, kLogDebug, "start transaction on %s has result %d",
515  template_path, transaction->fd);
516  transaction->tmp_path = template_path;
517  transaction->expected_size = size;
518  return transaction->fd;
519 }
520 
521 
523 {
524  return manifest::Manifest::ReadBreadcrumb(fqrn, cache_path_);
525 }
526 
527 
529  return manifest.ExportBreadcrumb(cache_path_, 0600);
530 }
531 
532 
534  manifest::Breadcrumb breadcrumb) {
535  return breadcrumb.Export(fqrn, cache_path_, 0600);
536 }
537 
538 
540  cache_mode_ = kCacheReadOnly;
541  while (atomic_read32(&no_inflight_txns_) != 0)
542  SafeSleepMs(50);
543 
544  QuotaManager *old_manager = quota_mgr_;
545  quota_mgr_ = new NoopQuotaManager();
546  delete old_manager;
547 }
548 
549 
550 int64_t PosixCacheManager::Write(const void *buf, uint64_t size, void *txn) {
551  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
552 
553  if (transaction->expected_size != kSizeUnknown) {
554  if (transaction->size + size > transaction->expected_size) {
556  "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
557  transaction->size + size, transaction->expected_size);
558  return -EFBIG;
559  }
560  }
561 
562  uint64_t written = 0;
563  const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
564  while (written < size) {
565  if (transaction->buf_pos == sizeof(transaction->buffer)) {
566  int retval = Flush(transaction);
567  if (retval != 0) {
568  transaction->size += written;
569  return retval;
570  }
571  }
572  uint64_t remaining = size - written;
573  uint64_t space_in_buffer =
574  sizeof(transaction->buffer) - transaction->buf_pos;
575  uint64_t batch_size = std::min(remaining, space_in_buffer);
576  memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
577  transaction->buf_pos += batch_size;
578  written += batch_size;
579  read_pos += batch_size;
580  }
581  transaction->size += written;
582  return written;
583 }
bool MakeCacheDirectories(const std::string &path, const mode_t mode)
Definition: posix.cc:907
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
RenameWorkarounds rename_workaround_
Definition: cache_posix.h:154
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
Definition: cache_posix.cc:550
const manifest::Manifest * manifest() const
Definition: repository.h:123
struct stat64 platform_stat64
int Flush(Transaction *transaction)
Definition: cache_posix.cc:326
void TearDown2ReadOnly()
Definition: cache_posix.cc:539
virtual int CommitTxn(void *txn)
Definition: cache_posix.cc:148
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:248
virtual int Open(const BlessedObject &object)
Definition: cache_posix.cc:356
EFileSystemTypes type
Definition: posix.h:51
virtual int AbortTxn(void *txn)
Definition: cache_posix.cc:113
virtual void * DoSaveState()
Definition: cache_posix.cc:296
virtual int64_t GetSize(int fd)
Definition: cache_posix.cc:347
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
Definition: cache_posix.cc:131
assert((mem||(size==0))&&"Out Of Memory")
bool CopyPath2Path(const string &src, const string &dest)
Definition: compression.cc:63
virtual int Dup(int fd)
Definition: cache_posix.cc:318
int Rename(const char *oldpath, const char *newpath)
Definition: cache_posix.cc:401
std::string description
Definition: cache.h:97
std::string GetPathInCache(const shash::Any &id)
Definition: cache_posix.cc:342
virtual int OpenFromTxn(void *txn)
Definition: cache_posix.cc:372
virtual int Readahead(int fd)
Definition: cache_posix.cc:428
bool FileExists(const std::string &path)
Definition: posix.cc:816
int32_t atomic_int32
Definition: atomic.h:17
unsigned char buffer[4096]
Definition: cache_posix.h:125
bool ExportBreadcrumb(const std::string &directory, const int mode) const
Definition: manifest.cc:232
void Transaction()
virtual void CtrlTxn(const ObjectInfo &object_info, const int flags, void *txn)
Definition: cache_posix.cc:277
FileSystemInfo GetFileSystemInfo(const std::string &path)
Definition: posix.cc:216
static const uint64_t kBigFile
Definition: cache_posix.h:66
ObjectType type
Definition: cache.h:93
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
Definition: cache_posix.cc:457
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
Definition: cache_posix.cc:522
bool Export(const std::string &fqrn, const std::string &directory, const int mode) const
Definition: manifest.cc:36
virtual int Reset(void *txn)
Definition: cache_posix.cc:443
virtual bool DoFreeState(void *data)
Definition: cache_posix.cc:311
static Breadcrumb ReadBreadcrumb(const std::string &repo_name, const std::string &directory)
Definition: manifest.cc:242
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1918
virtual int DoRestoreState(void *data)
Definition: cache_posix.cc:303
virtual std::string Describe()
Definition: cache_posix.cc:287
int platform_fstat(int filedes, platform_stat64 *buf)
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
Definition: cache_posix.cc:528
static void size_t size
Definition: smalloc.h:47
virtual int Close(int fd)
Definition: cache_posix.cc:140
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
Definition: cache_posix.cc:384
static PosixCacheManager * Create(const std::string &cache_path, const bool alien_cache, const RenameWorkarounds rename_workaround=kRenameNormal)
Definition: cache_posix.cc:230