GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/kvstore.cc Lines: 163 205 79.5 %
Date: 2019-02-03 02:48:13 Branches: 43 82 52.4 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "kvstore.h"
6
7
#include <unistd.h>
8
9
#include <assert.h>
10
#include <errno.h>
11
#include <limits.h>
12
#include <string.h>
13
14
#include <algorithm>
15
16
#include "logging.h"
17
#include "util/async.h"
18
#include "util_concurrency.h"
19
20
using namespace std;  // NOLINT
21
22
namespace {
23
24
71840
static inline uint32_t hasher_any(const shash::Any &key) {
25
  // We'll just do the same thing as hasher_md5, since every hash is at
26
  // least as large.
27
71840
  return (uint32_t) *(reinterpret_cast<const uint32_t *>(key.digest) + 1);
28
}
29
30
}  // anonymous namespace
31
32
const double MemoryKvStore::kCompactThreshold = 0.8;
33
34
35
101
MemoryKvStore::MemoryKvStore(
36
  unsigned int cache_entries,
37
  MemoryAllocator alloc,
38
  unsigned alloc_size,
39
  perf::StatisticsTemplate statistics)
40
  : allocator_(alloc)
41
  , used_bytes_(0)
42
  , entry_count_(0)
43
  , max_entries_(cache_entries)
44
  , entries_(cache_entries, shash::Any(), hasher_any,
45
             perf::StatisticsTemplate("lru", statistics))
46
  , heap_(NULL)
47
101
  , counters_(statistics)
