CernVM-FS  2.11.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
glue_buffer.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "cvmfs_config.h"
8 #include "glue_buffer.h"
9 
10 #include <errno.h>
11 #include <poll.h>
12 #include <unistd.h>
13 
14 #include <cassert>
15 #include <cstdlib>
16 #include <cstring>
17 
18 #include <string>
19 #include <vector>
20 
21 #include "util/logging.h"
22 #include "util/mutex.h"
23 #include "util/platform.h"
24 #include "util/posix.h"
25 #include "util/smalloc.h"
26 
27 using namespace std; // NOLINT
28 
29 namespace glue {
30 
31 PathStore &PathStore::operator= (const PathStore &other) {
32  if (&other == this)
33  return *this;
34 
35  delete string_heap_;
36  CopyFrom(other);
37  return *this;
38 }
39 
40 
41 PathStore::PathStore(const PathStore &other) {
42  CopyFrom(other);
43 }
44 
45 
46 void PathStore::CopyFrom(const PathStore &other) {
47  map_ = other.map_;
48 
49  string_heap_ = new StringHeap(other.string_heap_->used());
50  shash::Md5 empty_path = map_.empty_key();
51  for (unsigned i = 0; i < map_.capacity(); ++i) {
52  if (map_.keys()[i] != empty_path) {
53  (map_.values() + i)->name =
54  string_heap_->AddString(map_.values()[i].name.length(),
55  map_.values()[i].name.data());
56  }
57  }
58 }
59 
60 
61 //------------------------------------------------------------------------------
62 
63 
64 void InodeTracker::InitLock() {
65  lock_ =
66  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
67  int retval = pthread_mutex_init(lock_, NULL);
68  assert(retval == 0);
69 }
70 
71 
72 void InodeTracker::CopyFrom(const InodeTracker &other) {
73  assert(other.version_ == kVersion);
74  version_ = kVersion;
75  path_map_ = other.path_map_;
76  inode_ex_map_ = other.inode_ex_map_;
77  inode_references_ = other.inode_references_;
78  statistics_ = other.statistics_;
79 }
80 
81 
82 InodeTracker::InodeTracker() {
83  version_ = kVersion;
84  InitLock();
85 }
86 
87 
88 InodeTracker::InodeTracker(const InodeTracker &other) {
89  CopyFrom(other);
90  InitLock();
91 }
92 
93 
94 InodeTracker &InodeTracker::operator= (const InodeTracker &other) {
95  if (&other == this)
96  return *this;
97 
98  CopyFrom(other);
99  return *this;
100 }
101 
102 
103 InodeTracker::~InodeTracker() {
104  pthread_mutex_destroy(lock_);
105  free(lock_);
106 }
107 
108 
109 //------------------------------------------------------------------------------
110 
// Default constructor.  The termination pipe fds are set to -1, meaning
// "cleaner thread not spawned" (see SpawnCleaner()).
// NOTE(review): the extracted listing jumps from original line 112 to 114;
// one statement (presumably the cleaning-interval initialization) is missing
// here — confirm against the repository source.
111 DentryTracker::DentryTracker() : version_(kVersion), is_active_(true) {
112  pipe_terminate_[0] = pipe_terminate_[1] = -1;
114  InitLock();
115 }
116 
117 
// Destructor body; the signature line (original 118, presumably
// DentryTracker::~DentryTracker()) is missing from this extract.
// If a cleaner thread was spawned, send it the 'T' terminate token through
// the pipe and join it.  Original line 123 is also missing here — likely the
// ClosePipe() call listed in the symbol index; confirm upstream.
119  if (pipe_terminate_[1] >= 0) {
// Wake the cleaner thread (MainCleaner blocks in poll() on the read end).
120  char t = 'T';
121  WritePipe(pipe_terminate_[1], &t, 1);
122  pthread_join(thread_cleaner_, NULL);
124  }
// Lock was allocated with smalloc() in InitLock(), hence free().
125  pthread_mutex_destroy(lock_);
126  free(lock_);
127 }
128 
129 
// Copy-constructor body; the signature line (original 130) is missing from
// this extract.  Copies the entry queue and statistics, but the new instance
// gets its own lock and no cleaner thread (pipe fds reset to -1).
// NOTE(review): original line 133 is also missing — presumably the
// cleaning-interval member copy/reset; confirm upstream.
131  CopyFrom(other);
132  pipe_terminate_[0] = pipe_terminate_[1] = -1;
134  InitLock();
135 }
136 
137 
// Assignment-operator body; the signature line (original 138,
// DentryTracker &DentryTracker::operator=(const DentryTracker &), per the
// symbol index) is missing from this extract.  Copies state under this
// instance's lock; self-assignment is a no-op.
139  if (&other == this)
140  return *this;
141 
142  Lock();
143  CopyFrom(other);
144  Unlock();
145  return *this;
146 }
147 
148 
// CopyFrom body; the signature line (original 149,
// void DentryTracker::CopyFrom(const DentryTracker &), per the symbol index)
// is missing from this extract.  Copies all tracked entries and counters;
// refuses trackers of a different layout version.
150  assert(other.version_ == kVersion);
151 
152  version_ = kVersion;
153  statistics_ = other.statistics_;
154  is_active_ = other.is_active_;
155  entries_ = other.entries_;
156 }
157 
158 
// Move() body; the signature line (original 159, DentryTracker *Move(), per
// the symbol index) is missing from this extract.  Atomically hands the
// current set of entries to a freshly heap-allocated tracker and empties this
// one; the evicted entries are accounted as removals.  Caller owns the
// returned tracker.
160  Lock();
161  DentryTracker *new_tracker = new DentryTracker(*this);
162  statistics_.num_remove += entries_.size();
163  entries_.Clear();
164  Unlock();
165  return new_tracker;
166 }
167 
168 
// Starts the background cleaner thread that periodically prunes expired
// negative dentries.  Must only be called once (asserted via the unset pipe).
// NOTE(review): original lines 172-173 are missing from this extract —
// presumably the MakePipe(pipe_terminate_) call (see the symbol index) that
// creates the termination pipe before the thread starts; confirm upstream.
169 void DentryTracker::SpawnCleaner(unsigned interval_s) {
170  assert(pipe_terminate_[0] == -1);
171  cleaning_interval_ms_ = interval_s * 1000;
174  int retval = pthread_create(&thread_cleaner_, NULL, MainCleaner, this);
175  assert(retval == 0);
176 }
177 
178 
179 void *DentryTracker::MainCleaner(void *data) {
180  DentryTracker *tracker = reinterpret_cast<DentryTracker *>(data);
181  LogCvmfs(kLogCvmfs, kLogDebug, "starting negative entry cache cleaner");
182 
183  struct pollfd watch_term;
184  watch_term.fd = tracker->pipe_terminate_[0];
185  watch_term.events = POLLIN | POLLPRI;
186  int timeout_ms = tracker->cleaning_interval_ms_;;
187  uint64_t deadline = platform_monotonic_time() + timeout_ms / 1000;
188  while (true) {
189  watch_term.revents = 0;
190  int retval = poll(&watch_term, 1, timeout_ms);
191  if (retval < 0) {
192  if (errno == EINTR) {
193  if (timeout_ms >= 0) {
194  uint64_t now = platform_monotonic_time();
195  timeout_ms = (now > deadline) ? 0 : (deadline - now) * 1000;
196  }
197  continue;
198  }
199  abort();
200  }
201  timeout_ms = tracker->cleaning_interval_ms_;
202  deadline = platform_monotonic_time() + timeout_ms / 1000;
203 
204  if (retval == 0) {
205  LogCvmfs(kLogCvmfs, kLogDebug, "negative entry cleaner: pruning");
206  tracker->Prune();
207  continue;
208  }
209 
210  assert(watch_term.revents != 0);
211 
212  char c = 0;
213  ReadPipe(tracker->pipe_terminate_[0], &c, 1);
214  assert(c == 'T');
215  break;
216  }
217  LogCvmfs(kLogCvmfs, kLogDebug, "stopping negative entry cache cleaner");
218  return NULL;
219 }
220 
221 
// InitLock body; the signature line (original 222, presumably
// void DentryTracker::InitLock()) is missing from this extract.  Allocates
// the mutex on the heap via smalloc (freed in the destructor) and
// initializes it.
223  lock_ =
224  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
225  int retval = pthread_mutex_init(lock_, NULL);
226  assert(retval == 0);
227 }
228 
229 
// Prune body; the signature line (original 230, presumably
// void DentryTracker::Prune()) is missing from this extract, as is original
// line 232 — likely the DoPrune(now) call listed in the symbol index that
// does the actual expiry work under the lock; confirm upstream.
231  Lock();
233  Unlock();
234 }
235 
236 
// BeginEnumerate body; the signature line (original 237, presumably
// DentryTracker::Cursor DentryTracker::BeginEnumerate()) is missing from this
// extract.  Takes the lock and returns a cursor at the head of the entry
// queue.  NOTE: returns with the lock still held — the matching
// EndEnumerate() releases it.
238  Entry *head = NULL;
239  Lock();
240  entries_.Peek(&head);
241  return Cursor(head);
242 }
243 
244 
// NextEntry; the first signature line (original 245,
// bool DentryTracker::NextEntry(Cursor *cursor, ...), per the symbol index)
// is missing from this extract.  Advances the cursor by one entry, copying
// the parent inode and name into the out-parameters.  Returns false once the
// cursor is exhausted or was created on an empty queue.  Must be called
// between BeginEnumerate() and EndEnumerate() (lock held by the cursor's
// creator).
246  uint64_t *inode_parent, NameString *name)
247 {
248  if (cursor->head == NULL)
249  return false;
250  if (cursor->pos >= entries_.size())
251  return false;
252  Entry *e = cursor->head + cursor->pos;
253  *inode_parent = e->inode_parent;
254  *name = e->name;
255  cursor->pos++;
256  return true;
257 }
258 
259 
260 void DentryTracker::EndEnumerate(Cursor * /* cursor */) {
261  Unlock();
262 }
263 
264 
265 //------------------------------------------------------------------------------
266 
267 
268 PageCacheTracker::PageCacheTracker() : version_(kVersion), is_active_(true) {
269  map_.Init(16, 0, hasher_inode);
270  InitLock();
271 }
272 
273 
// Destructor body; the signature line (original 274, presumably
// PageCacheTracker::~PageCacheTracker()) is missing from this extract.
// Tears down the mutex and frees its smalloc'd storage.
275  pthread_mutex_destroy(lock_);
276  free(lock_);
277 }
278 
279 
// Copy-constructor body; the signature line (original 280, presumably
// PageCacheTracker::PageCacheTracker(const PageCacheTracker &)) is missing
// from this extract.  Deep-copies the state and creates a fresh mutex for
// this instance.
281  CopyFrom(other);
282  InitLock();
283 }
284 
285 
// Assignment-operator body; the signature line (original 286,
// PageCacheTracker &PageCacheTracker::operator=(const PageCacheTracker &),
// per the symbol index) is missing from this extract.  Copies state under
// this instance's lock; self-assignment is a no-op.
287  if (&other == this)
288  return *this;
289 
290  MutexLockGuard guard(lock_);
291  CopyFrom(other);
292  return *this;
293 }
294 
295 
// CopyFrom body; the signature line (original 296,
// void PageCacheTracker::CopyFrom(const PageCacheTracker &), per the symbol
// index) is missing from this extract.  Refuses trackers of a different
// layout version; re-initializes the map before assigning to reset its
// geometry.
297  assert(other.version_ == kVersion);
298 
299  version_ = kVersion;
300  is_active_ = other.is_active_;
301  statistics_ = other.statistics_;
302 
303  map_.Init(16, 0, hasher_inode);
304  map_ = other.map_;
305  stat_store_ = other.stat_store_;
306 }
307 
308 
// InitLock body; the signature line (original 309, presumably
// void PageCacheTracker::InitLock()) is missing from this extract.
// Allocates the mutex on the heap via smalloc (freed in the destructor) and
// initializes it.
310  lock_ =
311  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
312  int retval = pthread_mutex_init(lock_, NULL);
313  assert(retval == 0);
314 }
315 
// Open(); the first signature line (original 316,
// PageCacheTracker::OpenDirectives PageCacheTracker::Open(...), per the
// symbol index) is missing from this extract.  Decides, per inode, whether
// the kernel page cache may be kept on open and whether direct I/O must be
// used, based on whether the currently cached content hash matches the one
// being opened.  entry.nopen counts open file handles; a negative count marks
// the transition phase after a content change.
// NOTE(review): several original lines are missing from this extract (333-334,
// 349, 355, 369, 379) — by position these look like statistics counter
// increments; confirm against the repository source.
317  uint64_t inode, const shash::Any &hash, const struct stat &info)
318 {
319  assert(inode == info.st_ino);
320 
321  OpenDirectives open_directives;
322  // Old behavior: always flush page cache on open
323  if (!is_active_)
324  return open_directives;
325 
326  MutexLockGuard guard(lock_);
327 
// First open of this inode: register it and keep the (empty) page cache.
328  Entry entry;
329  bool retval = map_.Lookup(inode, &entry);
330  if (!retval) {
331  open_directives.keep_cache = true;
332  open_directives.direct_io = false;
335 
336  entry.nopen = 1;
337  entry.idx_stat = stat_store_.Add(info);
338  entry.hash = hash;
339  map_.Insert(inode, entry);
340  return open_directives;
341  }
342 
// Same content as currently cached: page cache can normally be kept.
343  if (entry.hash == hash) {
344  open_directives.direct_io = false;
345  if (entry.nopen < 0) {
346  // The page cache is still in the transition phase and may contain old
347  // content. So trigger a flush of the cache in any case.
348  open_directives.keep_cache = false;
// Negative count: one more open handle during the transition phase.
350  entry.nopen--;
351  map_.Insert(inode, entry);
352  return open_directives;
353  } else {
354  open_directives.keep_cache = true;
// Re-register struct stat info when the inode transitions from closed to open.
356  if (entry.nopen++ == 0)
357  entry.idx_stat = stat_store_.Add(info);
358  map_.Insert(inode, entry);
359  return open_directives;
360  }
361  }
362 
363  // Page cache mismatch and old data has still open file attached to it,
364  // circumvent the page cache entirely and use direct I/O. In this case,
365  // cvmfs_close() will _not_ call Close().
366  if (entry.nopen != 0) {
367  open_directives.keep_cache = true;
368  open_directives.direct_io = true;
370  return open_directives;
371  }
372 
373  // Stale data in the page cache, start the transition phase in which newly
374  // opened files flush the page cache and re-populate it with the new hash.
375  // The first file to reach Close() will finish the transition phase and
376  // mark the new hash as committed.
377  open_directives.direct_io = false;
378  open_directives.keep_cache = false;
380  entry.hash = hash;
381  entry.idx_stat = stat_store_.Add(info);
382  entry.nopen = -1;
383  map_.Insert(inode, entry);
384  return open_directives;
385 }
386 
// OpenDirect body; the signature line (original 387,
// PageCacheTracker::OpenDirectives PageCacheTracker::OpenDirect(), per the
// symbol index) is missing from this extract.  Returns directives for a file
// opened with direct I/O: keep_cache=true, direct_io=true (the (true, true)
// constructor arguments).  NOTE(review): original line 394 is missing —
// presumably a statistics counter increment under the lock; confirm upstream.
388  OpenDirectives open_directives(true, true);
389  // Old behavior: always flush page cache on open
390  if (!is_active_)
391  return open_directives;
392 
393  MutexLockGuard guard(lock_);
395  return open_directives;
396 }
397 
// Counterpart of Open(): called when an open file handle of the given inode
// is closed (not called for direct-I/O opens).  Decrements the open counter,
// ends a pending transition phase, and — on the last close — removes the
// struct stat record from the compacting stat store.  The order of the
// statements is significant: stat_store_.Erase() swap-fills the erased slot,
// so the entry of the inode that was moved into that slot must have its
// idx_stat patched before this inode's own entry is re-inserted.
398 void PageCacheTracker::Close(uint64_t inode) {
399  if (!is_active_)
400  return;
401 
402  MutexLockGuard guard(lock_);
403  Entry entry;
404  bool retval = map_.Lookup(inode, &entry);
405  assert(retval);
406  assert(entry.nopen != 0);
407  if (entry.nopen < 0) {
408  // At this point we know that any stale data has been flushed from the
409  // cache and only data related to the currently booked content hash
410  // can be present. So clear the transition bit (sign bit).
411  entry.nopen = -entry.nopen;
412  }
413  entry.nopen--;
414  if (entry.nopen == 0) {
415  // File closed, remove struct stat information
416  assert(entry.idx_stat >= 0);
// Erase() returns the inode whose stat record was swapped into the freed
// slot; that inode's map entry must be updated to point at its new index.
417  uint64_t inode_update = stat_store_.Erase(entry.idx_stat);
418  Entry entry_update;
419  retval = map_.Lookup(inode_update, &entry_update);
420  assert(retval);
421  entry_update.idx_stat = entry.idx_stat;
422  map_.Insert(inode_update, entry_update);
423  entry.idx_stat = -1;
424  }
425  map_.Insert(inode, entry);
426 }
427 
// EvictRaii constructor; the signature line (original 428,
// EvictRaii(PageCacheTracker *t), per the symbol index) is missing from this
// extract.  RAII guard: acquires the tracker's lock for the lifetime of the
// object so that multiple Evict() calls run atomically.
429  : tracker_(t)
430 {
431  int retval = pthread_mutex_lock(tracker_->lock_);
432  assert(retval == 0);
433 }
434 
// EvictRaii destructor body; the signature line (original 435, presumably
// PageCacheTracker::EvictRaii::~EvictRaii()) is missing from this extract.
// Releases the tracker lock acquired in the constructor.
436  int retval = pthread_mutex_unlock(tracker_->lock_);
437  assert(retval == 0);
438 }
439 
// Evict body; the signature line (original 440, presumably
// void PageCacheTracker::EvictRaii::Evict(uint64_t inode)) is missing from
// this extract.  Removes the inode from the tracker's map (lock already held
// by the guard) and counts the removal if the inode was present.  No-op when
// the tracker is inactive.
441  if (!tracker_->is_active_)
442  return;
443 
444  bool contained_inode = tracker_->map_.Erase(inode);
445  if (contained_inode)
446  tracker_->statistics_.n_remove++;
447 }
448 
449 } // namespace glue
#define LogCvmfs(source, mask,...)
Definition: logging.h:22
uint64_t inode_parent
Definition: glue_buffer.h:821
InodeReferences inode_references_
Definition: glue_buffer.h:800
pthread_mutex_t * lock_
Definition: glue_buffer.h:1052
void DoPrune(uint64_t now)
Definition: glue_buffer.h:895
NameString name
Definition: glue_buffer.h:822
PageCacheTracker & operator=(const PageCacheTracker &other)
Definition: glue_buffer.cc:286
Definition: glue_buffer.h:923
Definition: glue_buffer.h:813
void CopyFrom(const DentryTracker &other)
Definition: glue_buffer.cc:149
perf::Statistics * statistics_
Definition: repository.h:139
pthread_t thread_cleaner_
Definition: glue_buffer.h:914
void Unlock() const
Definition: glue_buffer.h:890
assert((mem||(size==0))&&"Out Of Memory")
bool NextEntry(Cursor *cursor, uint64_t *inode_parent, NameString *name)
Definition: glue_buffer.cc:245
StringHeap * string_heap_
Definition: glue_buffer.h:367
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
void Lock() const
Definition: glue_buffer.h:886
int32_t nopen
Definition: glue_buffer.h:932
static uint32_t hasher_inode(const uint64_t &inode)
Definition: glue_buffer.h:99
void EndEnumerate(Cursor *cursor)
Definition: glue_buffer.cc:260
Statistics statistics_
Definition: glue_buffer.h:801
void SpawnCleaner(unsigned interval_s)
Definition: glue_buffer.cc:169
int32_t idx_stat
Definition: glue_buffer.h:936
int32_t Add(const struct stat &info)
Definition: glue_buffer.h:383
DentryTracker * Move()
Definition: glue_buffer.cc:159
static const unsigned kVersion
Definition: glue_buffer.h:1047
uint64_t platform_monotonic_time()
InodeExMap inode_ex_map_
Definition: glue_buffer.h:799
SmallHashDynamic< shash::Md5, PathInfo > map_
Definition: glue_buffer.h:366
uint64_t used() const
Definition: glue_buffer.h:202
OpenDirectives Open(uint64_t inode, const shash::Any &hash, const struct stat &info)
Definition: glue_buffer.cc:316
void Close(uint64_t inode)
Definition: glue_buffer.cc:398
OpenDirectives OpenDirect()
Definition: glue_buffer.cc:387
EvictRaii(PageCacheTracker *t)
Definition: glue_buffer.cc:428
Definition: mutex.h:42
uint64_t Erase(int32_t index)
Definition: glue_buffer.h:392
DentryTracker & operator=(const DentryTracker &other)
Definition: glue_buffer.cc:138
void CopyFrom(const PageCacheTracker &other)
Definition: glue_buffer.cc:296
const char * kVersion
Definition: preload.cc:27
BigQueue< Entry > entries_
Definition: glue_buffer.h:910
static void * MainCleaner(void *data)
Definition: glue_buffer.cc:179
static const unsigned kVersion
Definition: glue_buffer.h:881
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:546
pthread_mutex_t * lock_
Definition: glue_buffer.h:906
SmallHashDynamic< uint64_t, Entry > map_
Definition: glue_buffer.h:1061
shash::Any hash
Definition: glue_buffer.h:941
Statistics statistics_
Definition: glue_buffer.h:908