GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/cache_plugin/cvmfs_cache_ram.cc Lines: 0 358 0.0 %
Date: 2019-02-03 02:48:13 Branches: 0 172 0.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 *
4
 * A cache plugin that stores all data in a fixed-size memory chunk.
5
 */
6
7
#define __STDC_FORMAT_MACROS
8
9
#include <alloca.h>
10
#include <fcntl.h>
11
#include <inttypes.h>
12
#include <stdint.h>
13
#include <sys/types.h>
14
#include <sys/wait.h>
15
#include <unistd.h>
16
17
#include <algorithm>
18
#include <cassert>
19
#include <cstdio>
20
#include <cstdlib>
21
#include <cstring>
22
#include <string>
23
#include <vector>
24
25
#include "cache_plugin/libcvmfs_cache.h"
26
#include "logging.h"
27
#include "lru.h"
28
#include "malloc_heap.h"
29
#include "murmur.h"
30
#include "platform.h"
31
#include "smallhash.h"
32
#include "smalloc.h"
33
#include "util/string.h"
34
#include "util_concurrency.h"
35
36
using namespace std;  // NOLINT
37
38
/**
39
 * Header of the data pieces in the cache.  After the object header, the
40
 * zero-terminated description and the object data follows.
41
 */
42
struct ObjectHeader {
43
  ObjectHeader() {
44
    txn_id = uint64_t(-1);
45
    size_data = 0;
46
    size_desc = 0;
47
    refcnt = 0;
48
    type = CVMCACHE_OBJECT_REGULAR;
49
    memset(&id, 0, sizeof(id));
50
  }
51
52
  char *GetDescription() {
53
    if (size_desc == 0)
54
      return NULL;
55
    return reinterpret_cast<char *>(this) + sizeof(ObjectHeader);
56
  }
57
58
  void SetDescription(char *description) {
59
    if (description == NULL)
60
      return;
61
    memcpy(reinterpret_cast<char *>(this) + sizeof(ObjectHeader),
62
           description, strlen(description) + 1);
63
  }
64
65
  unsigned char *GetData() {
66
    return reinterpret_cast<unsigned char *>(this) +
67
           sizeof(ObjectHeader) + size_desc;
68
  }
69
70
  /**
71
   * Set during a running transaction so that we know where to look for pointers
72
   * when the memory block gets compacted.  Once committed, this is
73
   * uint64_t(-1).
74
   */
75
  uint64_t txn_id;
76
  /**
77
   * Can be zero.
78
   */
79
  uint32_t size_data;
80
  /**
81
   * String length + 1 (null terminated) or null if the description is NULL.
82
   */
83
  uint32_t size_desc;
84
  /**
85
   * During a transaction, neg_nbytes_written is used to track the number of
86
   * already written bytes.  On commit, refcnt is set to 1.
87
   */
88
  union {
89
    int32_t refcnt;
90
    int32_t neg_nbytes_written;
91
  };
92
  cvmcache_object_type type;
93
  struct cvmcache_hash id;
94
};
95
96
97
/**
98
 * Listings are generated and cached during the entire life time of a listing
99
 * id.  Not very memory efficient but we don't optimize for listings.
100
 */
101
struct Listing {
102
  Listing() : pos(0) { }
103
  uint64_t pos;
104
  vector<struct cvmcache_object_info> elems;
105
};
106
107
108
/**
109
 * Allows us to use a cvmcache_hash in (hash) maps.
110
 */