48
{
49
101
  int retval = pthread_rwlock_init(&rwlock_, NULL);
50
101
  assert(retval == 0);
51
101
  switch (alloc) {
52
    case kMallocHeap:
53
      heap_ = new MallocHeap(alloc_size,
54
50
          this->MakeCallback(&MemoryKvStore::OnBlockMove, this));
55
      break;
56
    default:
57
      break;
58
  }
59
}
60
61
62
101
MemoryKvStore::~MemoryKvStore() {
63
101
  delete heap_;
64
101
  pthread_rwlock_destroy(&rwlock_);
65
101
}
66
67
68
void MemoryKvStore::OnBlockMove(const MallocHeap::BlockPtr &ptr) {
69
  bool ok;
70
  struct AllocHeader a;
71
  MemoryBuffer buf;
72
73
  // must be locked by caller
74
  assert(ptr.pointer);
75
  memcpy(&a, ptr.pointer, sizeof(a));
76
  LogCvmfs(kLogKvStore, kLogDebug, "compaction moved %s to %p",
77
           a.id.ToString().c_str(), ptr.pointer);
78
  assert(a.version == 0);
79
  const bool update_lru = false;
80
  ok = entries_.Lookup(a.id, &buf, update_lru);
81
  assert(ok);
82
  buf.address = static_cast<char *>(ptr.pointer) + sizeof(a);
83
  ok = entries_.UpdateValue(buf.id, buf);
84
  assert(ok);
85
}
86
87
88
48
bool MemoryKvStore::Contains(const shash::Any &id) {
89
48
  MemoryBuffer buf;
90
  // LogCvmfs(kLogKvStore, kLogDebug, "check buffer %s", id.ToString().c_str());
91
48
  const bool update_lru = false;
92
48
  return entries_.Lookup(id, &buf, update_lru);
93
}
94
95
96
143
int MemoryKvStore::DoMalloc(MemoryBuffer *buf) {
97
143
  MemoryBuffer tmp;
98
143
  AllocHeader a;
99
100
143
  assert(buf);
101
143
  memcpy(&tmp, buf, sizeof(tmp));
102
103
143
  tmp.address = NULL;
104
143
  if (tmp.size > 0) {
105
143
    switch (allocator_) {
106
      case kMallocLibc:
107
143
        tmp.address = malloc(tmp.size);
108
143
        if (!tmp.address) return -errno;
109
143
        break;
110
      case kMallocHeap:
111
        assert(heap_);
112
        a.id = tmp.id;
113
        tmp.address =
114
          heap_->Allocate(tmp.size + sizeof(a), &a, sizeof(a));
115
        if (!tmp.address) return -ENOMEM;
116
        tmp.address = static_cast<char *>(tmp.address) + sizeof(a);
117
        break;
118
      default:
119
        abort();
120
    }
121
  }
122
123
143
  memcpy(buf, &tmp, sizeof(*buf));
124
143
  return 0;
125
}
126
127
128
111
void MemoryKvStore::DoFree(MemoryBuffer *buf) {
129
111
  AllocHeader a;
130
131
111
  assert(buf);
132
111
  if (!buf->address) return;
133
111
  switch (allocator_) {
134
    case kMallocLibc:
135
111
      free(buf->address);
136
111
      return;
137
    case kMallocHeap:
138
      heap_->MarkFree(static_cast<char *>(buf->address) - sizeof(a));
139
      return;
140
    default:
141
      abort();
142
  }
143
}
144
145
146
143
bool MemoryKvStore::CompactMemory() {
147
  double utilization;
148
143
  switch (allocator_) {
149
    case kMallocHeap:
150
      utilization = heap_->utilization();
151
      LogCvmfs(kLogKvStore, kLogDebug, "compact requested (%f)", utilization);
152
      if (utilization < kCompactThreshold) {
153
        LogCvmfs(kLogKvStore, kLogDebug, "compacting heap");
154
        heap_->Compact();
155
        if (heap_->utilization() > utilization) return true;
156
      }
157
      return false;
158
    default:
159
      // the others can't do any compact, so just ignore
160
143
      LogCvmfs(kLogKvStore, kLogDebug, "compact requested");
161
143
      return false;
162
  }
163
}
164
165
166
8
int64_t MemoryKvStore::GetSize(const shash::Any &id) {
167
8
  MemoryBuffer mem;
168
8
  perf::Inc(counters_.n_getsize);
169
8
  const bool update_lru = false;
170
8
  if (entries_.Lookup(id, &mem, update_lru)) {
171
    // LogCvmfs(kLogKvStore, kLogDebug, "%s is %u B", id.ToString().c_str(),
172
    //          mem.size);
173
7
    return mem.size;
174
  } else {
175
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on GetSize",
176
1
             id.ToString().c_str());
177
1
    return -ENOENT;
178
  }
179
}
180
181
182
5
int64_t MemoryKvStore::GetRefcount(const shash::Any &id) {
183
5
  MemoryBuffer mem;
184
5
  perf::Inc(counters_.n_getrefcount);
185
5
  const bool update_lru = false;
186
5
  if (entries_.Lookup(id, &mem, update_lru)) {
187
    // LogCvmfs(kLogKvStore, kLogDebug, "%s has refcount %u",
188
    //          id.ToString().c_str(), mem.refcount);
189
5
    return mem.refcount;
190
  } else {
191
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on GetRefcount",
192
             id.ToString().c_str());
193
    return -ENOENT;
194
  }
195
}
196
197
198
11046
bool MemoryKvStore::IncRef(const shash::Any &id) {
199
11046
  perf::Inc(counters_.n_incref);
200
11046
  WriteLockGuard guard(rwlock_);
201
11046
  MemoryBuffer mem;
202
11046
  if (entries_.Lookup(id, &mem)) {
203
11045
    assert(mem.refcount < UINT_MAX);
204
11045
    ++mem.refcount;
205
11045
    entries_.Insert(id, mem);
206
    LogCvmfs(kLogKvStore, kLogDebug, "increased refcount of %s to %u",
207
11045
             id.ToString().c_str(), mem.refcount);
208
11045
    return true;
209
  } else {
210
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on IncRef",
211
1
             id.ToString().c_str());
212
1
    return false;
213
  }
