CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
glue_buffer.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 
8 #include "glue_buffer.h"
9 
10 #include <errno.h>
11 #include <poll.h>
12 #include <unistd.h>
13 
14 #include <cassert>
15 #include <cstdlib>
16 #include <cstring>
17 #include <string>
18 #include <vector>
19 
20 #include "util/exception.h"
21 #include "util/logging.h"
22 #include "util/mutex.h"
23 #include "util/platform.h"
24 #include "util/posix.h"
25 #include "util/smalloc.h"
26 
27 using namespace std; // NOLINT
28 
29 namespace glue {
30 
31 PathStore &PathStore::operator=(const PathStore &other) {
32  if (&other == this)
33  return *this;
34 
35  delete string_heap_;
36  CopyFrom(other);
37  return *this;
38 }
39 
40 
41 PathStore::PathStore(const PathStore &other) { CopyFrom(other); }
42 
43 
44 void PathStore::CopyFrom(const PathStore &other) {
45  map_ = other.map_;
46 
47  string_heap_ = new StringHeap(other.string_heap_->used());
48  shash::Md5 empty_path = map_.empty_key();
49  for (unsigned i = 0; i < map_.capacity(); ++i) {
50  if (map_.keys()[i] != empty_path) {
51  (map_.values() + i)->name = string_heap_->AddString(
52  map_.values()[i].name.length(), map_.values()[i].name.data());
53  }
54  }
55 }
56 
57 
58 //------------------------------------------------------------------------------
59 
60 
61 void InodeTracker::InitLock() {
62  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
63  int retval = pthread_mutex_init(lock_, NULL);
64  assert(retval == 0);
65 }
66 
67 
68 void InodeTracker::CopyFrom(const InodeTracker &other) {
69  assert(other.version_ == kVersion);
70  version_ = kVersion;
71  path_map_ = other.path_map_;
72  inode_ex_map_ = other.inode_ex_map_;
73  inode_references_ = other.inode_references_;
74  statistics_ = other.statistics_;
75 }
76 
77 
78 InodeTracker::InodeTracker() {
79  version_ = kVersion;
80  InitLock();
81 }
82 
83 
84 InodeTracker::InodeTracker(const InodeTracker &other) {
85  CopyFrom(other);
86  InitLock();
87 }
88 
89 
90 InodeTracker &InodeTracker::operator=(const InodeTracker &other) {
91  if (&other == this)
92  return *this;
93 
94  CopyFrom(other);
95  return *this;
96 }
97 
98 
99 InodeTracker::~InodeTracker() {
100  pthread_mutex_destroy(lock_);
101  free(lock_);
102 }
103 
104 
105 //------------------------------------------------------------------------------
106 
107 DentryTracker::DentryTracker() : version_(kVersion), is_active_(true) {
108  pipe_terminate_[0] = pipe_terminate_[1] = -1;
110  InitLock();
111 }
112 
113 
115  if (pipe_terminate_[1] >= 0) {
116  char t = 'T';
117  WritePipe(pipe_terminate_[1], &t, 1);
118  pthread_join(thread_cleaner_, NULL);
120  }
121  pthread_mutex_destroy(lock_);
122  free(lock_);
123 }
124 
125 
127  CopyFrom(other);
128  pipe_terminate_[0] = pipe_terminate_[1] = -1;
130  InitLock();
131 }
132 
133 
135  if (&other == this)
136  return *this;
137 
138  Lock();
139  CopyFrom(other);
140  Unlock();
141  return *this;
142 }
143 
144 
146  assert(other.version_ == kVersion);
147 
148  version_ = kVersion;
149  statistics_ = other.statistics_;
150  is_active_ = other.is_active_;
151  entries_ = other.entries_;
152 }
153 
154 
156  Lock();
157  DentryTracker *new_tracker = new DentryTracker(*this);
158  statistics_.num_remove += entries_.size();
159  entries_.Clear();
160  Unlock();
161  return new_tracker;
162 }
163 
164 
165 void DentryTracker::SpawnCleaner(unsigned interval_s) {
166  assert(pipe_terminate_[0] == -1);
167  cleaning_interval_ms_ = interval_s * 1000;
168  if (cleaning_interval_ms_ == 0)
171  int retval = pthread_create(&thread_cleaner_, NULL, MainCleaner, this);
172  assert(retval == 0);
173 }
174 
175 
176 void *DentryTracker::MainCleaner(void *data) {
177  DentryTracker *tracker = reinterpret_cast<DentryTracker *>(data);
178  LogCvmfs(kLogCvmfs, kLogDebug, "starting negative entry cache cleaner");
179 
180  struct pollfd watch_term;
181  watch_term.fd = tracker->pipe_terminate_[0];
182  watch_term.events = POLLIN | POLLPRI;
183  int timeout_ms = tracker->cleaning_interval_ms_;
184  ;
185  uint64_t deadline = platform_monotonic_time() + timeout_ms / 1000;
186  while (true) {
187  watch_term.revents = 0;
188  int retval = poll(&watch_term, 1, timeout_ms);
189  if (retval < 0) {
190  if (errno == EINTR) {
191  if (timeout_ms >= 0) {
192  uint64_t now = platform_monotonic_time();
193  timeout_ms = (now > deadline) ? 0 : (deadline - now) * 1000;
194  }
195  continue;
196  }
197  abort();
198  }
199  timeout_ms = tracker->cleaning_interval_ms_;
200  deadline = platform_monotonic_time() + timeout_ms / 1000;
201 
202  if (retval == 0) {
203  LogCvmfs(kLogCvmfs, kLogDebug, "negative entry cleaner: pruning");
204  tracker->Prune();
205  continue;
206  }
207 
208  assert(watch_term.revents != 0);
209 
210  char c = 0;
211  ReadPipe(tracker->pipe_terminate_[0], &c, 1);
212  assert(c == 'T');
213  break;
214  }
215  LogCvmfs(kLogCvmfs, kLogDebug, "stopping negative entry cache cleaner");
216  return NULL;
217 }
218 
219 
221  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
222  int retval = pthread_mutex_init(lock_, NULL);
223  assert(retval == 0);
224 }
225 
226 
228  Lock();
230  Unlock();
231 }
232 
233 
235  Entry *head = NULL;
236  Lock();
237  entries_.Peek(&head);
238  return Cursor(head);
239 }
240 
241 
242 bool DentryTracker::NextEntry(Cursor *cursor, uint64_t *inode_parent,
243  NameString *name) {
244  if (cursor->head == NULL)
245  return false;
246  if (cursor->pos >= entries_.size())
247  return false;
248  Entry *e = cursor->head + cursor->pos;
249  *inode_parent = e->inode_parent;
250  *name = e->name;
251  cursor->pos++;
252  return true;
253 }
254 
255 
256 void DentryTracker::EndEnumerate(Cursor * /* cursor */) { Unlock(); }
257 
258 
259 //------------------------------------------------------------------------------
260 
261 
262 PageCacheTracker::PageCacheTracker() : version_(kVersion), is_active_(true) {
263  map_.Init(16, 0, hasher_inode);
264  InitLock();
265 }
266 
267 
269  pthread_mutex_destroy(lock_);
270  free(lock_);
271 }
272 
273 
275  CopyFrom(other);
276  InitLock();
277 }
278 
279 
281  if (&other == this)
282  return *this;
283 
284  MutexLockGuard guard(lock_);
285  CopyFrom(other);
286  return *this;
287 }
288 
289 
291  assert(other.version_ == kVersion);
292 
293  version_ = kVersion;
294  is_active_ = other.is_active_;
295  statistics_ = other.statistics_;
296 
297  map_.Init(16, 0, hasher_inode);
298  map_ = other.map_;
299  stat_store_ = other.stat_store_;
300 }
301 
302 
304  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
305  int retval = pthread_mutex_init(lock_, NULL);
306  assert(retval == 0);
307 }
308 
310  uint64_t inode, const shash::Any &hash, const struct stat &info) {
311  assert(inode == info.st_ino);
312 
313  OpenDirectives open_directives;
314  // Old behavior: always flush page cache on open
315  if (!is_active_)
316  return open_directives;
317 
318  if (inode != info.st_ino) {
320  "invalid entry on open: %" PRIu64 " with st_ino=%" PRIu64,
321  " hash=%s size=%" PRIu64, inode, info.st_ino, hash.ToString().c_str(),
322  info.st_size);
323  }
324 
325  MutexLockGuard guard(lock_);
326 
327  Entry entry;
328  bool retval = map_.Lookup(inode, &entry);
329  if (!retval) {
330  open_directives.keep_cache = true;
331  open_directives.direct_io = false;
334 
335  entry.nopen = 1;
336  entry.idx_stat = stat_store_.Add(info);
337  entry.hash = hash;
338  map_.Insert(inode, entry);
339  return open_directives;
340  }
341 
342  if (entry.hash == hash) {
343  open_directives.direct_io = false;
344  if (entry.nopen < 0) {
345  // The page cache is still in the transition phase and may contain old
346  // content. So trigger a flush of the cache in any case.
347  open_directives.keep_cache = false;
349  entry.nopen--;
350  map_.Insert(inode, entry);
351  return open_directives;
352  } else {
353  open_directives.keep_cache = true;
355  if (entry.nopen++ == 0)
356  entry.idx_stat = stat_store_.Add(info);
357  map_.Insert(inode, entry);
358  return open_directives;
359  }
360  }
361 
362  // Page cache mismatch and old data has still open file attached to it,
363  // circumvent the page cache entirely and use direct I/O. In this case,
364  // cvmfs_close() will _not_ call Close().
365  if (entry.nopen != 0) {
366  open_directives.keep_cache = true;
367  open_directives.direct_io = true;
369  return open_directives;
370  }
371 
372  // Stale data in the page cache, start the transition phase in which newly
373  // opened files flush the page cache and re-populate it with the new hash.
374  // The first file to reach Close() will finish the transition phase and
375  // mark the new hash as committed.
376  open_directives.direct_io = false;
377  open_directives.keep_cache = false;
379  entry.hash = hash;
380  entry.idx_stat = stat_store_.Add(info);
381  entry.nopen = -1;
382  map_.Insert(inode, entry);
383  return open_directives;
384 }
385 
387  OpenDirectives open_directives(true, true);
388  // Old behavior: always flush page cache on open
389  if (!is_active_)
390  return open_directives;
391 
392  MutexLockGuard guard(lock_);
394  return open_directives;
395 }
396 
397 void PageCacheTracker::Close(uint64_t inode) {
398  if (!is_active_)
399  return;
400 
401  MutexLockGuard guard(lock_);
402  Entry entry;
403  bool retval = map_.Lookup(inode, &entry);
404 
406  "PageCacheTracker::Close Race condition? "
407  "Did not find inode %lu",
408  inode)
410  "PageCacheTracker::Close Race condition? "
411  "Inode %lu has no open entries",
412  inode)) {
413  return;
414  }
415 
416  const int32_t old_open = entry.nopen;
417  if (entry.nopen < 0) {
418  // At this point we know that any stale data has been flushed from the
419  // cache and only data related to the currently booked content hash
420  // can be present. So clear the transition bit (sign bit).
421  entry.nopen = -entry.nopen;
422  }
423  entry.nopen--;
424  if (entry.nopen == 0) {
425  // File closed, remove struct stat information
426  if (entry.idx_stat < 0) {
428  "page cache tracker: missing stat entry! Entry info: inode %" PRIu64
429  " - open counter %d - hash %s",
430  inode, old_open, entry.hash.ToString().c_str());
431  }
432  uint64_t inode_update = stat_store_.Erase(entry.idx_stat);
433  Entry entry_update;
434  retval = map_.Lookup(inode_update, &entry_update);
435  if (!retval) {
437  "invalid inode in page cache tracker: inode %" PRIu64
438  ", replacing %" PRIu64,
439  inode_update, inode);
440  }
441  assert(retval);
442  entry_update.idx_stat = entry.idx_stat;
443  map_.Insert(inode_update, entry_update);
444  entry.idx_stat = -1;
445  }
446  map_.Insert(inode, entry);
447 }
448 
450  int retval = pthread_mutex_lock(tracker_->lock_);
451  assert(retval == 0);
452 }
453 
455  int retval = pthread_mutex_unlock(tracker_->lock_);
456  assert(retval == 0);
457 }
458 
460  if (!tracker_->is_active_)
461  return;
462 
463  bool contained_inode = tracker_->map_.Erase(inode);
464  if (contained_inode)
465  tracker_->statistics_.n_remove++;
466 }
467 
468 } // namespace glue
uint64_t inode_parent
Definition: glue_buffer.h:800
InodeReferences inode_references_
Definition: glue_buffer.h:782
pthread_mutex_t * lock_
Definition: glue_buffer.h:1071
void DoPrune(uint64_t now)
Definition: glue_buffer.h:876
NameString name
Definition: glue_buffer.h:801
PageCacheTracker & operator=(const PageCacheTracker &other)
Definition: glue_buffer.cc:280
Definition: glue_buffer.h:904
#define PANIC(...)
Definition: exception.h:29
Definition: glue_buffer.h:795
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
void CopyFrom(const DentryTracker &other)
Definition: glue_buffer.cc:145
perf::Statistics * statistics_
Definition: repository.h:138
pthread_t thread_cleaner_
Definition: glue_buffer.h:895
void Unlock() const
Definition: glue_buffer.h:871
assert((mem||(size==0))&&"Out Of Memory")
bool NextEntry(Cursor *cursor, uint64_t *inode_parent, NameString *name)
Definition: glue_buffer.cc:242
StringHeap * string_heap_
Definition: glue_buffer.h:354
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
void Lock() const
Definition: glue_buffer.h:867
int32_t nopen
Definition: glue_buffer.h:913
static uint32_t hasher_inode(const uint64_t &inode)
Definition: glue_buffer.h:99
void EndEnumerate(Cursor *cursor)
Definition: glue_buffer.cc:256
Statistics statistics_
Definition: glue_buffer.h:783
void SpawnCleaner(unsigned interval_s)
Definition: glue_buffer.cc:165
static bool AssertOrLog(int t, const LogSource, const int, const char *,...)
Definition: exception.h:60
int32_t idx_stat
Definition: glue_buffer.h:917
int32_t Add(const struct stat &info)
Definition: glue_buffer.h:370
DentryTracker * Move()
Definition: glue_buffer.cc:155
static const unsigned kVersion
Definition: glue_buffer.h:1066
uint64_t platform_monotonic_time()
InodeExMap inode_ex_map_
Definition: glue_buffer.h:781
SmallHashDynamic< shash::Md5, PathInfo > map_
Definition: glue_buffer.h:353
uint64_t used() const
Definition: glue_buffer.h:196
OpenDirectives Open(uint64_t inode, const shash::Any &hash, const struct stat &info)
Definition: glue_buffer.cc:309
void Close(uint64_t inode)
Definition: glue_buffer.cc:397
OpenDirectives OpenDirect()
Definition: glue_buffer.cc:386
EvictRaii(PageCacheTracker *t)
Definition: glue_buffer.cc:449
Definition: mutex.h:42
uint64_t Erase(int32_t index)
Definition: glue_buffer.h:379
DentryTracker & operator=(const DentryTracker &other)
Definition: glue_buffer.cc:134
void CopyFrom(const PageCacheTracker &other)
Definition: glue_buffer.cc:290
const char * kVersion
Definition: preload.cc:26
BigQueue< Entry > entries_
Definition: glue_buffer.h:891
static void * MainCleaner(void *data)
Definition: glue_buffer.cc:176
static const unsigned kVersion
Definition: glue_buffer.h:862
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
pthread_mutex_t * lock_
Definition: glue_buffer.h:887
SmallHashDynamic< uint64_t, Entry > map_
Definition: glue_buffer.h:1080
shash::Any hash
Definition: glue_buffer.h:922
Statistics statistics_
Definition: glue_buffer.h:889
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545