CernVM-FS 2.12.0
kvstore.cc
/**
 * This file is part of the CernVM File System.
 */
#include "kvstore.h"

#include <unistd.h>

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <string.h>

#include <algorithm>

#include "util/async.h"
#include "util/concurrency.h"
#include "util/logging.h"

using namespace std;  // NOLINT

namespace {

static inline uint32_t hasher_any(const shash::Any &key) {
  // We'll just do the same thing as hasher_md5, since every hash is at
  // least as large.
  return (uint32_t) *(reinterpret_cast<const uint32_t *>(key.digest) + 1);
}
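
// Illustrative note (not from the original source): the LRU index only needs
// a cheap, well-distributed bucket hash, and any fixed slice of a
// cryptographic digest works because its bytes are already uniformly
// distributed. The expression above reads the 32-bit word at digest bytes
// 4..7 in machine byte order; an equivalent, alignment-safe formulation
// would be:
//
//   uint32_t h;
//   memcpy(&h, key.digest + 4, sizeof(h));
//   return h;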

}  // anonymous namespace

const double MemoryKvStore::kCompactThreshold = 0.8;


MemoryKvStore::MemoryKvStore(
  unsigned int cache_entries,
  MemoryAllocator alloc,
  unsigned alloc_size,
  perf::StatisticsTemplate statistics)
  : allocator_(alloc)
  , used_bytes_(0)
  , entry_count_(0)
  , max_entries_(cache_entries)
  , entries_(cache_entries, shash::Any(), hasher_any,
             perf::StatisticsTemplate("lru", statistics))
  , heap_(NULL)
  , counters_(statistics)
{
  int retval = pthread_rwlock_init(&rwlock_, NULL);
  assert(retval == 0);
  switch (alloc) {
    case kMallocHeap:
      heap_ = new MallocHeap(alloc_size,
          this->MakeCallback(&MemoryKvStore::OnBlockMove, this));
      break;
    default:
      break;
  }
}
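
// Usage sketch (illustrative, not part of this file; the entry count, arena
// size, and counter name are assumptions, and the enumerator is assumed to
// be nested in MemoryKvStore as declared in kvstore.h):
//
//   perf::Statistics statistics;
//   MemoryKvStore store(40000,                       // max cache entries
//                       MemoryKvStore::kMallocHeap,  // compacting allocator
//                       512 * 1024 * 1024,           // 512 MiB arena
//                       perf::StatisticsTemplate("kvstore", &statistics));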


MemoryKvStore::~MemoryKvStore() {
  delete heap_;
  pthread_rwlock_destroy(&rwlock_);
}


void MemoryKvStore::OnBlockMove(const MallocHeap::BlockPtr &ptr) {
  bool ok;
  struct AllocHeader a;
  MemoryBuffer buf;

  // must be locked by caller
  assert(ptr.pointer);
  memcpy(&a, ptr.pointer, sizeof(a));
  LogCvmfs(kLogKvStore, kLogDebug, "compaction moved %s to %p",
           a.id.ToString().c_str(), ptr.pointer);
  assert(a.version == 0);
  const bool update_lru = false;
  ok = entries_.Lookup(a.id, &buf, update_lru);
  assert(ok);
  buf.address = static_cast<char *>(ptr.pointer) + sizeof(a);
  ok = entries_.UpdateValue(buf.id, buf);
  assert(ok);
}
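
// Note (added for clarity): this callback is registered with the heap in the
// constructor via this->MakeCallback(&MemoryKvStore::OnBlockMove, this) and
// is invoked for every block the heap relocates during heap_->Compact().
// That is why it asserts rather than locks: compaction is triggered from
// CompactMemory(), which runs under the write lock taken in Commit().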


bool MemoryKvStore::Contains(const shash::Any &id) {
  MemoryBuffer buf;
  // LogCvmfs(kLogKvStore, kLogDebug, "check buffer %s", id.ToString().c_str());
  const bool update_lru = false;
  return entries_.Lookup(id, &buf, update_lru);
}


int MemoryKvStore::DoMalloc(MemoryBuffer *buf) {
  MemoryBuffer tmp;
  AllocHeader a;

  assert(buf);
  memcpy(&tmp, buf, sizeof(tmp));

  tmp.address = NULL;
  if (tmp.size > 0) {
    switch (allocator_) {
      case kMallocLibc:
        tmp.address = malloc(tmp.size);
        if (!tmp.address) return -errno;
        break;
      case kMallocHeap:
        assert(heap_);
        a.id = tmp.id;
        tmp.address =
          heap_->Allocate(tmp.size + sizeof(a), &a, sizeof(a));
        if (!tmp.address) return -ENOMEM;
        tmp.address = static_cast<char *>(tmp.address) + sizeof(a);
        break;
      default:
        abort();
    }
  }

  memcpy(buf, &tmp, sizeof(*buf));
  return 0;
}
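
// Layout note (added for clarity): with kMallocHeap, every allocation is
// prefixed by an AllocHeader so that compaction can map a moved block back
// to its cache entry:
//
//   [ AllocHeader { version, id } | payload ... ]
//   ^ heap block start             ^ buf->address
//
// DoFree() undoes the sizeof(AllocHeader) offset before handing the block
// back to the heap.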


void MemoryKvStore::DoFree(MemoryBuffer *buf) {
  AllocHeader a;

  assert(buf);
  if (!buf->address) return;
  switch (allocator_) {
    case kMallocLibc:
      free(buf->address);
      return;
    case kMallocHeap:
      heap_->MarkFree(static_cast<char *>(buf->address) - sizeof(a));
      return;
    default:
      abort();
  }
}


bool MemoryKvStore::CompactMemory() {
  double utilization;
  switch (allocator_) {
    case kMallocHeap:
      utilization = heap_->utilization();
      LogCvmfs(kLogKvStore, kLogDebug, "compact requested (%f)", utilization);
      if (utilization < kCompactThreshold) {
        LogCvmfs(kLogKvStore, kLogDebug, "compacting heap");
        heap_->Compact();
        if (heap_->utilization() > utilization) return true;
      }
      return false;
    default:
      // the other allocators cannot compact, so just ignore the request
      LogCvmfs(kLogKvStore, kLogDebug, "compact requested");
      return false;
  }
}
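
// Note (added for clarity): kCompactThreshold = 0.8 means the heap is only
// compacted once at least 20% of its bytes are garbage. The return value is
// true only if compaction actually raised utilization; DoCommit() currently
// ignores it, but a caller could use it to decide whether retrying a failed
// allocation is worthwhile.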


int64_t MemoryKvStore::GetSize(const shash::Any &id) {
  MemoryBuffer mem;
  perf::Inc(counters_.n_getsize);
  const bool update_lru = false;
  if (entries_.Lookup(id, &mem, update_lru)) {
    // LogCvmfs(kLogKvStore, kLogDebug, "%s is %u B", id.ToString().c_str(),
    //          mem.size);
    return mem.size;
  } else {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on GetSize",
             id.ToString().c_str());
    return -ENOENT;
  }
}


int64_t MemoryKvStore::GetRefcount(const shash::Any &id) {
  MemoryBuffer mem;
  perf::Inc(counters_.n_getrefcount);
  const bool update_lru = false;
  if (entries_.Lookup(id, &mem, update_lru)) {
    // LogCvmfs(kLogKvStore, kLogDebug, "%s has refcount %u",
    //          id.ToString().c_str(), mem.refcount);
    return mem.refcount;
  } else {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on GetRefcount",
             id.ToString().c_str());
    return -ENOENT;
  }
}


bool MemoryKvStore::IncRef(const shash::Any &id) {
  perf::Inc(counters_.n_incref);
  WriteLockGuard guard(rwlock_);
  MemoryBuffer mem;
  if (entries_.Lookup(id, &mem)) {
    assert(mem.refcount < UINT_MAX);
    ++mem.refcount;
    entries_.Insert(id, mem);
    LogCvmfs(kLogKvStore, kLogDebug, "increased refcount of %s to %u",
             id.ToString().c_str(), mem.refcount);
    return true;
  } else {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on IncRef",
             id.ToString().c_str());
    return false;
  }
}


bool MemoryKvStore::Unref(const shash::Any &id) {
  perf::Inc(counters_.n_unref);
  WriteLockGuard guard(rwlock_);
  MemoryBuffer mem;
  if (entries_.Lookup(id, &mem)) {
    assert(mem.refcount > 0);
    --mem.refcount;
    entries_.Insert(id, mem);
    LogCvmfs(kLogKvStore, kLogDebug, "decreased refcount of %s to %u",
             id.ToString().c_str(), mem.refcount);
    return true;
  } else {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Unref",
             id.ToString().c_str());
    return false;
  }
}


int64_t MemoryKvStore::Read(
  const shash::Any &id,
  void *buf,
  size_t size,
  size_t offset
) {
  MemoryBuffer mem;
  perf::Inc(counters_.n_read);
  ReadLockGuard guard(rwlock_);
  if (!entries_.Lookup(id, &mem)) {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Read", id.ToString().c_str());
    return -ENOENT;
  }
  if (offset > mem.size) {
    LogCvmfs(kLogKvStore, kLogDebug, "out of bounds read (%zu>%zu) on %s",
             offset, mem.size, id.ToString().c_str());
    return 0;
  }
  uint64_t copy_size = std::min(mem.size - offset, size);
  // LogCvmfs(kLogKvStore, kLogDebug, "copy %u B from offset %u of %s",
  //          copy_size, offset, id.ToString().c_str());
  memcpy(buf, static_cast<char *>(mem.address) + offset, copy_size);
  perf::Xadd(counters_.sz_read, copy_size);
  return copy_size;
}
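
// Usage sketch (illustrative; store, id, and buffer are hypothetical):
// Read() copies at most `size` bytes starting at `offset` and returns the
// number of bytes copied, so a short read past the end of the object is not
// an error:
//
//   char buffer[4096];
//   int64_t n = store.Read(id, buffer, sizeof(buffer), 0);
//   if (n == -ENOENT) { /* object not present */ }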


int MemoryKvStore::Commit(const MemoryBuffer &buf) {
  WriteLockGuard guard(rwlock_);
  return DoCommit(buf);
}


int MemoryKvStore::DoCommit(const MemoryBuffer &buf) {
  // We need to be careful about refcounts. If another thread wants to read
  // a cache entry while it's being written (OpenFromTxn put partial data in
  // the kvstore, which will be committed again later), the refcount in the
  // kvstore will differ from the refcount in the cache transaction. To avoid
  // leaks, either the caller needs to fetch the cache entry before every
  // write to find the current refcount, or the kvstore can ignore the
  // passed-in refcount if the entry already exists. This implementation does
  // the latter, and as a result it's not possible to directly modify the
  // refcount without a race condition. This is a hint that callers should
  // use the refcount like a lock and not directly modify the numeric value.

  CompactMemory();

  MemoryBuffer mem;
  perf::Inc(counters_.n_commit);
  LogCvmfs(kLogKvStore, kLogDebug, "commit %s", buf.id.ToString().c_str());
  if (entries_.Lookup(buf.id, &mem)) {
    LogCvmfs(kLogKvStore, kLogDebug, "commit overwrites existing entry");
    size_t old_size = mem.size;
    DoFree(&mem);
    used_bytes_ -= old_size;
    counters_.sz_size->Set(used_bytes_);
    --entry_count_;
  } else {
    // since this is a new entry, the caller can choose the starting
    // refcount (starting at 1 for pinning, for example)
    mem.refcount = buf.refcount;
  }
  mem.object_flags = buf.object_flags;
  mem.id = buf.id;
  mem.size = buf.size;
  if (entry_count_ == max_entries_) {
    LogCvmfs(kLogKvStore, kLogDebug, "too many entries in kvstore");
    return -ENFILE;
  }
  if (DoMalloc(&mem) < 0) {
    LogCvmfs(kLogKvStore, kLogDebug, "failed to allocate %s",
             buf.id.ToString().c_str());
    return -EIO;
  }
  assert(SSIZE_MAX - mem.size > used_bytes_);
  memcpy(mem.address, buf.address, mem.size);
  entries_.Insert(buf.id, mem);
  ++entry_count_;
  used_bytes_ += mem.size;
  counters_.sz_size->Set(used_bytes_);
  perf::Xadd(counters_.sz_committed, buf.size);
  return 0;
}
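
// Usage sketch (illustrative, following the comment above): callers should
// treat the refcount as a pin rather than a number they manage directly:
//
//   if (store.IncRef(id)) {
//     int64_t n = store.Read(id, buffer, buflen, 0);
//     store.Unref(id);  // entry becomes eligible for Delete()/ShrinkTo()
//   }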


bool MemoryKvStore::Delete(const shash::Any &id) {
  perf::Inc(counters_.n_delete);
  WriteLockGuard guard(rwlock_);
  return DoDelete(id);
}


bool MemoryKvStore::DoDelete(const shash::Any &id) {
  MemoryBuffer buf;
  if (!entries_.Lookup(id, &buf)) {
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Delete",
             id.ToString().c_str());
    return false;
  }
  if (buf.refcount > 0) {
    LogCvmfs(kLogKvStore, kLogDebug, "can't delete %s, nonzero refcount",
             id.ToString().c_str());
    return false;
  }
  assert(entry_count_ > 0);
  --entry_count_;
  used_bytes_ -= buf.size;
  counters_.sz_size->Set(used_bytes_);
  perf::Xadd(counters_.sz_deleted, buf.size);
  DoFree(&buf);
  entries_.Forget(id);
  LogCvmfs(kLogKvStore, kLogDebug, "deleted %s", id.ToString().c_str());
  return true;
}


bool MemoryKvStore::ShrinkTo(size_t size) {
  perf::Inc(counters_.n_shrinkto);
  WriteLockGuard guard(rwlock_);
  shash::Any key;
  MemoryBuffer buf;

  if (used_bytes_ <= size) {
    LogCvmfs(kLogKvStore, kLogDebug, "no need to shrink");
    return true;
  }

  LogCvmfs(kLogKvStore, kLogDebug, "shrinking to %zu B", size);
  entries_.FilterBegin();
  while (entries_.FilterNext()) {
    if (used_bytes_ <= size) break;
    entries_.FilterGet(&key, &buf);
    if (buf.refcount > 0) {
      LogCvmfs(kLogKvStore, kLogDebug, "skip %s, nonzero refcount",
               key.ToString().c_str());
      continue;
    }
    assert(entry_count_ > 0);
    --entry_count_;
    entries_.FilterDelete();
    used_bytes_ -= buf.size;
    counters_.sz_size->Set(used_bytes_);
    perf::Xadd(counters_.sz_shrunk, buf.size);
    DoFree(&buf);
    LogCvmfs(kLogKvStore, kLogDebug, "delete %s", key.ToString().c_str());
  }
  entries_.FilterEnd();
  LogCvmfs(kLogKvStore, kLogDebug, "shrunk to %zu B", used_bytes_);
  return used_bytes_ <= size;
}
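
// Usage sketch (illustrative): ShrinkTo() walks the cache's filter
// (oldest-first for an LRU cache, an assumption here) and evicts
// unreferenced entries until used_bytes_ <= size, so pinned entries
// (refcount > 0) can keep it from reaching the target:
//
//   if (!store.ShrinkTo(0)) {
//     // some entries are still pinned
//   }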