CernVM-FS  2.13.0
kvstore.cc
/**
 * This file is part of the CernVM File System.
 */

#include "kvstore.h"

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <string.h>
#include <unistd.h>

#include <algorithm>

#include "util/async.h"
#include "util/concurrency.h"
#include "util/logging.h"

using namespace std;  // NOLINT

namespace {

static inline uint32_t hasher_any(const shash::Any &key) {
  // We'll just do the same thing as hasher_md5, since every hash is at
  // least as large.
  return (uint32_t) * (reinterpret_cast<const uint32_t *>(key.digest) + 1);
}

}  // anonymous namespace

const double MemoryKvStore::kCompactThreshold = 0.8;


MemoryKvStore::MemoryKvStore(unsigned int cache_entries,
                             MemoryAllocator alloc,
                             unsigned alloc_size,
                             perf::StatisticsTemplate statistics)
    : allocator_(alloc)
    , used_bytes_(0)
    , entry_count_(0)
    , max_entries_(cache_entries)
    , entries_(cache_entries, shash::Any(), hasher_any,
               perf::StatisticsTemplate("lru", statistics))
    , heap_(NULL)
    , counters_(statistics) {
  int retval = pthread_rwlock_init(&rwlock_, NULL);
  assert(retval == 0);
  switch (alloc) {
    case kMallocHeap:
      heap_ = new MallocHeap(
          alloc_size, this->MakeCallback(&MemoryKvStore::OnBlockMove, this));
      break;
    default:
      break;
  }
}


MemoryKvStore::~MemoryKvStore() {
  delete heap_;
  pthread_rwlock_destroy(&rwlock_);
}


void MemoryKvStore::OnBlockMove(const MallocHeap::BlockPtr &ptr) {
  bool ok;
  struct AllocHeader a;
  MemoryBuffer buf;

  // must be locked by caller
  assert(ptr.pointer);
  memcpy(&a, ptr.pointer, sizeof(a));
  LogCvmfs(kLogKvStore, kLogDebug, "compaction moved %s to %p",
           a.id.ToString().c_str(), ptr.pointer);
  assert(a.version == 0);
  const bool update_lru = false;
  ok = entries_.Lookup(a.id, &buf, update_lru);
  assert(ok);
  buf.address = static_cast<char *>(ptr.pointer) + sizeof(a);
  ok = entries_.UpdateValue(buf.id, buf);
  assert(ok);
}


bool MemoryKvStore::Contains(const shash::Any &id) {
  MemoryBuffer buf;
  // LogCvmfs(kLogKvStore, kLogDebug, "check buffer %s", id.ToString().c_str());
  const bool update_lru = false;
  return entries_.Lookup(id, &buf, update_lru);
}


int MemoryKvStore::DoMalloc(MemoryBuffer *buf) {
  MemoryBuffer tmp;
  AllocHeader a;

  assert(buf);
  memcpy(&tmp, buf, sizeof(tmp));

  tmp.address = NULL;
  if (tmp.size > 0) {
    switch (allocator_) {
      case kMallocLibc:
        tmp.address = malloc(tmp.size);
        if (!tmp.address)
          return -errno;
        break;
      case kMallocHeap:
        assert(heap_);
        a.id = tmp.id;
        tmp.address = heap_->Allocate(tmp.size + sizeof(a), &a, sizeof(a));
        if (!tmp.address)
          return -ENOMEM;
        tmp.address = static_cast<char *>(tmp.address) + sizeof(a);
        break;
      default:
        abort();
    }
  }

  memcpy(buf, &tmp, sizeof(*buf));
  return 0;
}


void MemoryKvStore::DoFree(MemoryBuffer *buf) {
  AllocHeader a;

  assert(buf);
  if (!buf->address)
    return;
  switch (allocator_) {
    case kMallocLibc:
      free(buf->address);
      return;
    case kMallocHeap:
      heap_->MarkFree(static_cast<char *>(buf->address) - sizeof(a));
      return;
    default:
      abort();
  }
}


bool MemoryKvStore::CompactMemory() {
  double utilization;
  switch (allocator_) {
    case kMallocHeap:
      utilization = heap_->utilization();
      LogCvmfs(kLogKvStore, kLogDebug, "compact requested (%f)", utilization);
      if (utilization < kCompactThreshold) {
        LogCvmfs(kLogKvStore, kLogDebug, "compacting heap");
        heap_->Compact();
        if (heap_->utilization() > utilization)
          return true;
      }
      return false;
    default:
      // the other allocators can't compact, so just ignore the request
      LogCvmfs(kLogKvStore, kLogDebug, "compact requested");
      return false;
  }
}


int64_t MemoryKvStore::GetSize(const shash::Any &id) {
  MemoryBuffer mem;
  perf::Inc(counters_.n_getsize);
  const bool update_lru = false;
  if (entries_.Lookup(id, &mem, update_lru)) {
    // LogCvmfs(kLogKvStore, kLogDebug, "%s is %u B", id.ToString().c_str(),
    //          mem.size);
    return mem.size;
  } else {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on GetSize",
             id.ToString().c_str());
    return -ENOENT;
  }
}


int64_t MemoryKvStore::GetRefcount(const shash::Any &id) {
  MemoryBuffer mem;
  perf::Inc(counters_.n_getrefcount);
  const bool update_lru = false;
  if (entries_.Lookup(id, &mem, update_lru)) {
    // LogCvmfs(kLogKvStore, kLogDebug, "%s has refcount %u",
    //          id.ToString().c_str(), mem.refcount);
    return mem.refcount;
  } else {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on GetRefcount",
             id.ToString().c_str());
    return -ENOENT;
  }
}


bool MemoryKvStore::IncRef(const shash::Any &id) {
  perf::Inc(counters_.n_incref);
  WriteLockGuard guard(rwlock_);
  MemoryBuffer mem;
  if (entries_.Lookup(id, &mem)) {
    assert(mem.refcount < UINT_MAX);
    ++mem.refcount;
    entries_.Insert(id, mem);
    LogCvmfs(kLogKvStore, kLogDebug, "increased refcount of %s to %u",
             id.ToString().c_str(), mem.refcount);
    return true;
  } else {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on IncRef",
             id.ToString().c_str());
    return false;
  }
}


bool MemoryKvStore::Unref(const shash::Any &id) {
  perf::Inc(counters_.n_unref);
  WriteLockGuard guard(rwlock_);
  MemoryBuffer mem;
  if (entries_.Lookup(id, &mem)) {
    assert(mem.refcount > 0);
    --mem.refcount;
    entries_.Insert(id, mem);
    LogCvmfs(kLogKvStore, kLogDebug, "decreased refcount of %s to %u",
             id.ToString().c_str(), mem.refcount);
    return true;
  } else {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Unref", id.ToString().c_str());
    return false;
  }
}


int64_t MemoryKvStore::Read(const shash::Any &id,
                            void *buf,
                            size_t size,
                            size_t offset) {
  MemoryBuffer mem;
  perf::Inc(counters_.n_read);
  ReadLockGuard guard(rwlock_);
  if (!entries_.Lookup(id, &mem)) {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Read", id.ToString().c_str());
    return -ENOENT;
  }
  if (offset > mem.size) {
    LogCvmfs(kLogKvStore, kLogDebug, "out of bounds read (%zu>%zu) on %s",
             offset, mem.size, id.ToString().c_str());
    return 0;
  }
  uint64_t copy_size = std::min(mem.size - offset, size);
  // LogCvmfs(kLogKvStore, kLogDebug, "copy %u B from offset %u of %s",
  //          copy_size, offset, id.ToString().c_str());
  memcpy(buf, static_cast<char *>(mem.address) + offset, copy_size);
  perf::Xadd(counters_.sz_read, copy_size);
  return copy_size;
}


int MemoryKvStore::Commit(const MemoryBuffer &buf) {
  WriteLockGuard guard(rwlock_);
  return DoCommit(buf);
}


int MemoryKvStore::DoCommit(const MemoryBuffer &buf) {
  // We need to be careful about refcounts. If another thread wants to read
  // a cache entry while it's being written (OpenFromTxn put partial data in
  // the kvstore, which will be committed again later), the refcount in the
  // kvstore will differ from the refcount in the cache transaction. To avoid
  // leaks, either the caller needs to fetch the cache entry before every
  // write to find the current refcount, or the kvstore can ignore the
  // passed-in refcount if the entry already exists. This implementation does
  // the latter, and as a result it's not possible to directly modify the
  // refcount without a race condition. This is a hint that callers should
  // use the refcount like a lock and not directly modify the numeric value.
  // (See the illustrative usage sketch after this function.)

  CompactMemory();

  MemoryBuffer mem;
  perf::Inc(counters_.n_commit);
  LogCvmfs(kLogKvStore, kLogDebug, "commit %s", buf.id.ToString().c_str());
  if (entries_.Lookup(buf.id, &mem)) {
    LogCvmfs(kLogKvStore, kLogDebug, "commit overwrites existing entry");
    size_t old_size = mem.size;
    DoFree(&mem);
    used_bytes_ -= old_size;
    perf::Xadd(counters_.sz_deleted, old_size);
    --entry_count_;
  } else {
    // Since this is a new entry, the caller can choose the starting
    // refcount (starting at 1 for pinning, for example).
    mem.refcount = buf.refcount;
  }
  mem.object_flags = buf.object_flags;
  mem.id = buf.id;
  mem.size = buf.size;
  if (entry_count_ == max_entries_) {
    LogCvmfs(kLogKvStore, kLogDebug, "too many entries in kvstore");
    return -ENFILE;
  }
  if (DoMalloc(&mem) < 0) {
    LogCvmfs(kLogKvStore, kLogDebug, "failed to allocate %s",
             buf.id.ToString().c_str());
    return -EIO;
  }
  assert(SSIZE_MAX - mem.size > used_bytes_);
  memcpy(mem.address, buf.address, mem.size);
  entries_.Insert(buf.id, mem);
  ++entry_count_;
  used_bytes_ += mem.size;
  counters_.sz_size->Set(used_bytes_);
  perf::Xadd(counters_.sz_committed, mem.size);
  return 0;
}


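// Illustrative usage sketch (not part of the original file): how a caller
// might treat the refcount as a pin, per the note in DoCommit() above. The
// names `store`, `object_hash`, `data`, `data_size`, `out`, and `out_size`
// below are hypothetical stand-ins for caller-owned state:
//
//   MemoryBuffer buf;
//   buf.id = object_hash;   // shash::Any identifying the object
//   buf.address = data;     // staging buffer owned by the caller
//   buf.size = data_size;
//   buf.object_flags = 0;
//   buf.refcount = 1;       // pin the new entry; ignored if the id already exists
//   if (store.Commit(buf) == 0) {
//     // While pinned, Delete() refuses the entry and ShrinkTo() skips it.
//     store.Read(buf.id, out, out_size, 0);
//     store.Unref(buf.id);  // drop the pin so the entry becomes evictable
//   }

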
bool MemoryKvStore::Delete(const shash::Any &id) {
  perf::Inc(counters_.n_delete);
  WriteLockGuard guard(rwlock_);
  return DoDelete(id);
}


bool MemoryKvStore::DoDelete(const shash::Any &id) {
  MemoryBuffer buf;
  if (!entries_.Lookup(id, &buf)) {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Delete",
             id.ToString().c_str());
    return false;
  }
  if (buf.refcount > 0) {
    LogCvmfs(kLogKvStore, kLogDebug, "can't delete %s, nonzero refcount",
             id.ToString().c_str());
    return false;
  }
  assert(entry_count_ > 0);
  --entry_count_;
  used_bytes_ -= buf.size;
  counters_.sz_size->Set(used_bytes_);
  perf::Xadd(counters_.sz_deleted, buf.size);
  DoFree(&buf);
  entries_.Forget(id);
  LogCvmfs(kLogKvStore, kLogDebug, "deleted %s", id.ToString().c_str());
  return true;
}


bool MemoryKvStore::ShrinkTo(size_t size) {
  perf::Inc(counters_.n_shrinkto);
  WriteLockGuard guard(rwlock_);
  shash::Any key;
  MemoryBuffer buf;

  if (used_bytes_ <= size) {
    LogCvmfs(kLogKvStore, kLogDebug, "no need to shrink");
    return true;
  }

  LogCvmfs(kLogKvStore, kLogDebug, "shrinking to %zu B", size);
  entries_.FilterBegin();
  while (entries_.FilterNext()) {
    if (used_bytes_ <= size)
      break;
    entries_.FilterGet(&key, &buf);
    if (buf.refcount > 0) {
      LogCvmfs(kLogKvStore, kLogDebug, "skip %s, nonzero refcount",
               key.ToString().c_str());
      continue;
    }
    assert(entry_count_ > 0);
    --entry_count_;
    entries_.FilterDelete();
    used_bytes_ -= buf.size;
    counters_.sz_size->Set(used_bytes_);
    perf::Xadd(counters_.sz_shrunk, buf.size);
    DoFree(&buf);
    LogCvmfs(kLogKvStore, kLogDebug, "delete %s", key.ToString().c_str());
  }
  entries_.FilterEnd();
  LogCvmfs(kLogKvStore, kLogDebug, "shrunk to %zu B", used_bytes_);
  return used_bytes_ <= size;
}