111
struct ComparableHash {
112
  ComparableHash() { memset(&hash, 0, sizeof(hash)); }
113
  explicit ComparableHash(const struct cvmcache_hash &h) : hash(h) { }
114
  bool operator ==(const ComparableHash &other) const {
115
    return cvmcache_hash_cmp(const_cast<cvmcache_hash *>(&(this->hash)),
116
                             const_cast<cvmcache_hash *>(&(other.hash))) == 0;
117
  }
118
  bool operator !=(const ComparableHash &other) const {
119
    return cvmcache_hash_cmp(const_cast<cvmcache_hash *>(&(this->hash)),
120
                             const_cast<cvmcache_hash *>(&(other.hash))) != 0;
121
  }
122
  bool operator <(const ComparableHash &other) const {
123
    return cvmcache_hash_cmp(const_cast<cvmcache_hash *>(&(this->hash)),
124
                             const_cast<cvmcache_hash *>(&(other.hash))) < 0;
125
  }
126
  bool operator >(const ComparableHash &other) const {
127
    return cvmcache_hash_cmp(const_cast<cvmcache_hash *>(&(this->hash)),
128
                             const_cast<cvmcache_hash *>(&(other.hash))) > 0;
129
  }
130
131
  struct cvmcache_hash hash;
132
};
133
134
135
namespace {
136
137
static inline uint32_t hasher_uint64(const uint64_t &key) {
138
  return MurmurHash2(&key, sizeof(key), 0x07387a4f);
139
}
140
141
static inline uint32_t hasher_any(const ComparableHash &key) {
142
  return (uint32_t) *(reinterpret_cast<const uint32_t *>(&key.hash));
143
}
144
145
}  // anonymous namespace
146
147
148
/**
 * Plugin context handle.  Used in the PluginRamCache when asking clients to
 * detach nested catalogs; assigned in main() from cvmcache_init().
 */
struct cvmcache_context *ctx = NULL;
152
153
154
/**
155
 * Implements all the cache plugin callbacks.  Singelton.
156
 */
