CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
glue_buffer.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "cvmfs_config.h"
8 #include "glue_buffer.h"
9 
10 #include <errno.h>
11 #include <poll.h>
12 #include <unistd.h>
13 
14 #include <cassert>
15 #include <cstdlib>
16 #include <cstring>
17 
18 #include <string>
19 #include <vector>
20 
21 #include "util/exception.h"
22 #include "util/logging.h"
23 #include "util/mutex.h"
24 #include "util/platform.h"
25 #include "util/posix.h"
26 #include "util/smalloc.h"
27 
28 using namespace std; // NOLINT
29 
30 namespace glue {
31 
32 PathStore &PathStore::operator= (const PathStore &other) {
33  if (&other == this)
34  return *this;
35 
36  delete string_heap_;
37  CopyFrom(other);
38  return *this;
39 }
40 
41 
42 PathStore::PathStore(const PathStore &other) {
43  CopyFrom(other);
44 }
45 
46 
47 void PathStore::CopyFrom(const PathStore &other) {
48  map_ = other.map_;
49 
50  string_heap_ = new StringHeap(other.string_heap_->used());
51  shash::Md5 empty_path = map_.empty_key();
52  for (unsigned i = 0; i < map_.capacity(); ++i) {
53  if (map_.keys()[i] != empty_path) {
54  (map_.values() + i)->name =
55  string_heap_->AddString(map_.values()[i].name.length(),
56  map_.values()[i].name.data());
57  }
58  }
59 }
60 
61 
62 //------------------------------------------------------------------------------
63 
64 
65 void InodeTracker::InitLock() {
66  lock_ =
67  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
68  int retval = pthread_mutex_init(lock_, NULL);
69  assert(retval == 0);
70 }
71 
72 
73 void InodeTracker::CopyFrom(const InodeTracker &other) {
74  assert(other.version_ == kVersion);
75  version_ = kVersion;
76  path_map_ = other.path_map_;
77  inode_ex_map_ = other.inode_ex_map_;
78  inode_references_ = other.inode_references_;
79  statistics_ = other.statistics_;
80 }
81 
82 
83 InodeTracker::InodeTracker() {
84  version_ = kVersion;
85  InitLock();
86 }
87 
88 
89 InodeTracker::InodeTracker(const InodeTracker &other) {
90  CopyFrom(other);
91  InitLock();
92 }
93 
94 
95 InodeTracker &InodeTracker::operator= (const InodeTracker &other) {
96  if (&other == this)
97  return *this;
98 
99  CopyFrom(other);
100  return *this;
101 }
102 
103 
104 InodeTracker::~InodeTracker() {
105  pthread_mutex_destroy(lock_);
106  free(lock_);
107 }
108 
109 
110 //------------------------------------------------------------------------------
111 
112 DentryTracker::DentryTracker() : version_(kVersion), is_active_(true) {
113  pipe_terminate_[0] = pipe_terminate_[1] = -1;
115  InitLock();
116 }
117 
118 
120  if (pipe_terminate_[1] >= 0) {
121  char t = 'T';
122  WritePipe(pipe_terminate_[1], &t, 1);
123  pthread_join(thread_cleaner_, NULL);
125  }
126  pthread_mutex_destroy(lock_);
127  free(lock_);
128 }
129 
130 
132  CopyFrom(other);
133  pipe_terminate_[0] = pipe_terminate_[1] = -1;
135  InitLock();
136 }
137 
138 
140  if (&other == this)
141  return *this;
142 
143  Lock();
144  CopyFrom(other);
145  Unlock();
146  return *this;
147 }
148 
149 
151  assert(other.version_ == kVersion);
152 
153  version_ = kVersion;
154  statistics_ = other.statistics_;
155  is_active_ = other.is_active_;
156  entries_ = other.entries_;
157 }
158 
159 
161  Lock();
162  DentryTracker *new_tracker = new DentryTracker(*this);
163  statistics_.num_remove += entries_.size();
164  entries_.Clear();
165  Unlock();
166  return new_tracker;
167 }
168 
169 
170 void DentryTracker::SpawnCleaner(unsigned interval_s) {
171  assert(pipe_terminate_[0] == -1);
172  cleaning_interval_ms_ = interval_s * 1000;
175  int retval = pthread_create(&thread_cleaner_, NULL, MainCleaner, this);
176  assert(retval == 0);
177 }
178 
179 
180 void *DentryTracker::MainCleaner(void *data) {
181  DentryTracker *tracker = reinterpret_cast<DentryTracker *>(data);
182  LogCvmfs(kLogCvmfs, kLogDebug, "starting negative entry cache cleaner");
183 
184  struct pollfd watch_term;
185  watch_term.fd = tracker->pipe_terminate_[0];
186  watch_term.events = POLLIN | POLLPRI;
187  int timeout_ms = tracker->cleaning_interval_ms_;;
188  uint64_t deadline = platform_monotonic_time() + timeout_ms / 1000;
189  while (true) {
190  watch_term.revents = 0;
191  int retval = poll(&watch_term, 1, timeout_ms);
192  if (retval < 0) {
193  if (errno == EINTR) {
194  if (timeout_ms >= 0) {
195  uint64_t now = platform_monotonic_time();
196  timeout_ms = (now > deadline) ? 0 : (deadline - now) * 1000;
197  }
198  continue;
199  }
200  abort();
201  }
202  timeout_ms = tracker->cleaning_interval_ms_;
203  deadline = platform_monotonic_time() + timeout_ms / 1000;
204 
205  if (retval == 0) {
206  LogCvmfs(kLogCvmfs, kLogDebug, "negative entry cleaner: pruning");
207  tracker->Prune();
208  continue;
209  }
210 
211  assert(watch_term.revents != 0);
212 
213  char c = 0;
214  ReadPipe(tracker->pipe_terminate_[0], &c, 1);
215  assert(c == 'T');
216  break;
217  }
218  LogCvmfs(kLogCvmfs, kLogDebug, "stopping negative entry cache cleaner");
219  return NULL;
220 }
221 
222 
224  lock_ =
225  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
226  int retval = pthread_mutex_init(lock_, NULL);
227  assert(retval == 0);
228 }
229 
230 
232  Lock();
234  Unlock();
235 }
236 
237 
239  Entry *head = NULL;
240  Lock();
241  entries_.Peek(&head);
242  return Cursor(head);
243 }
244 
245 
247  uint64_t *inode_parent, NameString *name)
248 {
249  if (cursor->head == NULL)
250  return false;
251  if (cursor->pos >= entries_.size())
252  return false;
253  Entry *e = cursor->head + cursor->pos;
254  *inode_parent = e->inode_parent;
255  *name = e->name;
256  cursor->pos++;
257  return true;
258 }
259 
260 
261 void DentryTracker::EndEnumerate(Cursor * /* cursor */) {
262  Unlock();
263 }
264 
265 
266 //------------------------------------------------------------------------------
267 
268 
269 PageCacheTracker::PageCacheTracker() : version_(kVersion), is_active_(true) {
270  map_.Init(16, 0, hasher_inode);
271  InitLock();
272 }
273 
274 
276  pthread_mutex_destroy(lock_);
277  free(lock_);
278 }
279 
280 
282  CopyFrom(other);
283  InitLock();
284 }
285 
286 
288  if (&other == this)
289  return *this;
290 
291  MutexLockGuard guard(lock_);
292  CopyFrom(other);
293  return *this;
294 }
295 
296 
298  assert(other.version_ == kVersion);
299 
300  version_ = kVersion;
301  is_active_ = other.is_active_;
302  statistics_ = other.statistics_;
303 
304  map_.Init(16, 0, hasher_inode);
305  map_ = other.map_;
306  stat_store_ = other.stat_store_;
307 }
308 
309 
311  lock_ =
312  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
313  int retval = pthread_mutex_init(lock_, NULL);
314  assert(retval == 0);
315 }
316 
318  uint64_t inode, const shash::Any &hash, const struct stat &info)
319 {
320  assert(inode == info.st_ino);
321 
322  OpenDirectives open_directives;
323  // Old behavior: always flush page cache on open
324  if (!is_active_)
325  return open_directives;
326 
327  if (inode != info.st_ino) {
329  "invalid entry on open: %" PRIu64 " with st_ino=%" PRIu64,
330  " hash=%s size=%" PRIu64,
331  inode, info.st_ino, hash.ToString().c_str(), info.st_size);
332  }
333 
334  MutexLockGuard guard(lock_);
335 
336  Entry entry;
337  bool retval = map_.Lookup(inode, &entry);
338  if (!retval) {
339  open_directives.keep_cache = true;
340  open_directives.direct_io = false;
343 
344  entry.nopen = 1;
345  entry.idx_stat = stat_store_.Add(info);
346  entry.hash = hash;
347  map_.Insert(inode, entry);
348  return open_directives;
349  }
350 
351  if (entry.hash == hash) {
352  open_directives.direct_io = false;
353  if (entry.nopen < 0) {
354  // The page cache is still in the transition phase and may contain old
355  // content. So trigger a flush of the cache in any case.
356  open_directives.keep_cache = false;
358  entry.nopen--;
359  map_.Insert(inode, entry);
360  return open_directives;
361  } else {
362  open_directives.keep_cache = true;
364  if (entry.nopen++ == 0)
365  entry.idx_stat = stat_store_.Add(info);
366  map_.Insert(inode, entry);
367  return open_directives;
368  }
369  }
370 
371  // Page cache mismatch and old data has still open file attached to it,
372  // circumvent the page cache entirely and use direct I/O. In this case,
373  // cvmfs_close() will _not_ call Close().
374  if (entry.nopen != 0) {
375  open_directives.keep_cache = true;
376  open_directives.direct_io = true;
378  return open_directives;
379  }
380 
381  // Stale data in the page cache, start the transition phase in which newly
382  // opened files flush the page cache and re-populate it with the new hash.
383  // The first file to reach Close() will finish the transition phase and
384  // mark the new hash as committed.
385  open_directives.direct_io = false;
386  open_directives.keep_cache = false;
388  entry.hash = hash;
389  entry.idx_stat = stat_store_.Add(info);
390  entry.nopen = -1;
391  map_.Insert(inode, entry);
392  return open_directives;
393 }
394 
396  OpenDirectives open_directives(true, true);
397  // Old behavior: always flush page cache on open
398  if (!is_active_)
399  return open_directives;
400 
401  MutexLockGuard guard(lock_);
403  return open_directives;
404 }
405 
406 void PageCacheTracker::Close(uint64_t inode) {
407  if (!is_active_)
408  return;
409 
410  MutexLockGuard guard(lock_);
411  Entry entry;
412  bool retval = map_.Lookup(inode, &entry);
413 
415  "PageCacheTracker::Close Race condition? "
416  "Did not find inode %lu",
417  inode)
419  "PageCacheTracker::Close Race condition? "
420  "Inode %lu has no open entries",
421  inode)) {
422  return;
423  }
424 
425  const int32_t old_open = entry.nopen;
426  if (entry.nopen < 0) {
427  // At this point we know that any stale data has been flushed from the
428  // cache and only data related to the currently booked content hash
429  // can be present. So clear the transition bit (sign bit).
430  entry.nopen = -entry.nopen;
431  }
432  entry.nopen--;
433  if (entry.nopen == 0) {
434  // File closed, remove struct stat information
435  if (entry.idx_stat < 0) {
437  "page cache tracker: missing stat entry! Entry info: inode %" PRIu64
438  " - open counter %d - hash %s",
439  inode, old_open, entry.hash.ToString().c_str());
440  }
441  uint64_t inode_update = stat_store_.Erase(entry.idx_stat);
442  Entry entry_update;
443  retval = map_.Lookup(inode_update, &entry_update);
444  if (!retval) {
446  "invalid inode in page cache tracker: inode %" PRIu64
447  ", replacing %" PRIu64, inode_update, inode);
448  }
449  assert(retval);
450  entry_update.idx_stat = entry.idx_stat;
451  map_.Insert(inode_update, entry_update);
452  entry.idx_stat = -1;
453  }
454  map_.Insert(inode, entry);
455 }
456 
458  : tracker_(t)
459 {
460  int retval = pthread_mutex_lock(tracker_->lock_);
461  assert(retval == 0);
462 }
463 
465  int retval = pthread_mutex_unlock(tracker_->lock_);
466  assert(retval == 0);
467 }
468 
470  if (!tracker_->is_active_)
471  return;
472 
473  bool contained_inode = tracker_->map_.Erase(inode);
474  if (contained_inode)
475  tracker_->statistics_.n_remove++;
476 }
477 
478 } // namespace glue
uint64_t inode_parent
Definition: glue_buffer.h:831
InodeReferences inode_references_
Definition: glue_buffer.h:810
pthread_mutex_t * lock_
Definition: glue_buffer.h:1062
void DoPrune(uint64_t now)
Definition: glue_buffer.h:905
NameString name
Definition: glue_buffer.h:832
PageCacheTracker & operator=(const PageCacheTracker &other)
Definition: glue_buffer.cc:287
Definition: glue_buffer.h:933
#define PANIC(...)
Definition: exception.h:29
Definition: glue_buffer.h:823
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
void CopyFrom(const DentryTracker &other)
Definition: glue_buffer.cc:150
perf::Statistics * statistics_
Definition: repository.h:139
pthread_t thread_cleaner_
Definition: glue_buffer.h:924
void Unlock() const
Definition: glue_buffer.h:900
assert((mem||(size==0))&&"Out Of Memory")
bool NextEntry(Cursor *cursor, uint64_t *inode_parent, NameString *name)
Definition: glue_buffer.cc:246
StringHeap * string_heap_
Definition: glue_buffer.h:368
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
void Lock() const
Definition: glue_buffer.h:896
int32_t nopen
Definition: glue_buffer.h:942
static uint32_t hasher_inode(const uint64_t &inode)
Definition: glue_buffer.h:100
void EndEnumerate(Cursor *cursor)
Definition: glue_buffer.cc:261
Statistics statistics_
Definition: glue_buffer.h:811
void SpawnCleaner(unsigned interval_s)
Definition: glue_buffer.cc:170
static bool AssertOrLog(int t, const LogSource, const int, const char *,...)
Definition: exception.h:61
int32_t idx_stat
Definition: glue_buffer.h:946
int32_t Add(const struct stat &info)
Definition: glue_buffer.h:384
DentryTracker * Move()
Definition: glue_buffer.cc:160
static const unsigned kVersion
Definition: glue_buffer.h:1057
uint64_t platform_monotonic_time()
InodeExMap inode_ex_map_
Definition: glue_buffer.h:809
SmallHashDynamic< shash::Md5, PathInfo > map_
Definition: glue_buffer.h:367
uint64_t used() const
Definition: glue_buffer.h:203
OpenDirectives Open(uint64_t inode, const shash::Any &hash, const struct stat &info)
Definition: glue_buffer.cc:317
void Close(uint64_t inode)
Definition: glue_buffer.cc:406
OpenDirectives OpenDirect()
Definition: glue_buffer.cc:395
EvictRaii(PageCacheTracker *t)
Definition: glue_buffer.cc:457
Definition: mutex.h:42
uint64_t Erase(int32_t index)
Definition: glue_buffer.h:393
DentryTracker & operator=(const DentryTracker &other)
Definition: glue_buffer.cc:139
void CopyFrom(const PageCacheTracker &other)
Definition: glue_buffer.cc:297
const char * kVersion
Definition: preload.cc:27
BigQueue< Entry > entries_
Definition: glue_buffer.h:920
static void * MainCleaner(void *data)
Definition: glue_buffer.cc:180
static const unsigned kVersion
Definition: glue_buffer.h:891
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:562
pthread_mutex_t * lock_
Definition: glue_buffer.h:916
SmallHashDynamic< uint64_t, Entry > map_
Definition: glue_buffer.h:1071
shash::Any hash
Definition: glue_buffer.h:951
Statistics statistics_
Definition: glue_buffer.h:918
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528