214
}
215
216
217
11040
bool MemoryKvStore::Unref(const shash::Any &id) {
218
11040
  perf::Inc(counters_.n_unref);
219
11040
  WriteLockGuard guard(rwlock_);
220
11040
  MemoryBuffer mem;
221
11040
  if (entries_.Lookup(id, &mem)) {
222
11039
    assert(mem.refcount > 0);
223
11039
    --mem.refcount;
224
11039
    entries_.Insert(id, mem);
225
    LogCvmfs(kLogKvStore, kLogDebug, "decreased refcount of %s to %u",
226
11039
             id.ToString().c_str(), mem.refcount);
227
11039
    return true;
228
  } else {
229
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Unref",
230
1
             id.ToString().c_str());
231
1
    return false;
232
  }
233
}
234
235
236
7
int64_t MemoryKvStore::Read(
237
  const shash::Any &id,
238
  void *buf,
239
  size_t size,
240
  size_t offset
241
) {
242
7
  MemoryBuffer mem;
243
7
  perf::Inc(counters_.n_read);
244
7
  ReadLockGuard guard(rwlock_);
245
7
  if (!entries_.Lookup(id, &mem)) {
246
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Read", id.ToString().c_str());
247
    return -ENOENT;
248
  }
249
7
  if (offset > mem.size) {
250
    LogCvmfs(kLogKvStore, kLogDebug, "out of bounds read (%u>%u) on %s",
251
             offset, mem.size, id.ToString().c_str());
252
    return 0;
253
  }
254
7
  uint64_t copy_size = std::min(mem.size - offset, size);
255
  // LogCvmfs(kLogKvStore, kLogDebug, "copy %u B from offset %u of %s",
256
  //          copy_size, offset, id.ToString().c_str());
257
7
  memcpy(buf, static_cast<char *>(mem.address) + offset, copy_size);
258
7
  perf::Xadd(counters_.sz_read, copy_size);
259
7
  return copy_size;
260
}
261
262
263
143
int MemoryKvStore::Commit(const MemoryBuffer &buf) {
264
143
  WriteLockGuard guard(rwlock_);
265
143
  return DoCommit(buf);
266
}
267
268
269
143
int MemoryKvStore::DoCommit(const MemoryBuffer &buf) {
270
  // we need to be careful about refcounts. If another thread wants to read
271
  // a cache entry while it's being written (OpenFromTxn put partial data in
272
  // the kvstore, will be committed again later) the refcount in the kvstore
273
  // will differ from the refcount in the cache transaction. To avoid leaks,
274
  // either the caller needs to fetch the cache entry before every write to
275
  // find the current refcount, or the kvstore can ignore the passed-in
276
  // refcount if the entry already exists. This implementation does the latter,
277
  // and as a result it's not possible to directly modify the refcount
278
  // without a race condition. This is a hint that callers should use the
279
  // refcount like a lock and not directly modify the numeric value.
280
281
143
  CompactMemory();
282
283
143
  MemoryBuffer mem;
284
143
  perf::Inc(counters_.n_commit);
285
143
  LogCvmfs(kLogKvStore, kLogDebug, "commit %s", buf.id.ToString().c_str());
286
143
  if (entries_.Lookup(buf.id, &mem)) {
287
3
    LogCvmfs(kLogKvStore, kLogDebug, "commit overwrites existing entry");
288
3
    size_t old_size = mem.size;
289
3
    DoFree(&mem);
290
3
    used_bytes_ -= old_size;
291
3
    counters_.sz_size->Set(used_bytes_);
292
3
    --entry_count_;
293
  } else {
294
    // since this is a new entry, the caller can choose the starting
295
    // refcount (starting at 1 for pinning, for example)
296
140
    mem.refcount = buf.refcount;
297
  }
298
143
  mem.object_type = buf.object_type;
299
143
  mem.id = buf.id;
300
143
  mem.size = buf.size;
301
143
  if (entry_count_ == max_entries_) {
302
    LogCvmfs(kLogKvStore, kLogDebug, "too many entries in kvstore");
303
    return -ENFILE;
304
  }
305
143
  if (DoMalloc(&mem) < 0) {
306
    LogCvmfs(kLogKvStore, kLogDebug, "failed to allocate %s",
307
      buf.id.ToString().c_str());
308
    return -EIO;
309
  }
310
143
  assert(SSIZE_MAX - mem.size > used_bytes_);
311
143
  memcpy(mem.address, buf.address, mem.size);
312
143
  entries_.Insert(buf.id, mem);
313
143
  ++entry_count_;
314
143
  used_bytes_ += mem.size;
315
143
  counters_.sz_size->Set(used_bytes_);
316
143
  perf::Xadd(counters_.sz_committed, mem.size);
317
143
  return 0;
318
}
319
320
321
6
bool MemoryKvStore::Delete(const shash::Any &id) {
322
6
  perf::Inc(counters_.n_delete);
323
6
  WriteLockGuard guard(rwlock_);
324
6
  return DoDelete(id);
325
}
326
327
328
6
bool MemoryKvStore::DoDelete(const shash::Any &id) {
329
6
  MemoryBuffer buf;
330
6
  if (!entries_.Lookup(id, &buf)) {
331
    LogCvmfs(kLogKvStore, kLogDebug, "miss %s on Delete",
332
2
             id.ToString().c_str());
333
2
    return false;
334
  }
335
4
  if (buf.refcount > 0) {
336
    LogCvmfs(kLogKvStore, kLogDebug, "can't delete %s, nonzero refcount",
337
             id.ToString().c_str());
338
    return false;
339
  }
340
4
  assert(entry_count_ > 0);
341
4
  --entry_count_;
342
4
  used_bytes_ -= buf.size;
343
4
  counters_.sz_size->Set(used_bytes_);
344
4
  perf::Xadd(counters_.sz_deleted, buf.size);
345
4
  DoFree(&buf);
346
4
  entries_.Forget(id);
347
4
  LogCvmfs(kLogKvStore, kLogDebug, "deleted %s", id.ToString().c_str());
348
4
  return true;
349
}
350
351
352
17
bool MemoryKvStore::ShrinkTo(size_t size) {
353
17
  perf::Inc(counters_.n_shrinkto);
354
17
  WriteLockGuard guard(rwlock_);
355
17
  shash::Any key;
356
17
  MemoryBuffer buf;
357
358
17
  if (used_bytes_ <= size) {
359
7
    LogCvmfs(kLogKvStore, kLogDebug, "no need to shrink");
360
7
    return true;
361
  }
362
363
10
  LogCvmfs(kLogKvStore, kLogDebug, "shrinking to %u B", size);
364
10
  entries_.FilterBegin();
365
134
  while (entries_.FilterNext()) {
366
119
    if (used_bytes_ <= size) break;
367
114
    entries_.FilterGet(&key, &buf);
368
114
    if (buf.refcount > 0) {
369
      LogCvmfs(kLogKvStore, kLogDebug, "skip %s, nonzero refcount",
370
10
               key.ToString().c_str());
371
10
      continue;
372
    }
373
104
    assert(entry_count_ > 0);
374
104
    --entry_count_;
375
104
    entries_.FilterDelete();
376
104
    used_bytes_ -= buf.size;
377
104
    perf::Xadd(counters_.sz_shrunk, buf.size);
378
104
    counters_.sz_size->Set(used_bytes_);
379
104
    DoFree(&buf);
380
104
    LogCvmfs(kLogKvStore, kLogDebug, "delete %s", key.ToString().c_str());
381
  }
382
10
  entries_.FilterEnd();
383
10
  LogCvmfs(kLogKvStore, kLogDebug, "shrunk to %u B", used_bytes_);
384
10
  return used_bytes_ <= size;
385
}