157
class PluginRamCache : public Callbackable<MallocHeap::BlockPtr> {
158
 public:
159
  static PluginRamCache *Create(const string &mem_size_str) {
160
    assert(instance_ == NULL);
161
162
    uint64_t mem_size_bytes;
163
    if (HasSuffix(mem_size_str, "%", false)) {
164
      mem_size_bytes = platform_memsize() * String2Uint64(mem_size_str) / 100;
165
    } else {
166
      mem_size_bytes = String2Uint64(mem_size_str) * 1024 * 1024;
167
    }
168
    instance_ = new PluginRamCache(mem_size_bytes);
169
    return instance_;
170
  }
171
172
  ~PluginRamCache() {
173
    delete storage_;
174
    delete objects_all_;
175
    delete objects_volatile_;
176
    instance_ = NULL;
177
  }
178
179
  static int ram_chrefcnt(struct cvmcache_hash *id, int32_t change_by) {
180
    ComparableHash h(*id);
181
    ObjectHeader *object;
182
    if (!Me()->objects_all_->Lookup(h, &object))
183
      return CVMCACHE_STATUS_NOENTRY;
184
185
    if (object->type == CVMCACHE_OBJECT_VOLATILE)
186
      Me()->objects_volatile_->Update(h);
187
188
    if (change_by == 0)
189
      return CVMCACHE_STATUS_OK;
190
    if ((object->refcnt + change_by) < 0)
191
      return CVMCACHE_STATUS_BADCOUNT;
192
193
    if (object->refcnt == 0) {
194
      Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(object);
195
      Me()->CheckHighPinWatermark();
196
    }
197
    object->refcnt += change_by;
198
    if (object->refcnt == 0) {
199
      Me()->cache_info_.pinned_bytes -= Me()->storage_->GetSize(object);
200
      Me()->in_danger_zone_ = Me()->IsInDangerZone();
201
    }
202
    return CVMCACHE_STATUS_OK;
203
  }
204
205
206
  static int ram_obj_info(
207
    struct cvmcache_hash *id,
208
    struct cvmcache_object_info *info)
209
  {
210
    ComparableHash h(*id);
211
    ObjectHeader *object;
212
    if (!Me()->objects_all_->Lookup(h, &object, false))
213
      return CVMCACHE_STATUS_NOENTRY;
214
215
    info->size = object->size_data;
216
    info->type = object->type;
217
    info->pinned = object->refcnt > 0;
218
    info->description = (object->GetDescription() == NULL)
219
                        ? NULL
220
                        : strdup(object->GetDescription());
221
    return CVMCACHE_STATUS_OK;
222
  }
223
224
225
  static int ram_pread(struct cvmcache_hash *id,
226
                      uint64_t offset,
227
                      uint32_t *size,
228
                      unsigned char *buffer)
229
  {
230
    ComparableHash h(*id);
231
    ObjectHeader *object;
232
    bool retval = Me()->objects_all_->Lookup(h, &object, false);
233
    assert(retval);
234
    if (offset > object->size_data)
235
      return CVMCACHE_STATUS_OUTOFBOUNDS;
236
    unsigned nbytes =
237
      std::min(*size, static_cast<uint32_t>(object->size_data - offset));
238
    memcpy(buffer, object->GetData() + offset, nbytes);
239
    *size = nbytes;
240
    return CVMCACHE_STATUS_OK;
241
  }
242
243
244
  static int ram_start_txn(
245
    struct cvmcache_hash *id,
246
    uint64_t txn_id,
247
    struct cvmcache_object_info *info)
248
  {
249
    ObjectHeader object_header;
250
    object_header.txn_id = txn_id;
251
    if (info->size != CVMCACHE_SIZE_UNKNOWN)
252
      object_header.size_data = info->size;
253
    else
254
      object_header.size_data = 4096;
255
    if (info->description != NULL)
256
      object_header.size_desc = strlen(info->description) + 1;
257
    object_header.refcnt = 1;
258
    object_header.type = info->type;
259
    object_header.id = *id;
260
261
    uint32_t total_size = sizeof(object_header) +
262
                          object_header.size_desc + object_header.size_data;
263
    Me()->TryFreeSpace(total_size);
264
    ObjectHeader *allocd_object = reinterpret_cast<ObjectHeader *>(
265
      Me()->storage_->Allocate(total_size,
266
                               &object_header, sizeof(object_header)));
267
    if (allocd_object == NULL)
268
      return CVMCACHE_STATUS_NOSPACE;
269
270
    allocd_object->SetDescription(info->description);
271
    Me()->transactions_.Insert(txn_id, allocd_object);
272
    return CVMCACHE_STATUS_OK;
273
  }
274
275
276
  static int ram_write_txn(
277
    uint64_t txn_id,
278
    unsigned char *buffer,
279
    uint32_t size)
280
  {
281
    ObjectHeader *txn_object;
282
    int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
283
    assert(retval);
284
    assert(size > 0);
285
286
    if (txn_object->neg_nbytes_written > 0)
287
      txn_object->neg_nbytes_written = 0;
288
    if ((size - txn_object->neg_nbytes_written) > txn_object->size_data) {
289
      uint32_t current_size = Me()->storage_->GetSize(txn_object);
290
      uint32_t header_size = current_size - txn_object->size_data;
291
      uint32_t new_size = std::max(
292
        header_size + size - txn_object->neg_nbytes_written,
293
        uint32_t(current_size * kObjectExpandFactor));
294
      bool did_compact = Me()->TryFreeSpace(new_size);
295
      if (did_compact) {
296
        retval = Me()->transactions_.Lookup(txn_id, &txn_object);
297
        assert(retval);
298
      }
299
      txn_object = reinterpret_cast<ObjectHeader *>(
300
        Me()->storage_->Expand(txn_object, new_size));
301
      if (txn_object == NULL)
302
        return CVMCACHE_STATUS_NOSPACE;
303
      txn_object->size_data = new_size - header_size;
304
      Me()->transactions_.Insert(txn_id, txn_object);
305
    }
306
307
    memcpy(txn_object->GetData() - txn_object->neg_nbytes_written,
308
           buffer, size);
309
    txn_object->neg_nbytes_written -= size;
310
    return CVMCACHE_STATUS_OK;
311
  }
312
313
314
  static int ram_commit_txn(uint64_t txn_id) {
315
    Me()->TryFreeSpace(0);
316
    if (Me()->objects_all_->IsFull())
317
      return CVMCACHE_STATUS_NOSPACE;
318
319
    ObjectHeader *txn_object;
320
    int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
321
    assert(retval);
322
323
    Me()->transactions_.Erase(txn_id);
324
    ComparableHash h(txn_object->id);
325
    ObjectHeader *existing_object;
326
    if (Me()->objects_all_->Lookup(h, &existing_object)) {
327
      // Concurrent addition of same objects, drop the one at hand and
328
      // increase ref count of existing copy
329
      Me()->storage_->MarkFree(txn_object);
330
      if (existing_object->refcnt == 0)
331
        Me()->cache_info_.pinned_bytes +=
332
          Me()->storage_->GetSize(existing_object);
333
      existing_object->refcnt++;
334
    } else {
335
      txn_object->txn_id = uint64_t(-1);
336
      if (txn_object->neg_nbytes_written > 0)
337
        txn_object->neg_nbytes_written = 0;
338
      txn_object->size_data = -(txn_object->neg_nbytes_written);
339
      txn_object->refcnt = 1;
340
      Me()->cache_info_.used_bytes += Me()->storage_->GetSize(txn_object);
341
      Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(txn_object);
342
      Me()->objects_all_->Insert(h, txn_object);
343
      if (txn_object->type == CVMCACHE_OBJECT_VOLATILE) {
344
        assert(!Me()->objects_volatile_->IsFull());
345
        Me()->objects_volatile_->Insert(h, txn_object);
346
      }
347
    }
348
    Me()->CheckHighPinWatermark();
349
    return CVMCACHE_STATUS_OK;
350
  }
351
352
353
  static int ram_abort_txn(uint64_t txn_id) {
354
    ObjectHeader *txn_object = NULL;
355
    int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
356
    assert(retval);
357
    Me()->transactions_.Erase(txn_id);
358
    Me()->storage_->MarkFree(txn_object);
359
    return CVMCACHE_STATUS_OK;
360
  }
361
362
363
  static int ram_info(struct cvmcache_info *info) {
364
    *info = Me()->cache_info_;
365
    return CVMCACHE_STATUS_OK;
366
  }
367
368
369
  static int ram_shrink(uint64_t shrink_to, uint64_t *used) {
370
    *used = Me()->cache_info_.used_bytes;
371
    if (*used <= shrink_to)
372
      return CVMCACHE_STATUS_OK;
373
374
    Me()->DoShrink(shrink_to);
375
    *used = Me()->cache_info_.used_bytes;
376
    return (*used <= shrink_to) ? CVMCACHE_STATUS_OK : CVMCACHE_STATUS_PARTIAL;
377
  }
378
379
380
  static int ram_listing_begin(
381
    uint64_t lst_id,
382
    enum cvmcache_object_type type)
383
  {
384
    Listing *lst = new Listing();
385
    Me()->objects_all_->FilterBegin();
386
    while (Me()->objects_all_->FilterNext()) {
387
      ComparableHash h;
388
      ObjectHeader *object;
389
      Me()->objects_all_->FilterGet(&h, &object);
390
      if (object->type != type)
391
        continue;
392
393
      struct cvmcache_object_info item;
394
      item.id = object->id;
395
      item.size = object->size_data;
396
      item.type = type;
397
      item.pinned = object->refcnt != 0;
398
      item.description = (object->size_desc > 0)
399
                         ? strdup(object->GetDescription())
400
                         : NULL;
401
      lst->elems.push_back(item);
402
    }
403
    Me()->objects_all_->FilterEnd();
404
405
    Me()->listings_.Insert(lst_id, lst);
406
    return CVMCACHE_STATUS_OK;
407
  }
408
409
410
  static int ram_listing_next(
411
    int64_t listing_id,
412
    struct cvmcache_object_info *item)
413
  {
414
    Listing *lst;
415
    bool retval = Me()->listings_.Lookup(listing_id, &lst);
416
    assert(retval);
417
    if (lst->pos >= lst->elems.size())
418
      return CVMCACHE_STATUS_OUTOFBOUNDS;
419
    *item = lst->elems[lst->pos];
420
    lst->pos++;
421
    return CVMCACHE_STATUS_OK;
422
  }
423
424
425
  static int ram_listing_end(int64_t listing_id) {
426
    Listing *lst;
427
    bool retval = Me()->listings_.Lookup(listing_id, &lst);
428
    assert(retval);
429
430
    // Don't free description strings, done by the library
431
    delete lst;
432
    Me()->listings_.Erase(listing_id);
433
    return CVMCACHE_STATUS_OK;
434
  }
435
436
 private:
437
  static const uint64_t kMinSize;  // 100 * 1024 * 1024;
438
  static const double kShrinkFactor;  //  = 0.75;
439
  static const double kObjectExpandFactor;  // = 1.5;
440
  static const double kSlotFraction;  // = 0.04;
441
  static const double kDangerZoneThreshold;  // = 0.7
442
443
  static PluginRamCache *instance_;
444
  static PluginRamCache *Me() {
445
    return instance_;
446
  }
447
  explicit PluginRamCache(uint64_t mem_size) {
448
    in_danger_zone_ = false;
449
450
    uint64_t heap_size = RoundUp8(
451
      std::max(kMinSize, uint64_t(mem_size * (1.0 - kSlotFraction))));
452
    memset(&cache_info_, 0, sizeof(cache_info_));
453
    cache_info_.size_bytes = heap_size;
454
    storage_ = new MallocHeap(
455
      heap_size, this->MakeCallback(&PluginRamCache::OnBlockMove, this));
456
457
    struct cvmcache_hash hash_empty;
458
    memset(&hash_empty, 0, sizeof(hash_empty));
459
460
    transactions_.Init(64, uint64_t(-1), hasher_uint64);
461
    listings_.Init(8, uint64_t(-1), hasher_uint64);
462
463
    double slot_size =
464
      lru::LruCache<ComparableHash, ObjectHeader *>::GetEntrySize();
465
    uint64_t num_slots = uint64_t((heap_size * kSlotFraction) /
466
                         (2.0 * slot_size));
467
    const unsigned mask_64 = ~((1 << 6) - 1);
468
469
    LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "Allocating %" PRIu64
470
             "MB of memory for up to %" PRIu64 " objects",
471
             heap_size / (1024 * 1024), num_slots & mask_64);
472
473
    // Number of cache entries must be a multiple of 64
474
    objects_all_ = new lru::LruCache<ComparableHash, ObjectHeader *>(
475
      num_slots & mask_64,
476
      ComparableHash(hash_empty),
477
      hasher_any,
478
      perf::StatisticsTemplate("objects_all", &statistics_));
479
    objects_volatile_ = new lru::LruCache<ComparableHash, ObjectHeader *>(
480
      num_slots & mask_64,
481
      ComparableHash(hash_empty),
482
      hasher_any,
483
      perf::StatisticsTemplate("objects_volatile", &statistics_));
484
  }
