CernVM-FS  2.11.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
glue_buffer.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "cvmfs_config.h"
8 #include "glue_buffer.h"
9 
10 #include <errno.h>
11 #include <poll.h>
12 #include <unistd.h>
13 
14 #include <cassert>
15 #include <cstdlib>
16 #include <cstring>
17 
18 #include <string>
19 #include <vector>
20 
21 #include "util/exception.h"
22 #include "util/logging.h"
23 #include "util/mutex.h"
24 #include "util/platform.h"
25 #include "util/posix.h"
26 #include "util/smalloc.h"
27 
28 using namespace std; // NOLINT
29 
30 namespace glue {
31 
32 PathStore &PathStore::operator= (const PathStore &other) {
33  if (&other == this)
34  return *this;
35 
36  delete string_heap_;
37  CopyFrom(other);
38  return *this;
39 }
40 
41 
42 PathStore::PathStore(const PathStore &other) {
43  CopyFrom(other);
44 }
45 
46 
47 void PathStore::CopyFrom(const PathStore &other) {
48  map_ = other.map_;
49 
50  string_heap_ = new StringHeap(other.string_heap_->used());
51  shash::Md5 empty_path = map_.empty_key();
52  for (unsigned i = 0; i < map_.capacity(); ++i) {
53  if (map_.keys()[i] != empty_path) {
54  (map_.values() + i)->name =
55  string_heap_->AddString(map_.values()[i].name.length(),
56  map_.values()[i].name.data());
57  }
58  }
59 }
60 
61 
62 //------------------------------------------------------------------------------
63 
64 
65 void InodeTracker::InitLock() {
66  lock_ =
67  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
68  int retval = pthread_mutex_init(lock_, NULL);
69  assert(retval == 0);
70 }
71 
72 
73 void InodeTracker::CopyFrom(const InodeTracker &other) {
74  assert(other.version_ == kVersion);
75  version_ = kVersion;
76  path_map_ = other.path_map_;
77  inode_ex_map_ = other.inode_ex_map_;
78  inode_references_ = other.inode_references_;
79  statistics_ = other.statistics_;
80 }
81 
82 
83 InodeTracker::InodeTracker() {
84  version_ = kVersion;
85  InitLock();
86 }
87 
88 
89 InodeTracker::InodeTracker(const InodeTracker &other) {
90  CopyFrom(other);
91  InitLock();
92 }
93 
94 
95 InodeTracker &InodeTracker::operator= (const InodeTracker &other) {
96  if (&other == this)
97  return *this;
98 
99  CopyFrom(other);
100  return *this;
101 }
102 
103 
104 InodeTracker::~InodeTracker() {
105  pthread_mutex_destroy(lock_);
106  free(lock_);
107 }
108 
109 
110 //------------------------------------------------------------------------------
111 
112 DentryTracker::DentryTracker() : version_(kVersion), is_active_(true) {
113  pipe_terminate_[0] = pipe_terminate_[1] = -1;
115  InitLock();
116 }
117 
118 
120  if (pipe_terminate_[1] >= 0) {
121  char t = 'T';
122  WritePipe(pipe_terminate_[1], &t, 1);
123  pthread_join(thread_cleaner_, NULL);
125  }
126  pthread_mutex_destroy(lock_);
127  free(lock_);
128 }
129 
130 
132  CopyFrom(other);
133  pipe_terminate_[0] = pipe_terminate_[1] = -1;
135  InitLock();
136 }
137 
138 
140  if (&other == this)
141  return *this;
142 
143  Lock();
144  CopyFrom(other);
145  Unlock();
146  return *this;
147 }
148 
149 
151  assert(other.version_ == kVersion);
152 
153  version_ = kVersion;
154  statistics_ = other.statistics_;
155  is_active_ = other.is_active_;
156  entries_ = other.entries_;
157 }
158 
159 
161  Lock();
162  DentryTracker *new_tracker = new DentryTracker(*this);
163  statistics_.num_remove += entries_.size();
164  entries_.Clear();
165  Unlock();
166  return new_tracker;
167 }
168 
169 
170 void DentryTracker::SpawnCleaner(unsigned interval_s) {
171  assert(pipe_terminate_[0] == -1);
172  cleaning_interval_ms_ = interval_s * 1000;
175  int retval = pthread_create(&thread_cleaner_, NULL, MainCleaner, this);
176  assert(retval == 0);
177 }
178 
179 
180 void *DentryTracker::MainCleaner(void *data) {
181  DentryTracker *tracker = reinterpret_cast<DentryTracker *>(data);
182  LogCvmfs(kLogCvmfs, kLogDebug, "starting negative entry cache cleaner");
183 
184  struct pollfd watch_term;
185  watch_term.fd = tracker->pipe_terminate_[0];
186  watch_term.events = POLLIN | POLLPRI;
187  int timeout_ms = tracker->cleaning_interval_ms_;;
188  uint64_t deadline = platform_monotonic_time() + timeout_ms / 1000;
189  while (true) {
190  watch_term.revents = 0;
191  int retval = poll(&watch_term, 1, timeout_ms);
192  if (retval < 0) {
193  if (errno == EINTR) {
194  if (timeout_ms >= 0) {
195  uint64_t now = platform_monotonic_time();
196  timeout_ms = (now > deadline) ? 0 : (deadline - now) * 1000;
197  }
198  continue;
199  }
200  abort();
201  }
202  timeout_ms = tracker->cleaning_interval_ms_;
203  deadline = platform_monotonic_time() + timeout_ms / 1000;
204 
205  if (retval == 0) {
206  LogCvmfs(kLogCvmfs, kLogDebug, "negative entry cleaner: pruning");
207  tracker->Prune();
208  continue;
209  }
210 
211  assert(watch_term.revents != 0);
212 
213  char c = 0;
214  ReadPipe(tracker->pipe_terminate_[0], &c, 1);
215  assert(c == 'T');
216  break;
217  }
218  LogCvmfs(kLogCvmfs, kLogDebug, "stopping negative entry cache cleaner");
219  return NULL;
220 }
221 
222 
224  lock_ =
225  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
226  int retval = pthread_mutex_init(lock_, NULL);
227  assert(retval == 0);
228 }
229 
230 
232  Lock();
234  Unlock();
235 }
236 
237 
239  Entry *head = NULL;
240  Lock();
241  entries_.Peek(&head);
242  return Cursor(head);
243 }
244 
245 
247  uint64_t *inode_parent, NameString *name)
248 {
249  if (cursor->head == NULL)
250  return false;
251  if (cursor->pos >= entries_.size())
252  return false;
253  Entry *e = cursor->head + cursor->pos;
254  *inode_parent = e->inode_parent;
255  *name = e->name;
256  cursor->pos++;
257  return true;
258 }
259 
260 
261 void DentryTracker::EndEnumerate(Cursor * /* cursor */) {
262  Unlock();
263 }
264 
265 
266 //------------------------------------------------------------------------------
267 
268 
269 PageCacheTracker::PageCacheTracker() : version_(kVersion), is_active_(true) {
270  map_.Init(16, 0, hasher_inode);
271  InitLock();
272 }
273 
274 
276  pthread_mutex_destroy(lock_);
277  free(lock_);
278 }
279 
280 
282  CopyFrom(other);
283  InitLock();
284 }
285 
286 
288  if (&other == this)
289  return *this;
290 
291  MutexLockGuard guard(lock_);
292  CopyFrom(other);
293  return *this;
294 }
295 
296 
298  assert(other.version_ == kVersion);
299 
300  version_ = kVersion;
301  is_active_ = other.is_active_;
302  statistics_ = other.statistics_;
303 
304  map_.Init(16, 0, hasher_inode);
305  map_ = other.map_;
306  stat_store_ = other.stat_store_;
307 }
308 
309 
311  lock_ =
312  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
313  int retval = pthread_mutex_init(lock_, NULL);
314  assert(retval == 0);
315 }
316 
318  uint64_t inode, const shash::Any &hash, const struct stat &info)
319 {
320  assert(inode == info.st_ino);
321 
322  OpenDirectives open_directives;
323  // Old behavior: always flush page cache on open
324  if (!is_active_)
325  return open_directives;
326 
327  MutexLockGuard guard(lock_);
328 
329  Entry entry;
330  bool retval = map_.Lookup(inode, &entry);
331  if (!retval) {
332  open_directives.keep_cache = true;
333  open_directives.direct_io = false;
336 
337  entry.nopen = 1;
338  entry.idx_stat = stat_store_.Add(info);
339  entry.hash = hash;
340  map_.Insert(inode, entry);
341  return open_directives;
342  }
343 
344  if (entry.hash == hash) {
345  open_directives.direct_io = false;
346  if (entry.nopen < 0) {
347  // The page cache is still in the transition phase and may contain old
348  // content. So trigger a flush of the cache in any case.
349  open_directives.keep_cache = false;
351  entry.nopen--;
352  map_.Insert(inode, entry);
353  return open_directives;
354  } else {
355  open_directives.keep_cache = true;
357  if (entry.nopen++ == 0)
358  entry.idx_stat = stat_store_.Add(info);
359  map_.Insert(inode, entry);
360  return open_directives;
361  }
362  }
363 
364  // Page cache mismatch and old data has still open file attached to it,
365  // circumvent the page cache entirely and use direct I/O. In this case,
366  // cvmfs_close() will _not_ call Close().
367  if (entry.nopen != 0) {
368  open_directives.keep_cache = true;
369  open_directives.direct_io = true;
371  return open_directives;
372  }
373 
374  // Stale data in the page cache, start the transition phase in which newly
375  // opened files flush the page cache and re-populate it with the new hash.
376  // The first file to reach Close() will finish the transition phase and
377  // mark the new hash as committed.
378  open_directives.direct_io = false;
379  open_directives.keep_cache = false;
381  entry.hash = hash;
382  entry.idx_stat = stat_store_.Add(info);
383  entry.nopen = -1;
384  map_.Insert(inode, entry);
385  return open_directives;
386 }
387 
389  OpenDirectives open_directives(true, true);
390  // Old behavior: always flush page cache on open
391  if (!is_active_)
392  return open_directives;
393 
394  MutexLockGuard guard(lock_);
396  return open_directives;
397 }
398 
399 void PageCacheTracker::Close(uint64_t inode) {
400  if (!is_active_)
401  return;
402 
403  MutexLockGuard guard(lock_);
404  Entry entry;
405  bool retval = map_.Lookup(inode, &entry);
406 
408  "PageCacheTracker::Close Race condition? "
409  "Did not find inode %lu",
410  inode)
412  "PageCacheTracker::Close Race condition? "
413  "Inode %lu has no open entries",
414  inode)) {
415  return;
416  }
417 
418  if (entry.nopen < 0) {
419  // At this point we know that any stale data has been flushed from the
420  // cache and only data related to the currently booked content hash
421  // can be present. So clear the transition bit (sign bit).
422  entry.nopen = -entry.nopen;
423  }
424  entry.nopen--;
425  if (entry.nopen == 0) {
426  // File closed, remove struct stat information
427  assert(entry.idx_stat >= 0);
428  uint64_t inode_update = stat_store_.Erase(entry.idx_stat);
429  Entry entry_update;
430  retval = map_.Lookup(inode_update, &entry_update);
431  assert(retval);
432  entry_update.idx_stat = entry.idx_stat;
433  map_.Insert(inode_update, entry_update);
434  entry.idx_stat = -1;
435  }
436  map_.Insert(inode, entry);
437 }
438 
440  : tracker_(t)
441 {
442  int retval = pthread_mutex_lock(tracker_->lock_);
443  assert(retval == 0);
444 }
445 
447  int retval = pthread_mutex_unlock(tracker_->lock_);
448  assert(retval == 0);
449 }
450 
452  if (!tracker_->is_active_)
453  return;
454 
455  bool contained_inode = tracker_->map_.Erase(inode);
456  if (contained_inode)
457  tracker_->statistics_.n_remove++;
458 }
459 
460 } // namespace glue
#define LogCvmfs(source, mask,...)
Definition: logging.h:25
uint64_t inode_parent
Definition: glue_buffer.h:821
InodeReferences inode_references_
Definition: glue_buffer.h:800
pthread_mutex_t * lock_
Definition: glue_buffer.h:1052
void DoPrune(uint64_t now)
Definition: glue_buffer.h:895
NameString name
Definition: glue_buffer.h:822
PageCacheTracker & operator=(const PageCacheTracker &other)
Definition: glue_buffer.cc:287
Definition: glue_buffer.h:923
Definition: glue_buffer.h:813
void CopyFrom(const DentryTracker &other)
Definition: glue_buffer.cc:150
perf::Statistics * statistics_
Definition: repository.h:139
pthread_t thread_cleaner_
Definition: glue_buffer.h:914
void Unlock() const
Definition: glue_buffer.h:890
assert((mem||(size==0))&&"Out Of Memory")
bool NextEntry(Cursor *cursor, uint64_t *inode_parent, NameString *name)
Definition: glue_buffer.cc:246
StringHeap * string_heap_
Definition: glue_buffer.h:367
void MakePipe(int pipe_fd[2])
Definition: posix.cc:489
void Lock() const
Definition: glue_buffer.h:886
int32_t nopen
Definition: glue_buffer.h:932
static uint32_t hasher_inode(const uint64_t &inode)
Definition: glue_buffer.h:99
void EndEnumerate(Cursor *cursor)
Definition: glue_buffer.cc:261
Statistics statistics_
Definition: glue_buffer.h:801
void SpawnCleaner(unsigned interval_s)
Definition: glue_buffer.cc:170
static bool AssertOrLog(int t, const LogSource, const int, const char *,...)
Definition: exception.h:61
int32_t idx_stat
Definition: glue_buffer.h:936
int32_t Add(const struct stat &info)
Definition: glue_buffer.h:383
DentryTracker * Move()
Definition: glue_buffer.cc:160
static const unsigned kVersion
Definition: glue_buffer.h:1047
uint64_t platform_monotonic_time()
InodeExMap inode_ex_map_
Definition: glue_buffer.h:799
SmallHashDynamic< shash::Md5, PathInfo > map_
Definition: glue_buffer.h:366
uint64_t used() const
Definition: glue_buffer.h:202
OpenDirectives Open(uint64_t inode, const shash::Any &hash, const struct stat &info)
Definition: glue_buffer.cc:317
void Close(uint64_t inode)
Definition: glue_buffer.cc:399
OpenDirectives OpenDirect()
Definition: glue_buffer.cc:388
EvictRaii(PageCacheTracker *t)
Definition: glue_buffer.cc:439
Definition: mutex.h:42
uint64_t Erase(int32_t index)
Definition: glue_buffer.h:392
DentryTracker & operator=(const DentryTracker &other)
Definition: glue_buffer.cc:139
void CopyFrom(const PageCacheTracker &other)
Definition: glue_buffer.cc:297
const char * kVersion
Definition: preload.cc:27
BigQueue< Entry > entries_
Definition: glue_buffer.h:910
static void * MainCleaner(void *data)
Definition: glue_buffer.cc:180
static const unsigned kVersion
Definition: glue_buffer.h:881
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:498
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:510
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:548
pthread_mutex_t * lock_
Definition: glue_buffer.h:906
SmallHashDynamic< uint64_t, Entry > map_
Definition: glue_buffer.h:1061
shash::Any hash
Definition: glue_buffer.h:941
Statistics statistics_
Definition: glue_buffer.h:908