485
486
  /**
487
   * Returns true if memory compaction took place and pointers might have been
488
   * invalidated.
489
   */
490
  bool TryFreeSpace(uint64_t bytes_required) {
491
    if (!objects_all_->IsFull() && storage_->HasSpaceFor(bytes_required))
492
      return false;
493
494
    // Free space occupied due to piecewise catalog storage
495
    if (!objects_all_->IsFull()) {
496
      LogCvmfs(kLogCache, kLogDebug, "compacting ram cache");
497
      storage_->Compact();
498
      if (storage_->HasSpaceFor(bytes_required))
499
        return true;
500
    }
501
502
    uint64_t shrink_to = std::min(
503
      storage_->capacity() - (bytes_required + 8),
504
      uint64_t(storage_->capacity() * kShrinkFactor));
505
    DoShrink(shrink_to);
506
    return true;
507
  }
508
509
  void OnBlockMove(const MallocHeap::BlockPtr &ptr) {
510
    assert(ptr.pointer);
511
    ObjectHeader *object = reinterpret_cast<ObjectHeader *>(ptr.pointer);
512
    ComparableHash h(object->id);
513
    if (object->txn_id == uint64_t(-1)) {
514
      bool retval = objects_all_->UpdateValue(h, object);
515
      assert(retval);
516
      if (object->type == CVMCACHE_OBJECT_VOLATILE) {
517
        retval = objects_volatile_->UpdateValue(h, object);
518
        assert(retval);
519
      }
520
    } else {
521
      uint64_t old_size = transactions_.size();
522
      transactions_.Insert(object->txn_id, object);
523
      assert(old_size == transactions_.size());
524
    }
525
  }
526
527
528
  void DoShrink(uint64_t shrink_to) {
529
    ComparableHash h;
530
    ObjectHeader *object;
531
532
    LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
533
             "clean up cache until at most %lu KB is used", shrink_to / 1024);
534
535
    objects_volatile_->FilterBegin();
536
    while (objects_volatile_->FilterNext()) {
537
      objects_volatile_->FilterGet(&h, &object);
538
      if (object->refcnt != 0)
539
        continue;
540
      cache_info_.used_bytes -= storage_->GetSize(object);
541
      storage_->MarkFree(object);
542
      objects_volatile_->FilterDelete();
543
      objects_all_->Forget(h);
544
      if (storage_->compacted_bytes() <= shrink_to)
545
        break;
546
    }
547
    objects_volatile_->FilterEnd();
548
549
    objects_all_->FilterBegin();
550
    while ((storage_->compacted_bytes() > shrink_to) &&
551
           objects_all_->FilterNext())
552
    {
553
      objects_all_->FilterGet(&h, &object);
554
      if (object->refcnt != 0)
555
        continue;
556
      assert(object->type != CVMCACHE_OBJECT_VOLATILE);
557
      cache_info_.used_bytes -= storage_->GetSize(object);
558
      storage_->MarkFree(object);
559
      objects_all_->FilterDelete();
560
    }
561
    objects_all_->FilterEnd();
562
563
    storage_->Compact();
564
    cache_info_.no_shrink++;
565
  }
566
567
  void CheckHighPinWatermark() {
568
    if (!Me()->in_danger_zone_ && Me()->IsInDangerZone()) {
569
      LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslog,
570
               "high watermark of pinned files");
571
      Me()->in_danger_zone_ = true;
572
      cvmcache_ask_detach(ctx);
573
    }
574
  }
575
576
  bool IsInDangerZone() {
577
    return (static_cast<double>(cache_info_.pinned_bytes) /
578
            static_cast<double>(cache_info_.size_bytes)) >
579
           kDangerZoneThreshold;
580
  }
581
582
583
  struct cvmcache_info cache_info_;
584
  perf::Statistics statistics_;
585
  SmallHashDynamic<uint64_t, ObjectHeader *> transactions_;
586
  SmallHashDynamic<uint64_t, Listing *> listings_;
587
  lru::LruCache<ComparableHash, ObjectHeader *> *objects_all_;
588
  lru::LruCache<ComparableHash, ObjectHeader *> *objects_volatile_;
589
  MallocHeap *storage_;
590
  bool in_danger_zone_;
591
};  // class PluginRamCache
592
593
PluginRamCache *PluginRamCache::instance_ = NULL;
594
const uint64_t PluginRamCache::kMinSize = 100 * 1024 * 1024;
595
const double PluginRamCache::kShrinkFactor = 0.75;
596
const double PluginRamCache::kObjectExpandFactor = 1.5;
597
const double PluginRamCache::kSlotFraction = 0.04;
598
const double PluginRamCache::kDangerZoneThreshold = 0.7;
599
600
601
static void Usage(const char *progname) {
602
  LogCvmfs(kLogCache, kLogStdout, "%s <config file>", progname);
603
}
604
605
606
int main(int argc, char **argv) {
607
  if (argc < 2) {
608
    Usage(argv[0]);
609
    return 1;
610
  }
611
612
  SetLogDebugFile("/dev/null");
613
614
  cvmcache_init_global();
615
616
  cvmcache_option_map *options = cvmcache_options_init();
617
  if (cvmcache_options_parse(options, argv[1]) != 0) {
618
    LogCvmfs(kLogCache, kLogStderr, "cannot parse options file %s", argv[1]);
619
    return 1;
620
  }
621
  char *debug_log =
622
    cvmcache_options_get(options, "CVMFS_CACHE_PLUGIN_DEBUGLOG");
623
  if (debug_log != NULL) {
624
    SetLogDebugFile(debug_log);
625
    cvmcache_options_free(debug_log);
626
  }
627
  char *locator = cvmcache_options_get(options, "CVMFS_CACHE_PLUGIN_LOCATOR");
628
  if (locator == NULL) {
629
    LogCvmfs(kLogCache, kLogStderr, "CVMFS_CACHE_PLUGIN_LOCATOR missing");
630
    cvmcache_options_fini(options);
631
    return 1;
632
  }
633
  char *mem_size = cvmcache_options_get(options, "CVMFS_CACHE_PLUGIN_SIZE");
634
  if (mem_size == NULL) {
635
    LogCvmfs(kLogCache, kLogStderr, "CVMFS_CACHE_PLUGIN_SIZE missing");
636
    cvmcache_options_fini(options);
637
    return 1;
638
  }
639
  char *test_mode = cvmcache_options_get(options, "CVMFS_CACHE_PLUGIN_TEST");
640
641
  if (!test_mode)
642
    cvmcache_spawn_watchdog(NULL);
643
644
  PluginRamCache *plugin = PluginRamCache::Create(mem_size);
645
646
  struct cvmcache_callbacks callbacks;
647
  memset(&callbacks, 0, sizeof(callbacks));
648
  callbacks.cvmcache_chrefcnt = plugin->ram_chrefcnt;
649
  callbacks.cvmcache_obj_info = plugin->ram_obj_info;
650
  callbacks.cvmcache_pread = plugin->ram_pread;
651
  callbacks.cvmcache_start_txn = plugin->ram_start_txn;
652
  callbacks.cvmcache_write_txn = plugin->ram_write_txn;
653
  callbacks.cvmcache_commit_txn = plugin->ram_commit_txn;
654
  callbacks.cvmcache_abort_txn = plugin->ram_abort_txn;
655
  callbacks.cvmcache_info = plugin->ram_info;
656
  callbacks.cvmcache_shrink = plugin->ram_shrink;
657
  callbacks.cvmcache_listing_begin = plugin->ram_listing_begin;
658
  callbacks.cvmcache_listing_next = plugin->ram_listing_next;
659
  callbacks.cvmcache_listing_end = plugin->ram_listing_end;
660
  callbacks.capabilities = CVMCACHE_CAP_ALL_V1;
661
662
  ctx = cvmcache_init(&callbacks);
663
  int retval = cvmcache_listen(ctx, locator);
664
  if (!retval) {
665
    LogCvmfs(kLogCache, kLogStderr, "failed to listen on %s", locator);
666
    return 1;
667
  }
668
669
  if (test_mode) {
670
    // Daemonize, print out PID
671
    pid_t pid;
672
    int statloc;
673
    if ((pid = fork()) == 0) {
674
      if ((pid = fork()) == 0) {
675
        int null_read = open("/dev/null", O_RDONLY);
676
        int null_write = open("/dev/null", O_WRONLY);
677
        assert((null_read >= 0) && (null_write >= 0));
678
        int retval = dup2(null_read, 0);
679
        assert(retval == 0);
680
        retval = dup2(null_write, 1);
681
        assert(retval == 1);
682
        retval = dup2(null_write, 2);
683
        assert(retval == 2);
684
        close(null_read);
685
        close(null_write);
686
      } else {
687
        assert(pid > 0);
688
        printf("%d\n", pid);
689
        fflush(stdout);
690
        fsync(1);
691
        _exit(0);
692
      }
693
    } else {
694
      assert(pid > 0);
695
      waitpid(pid, &statloc, 0);
696
      _exit(0);
697
    }
698
  }
699
700
  LogCvmfs(kLogCache, kLogStdout, "Listening for cvmfs clients on %s\n"
701
           "NOTE: this process needs to run as user cvmfs\n",
702
           locator);
703
704
  cvmcache_process_requests(ctx, 0);
705
  if (test_mode)
706
    while (true) sleep(1);
707
  if (!cvmcache_is_supervised()) {
708
    LogCvmfs(kLogCache, kLogStdout, "Press <Ctrl+D> to quit");
709
    LogCvmfs(kLogCache, kLogStdout,
710
             "Press <R Enter> to ask clients to release nested catalogs");
711
    while (true) {
712
      char buf;
713
      retval = read(fileno(stdin), &buf, 1);
714
      if (retval != 1)
715
        break;
716
      if (buf == 'R') {
717
        LogCvmfs(kLogCache, kLogStdout,
718
                 "  ... asking clients to release nested catalogs");
719
        cvmcache_ask_detach(ctx);
720
      }
721
    }
722
    cvmcache_terminate(ctx);
723
  } else {
724
    LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
725
             "CernVM-FS RAM cache plugin started in supervised mode");
726
  }
727
728
  cvmcache_wait_for(ctx);
729
  LogCvmfs(kLogCache, kLogDebug | kLogStdout, "  ... good bye");
730
  cvmcache_options_free(mem_size);
731
  cvmcache_options_free(locator);
732
  cvmcache_options_fini(options);
733
  cvmcache_terminate_watchdog();
734
  cvmcache_cleanup_global();
735
  return 0;
736
}