GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/cache_plugin/cvmfs_cache_ram.cc
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 0 402 0.0%
Branches: 0 174 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * A cache plugin that stores all data in a fixed-size memory chunk.
5 */
6
7 #define __STDC_FORMAT_MACROS
8
9 #include <alloca.h>
10 #include <fcntl.h>
11 #include <inttypes.h>
12 #include <signal.h>
13 #include <stdint.h>
14 #include <sys/types.h>
15 #include <sys/wait.h>
16 #include <unistd.h>
17
18 #include <algorithm>
19 #include <cassert>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <cstring>
23 #include <string>
24 #include <vector>
25
26 #include "cache_plugin/libcvmfs_cache.h"
27 #include "lru.h"
28 #include "malloc_heap.h"
29 #include "smallhash.h"
30 #include "util/concurrency.h"
31 #include "util/logging.h"
32 #include "util/murmur.hxx"
33 #include "util/platform.h"
34 #include "util/smalloc.h"
35 #include "util/string.h"
36
37 using namespace std; // NOLINT
38
39 /**
40 * Header of the data pieces in the cache. After the object header, the
41 * zero-terminated description and the object data follows.
42 */
/**
 * Header of the data pieces in the cache. After the object header, the
 * zero-terminated description and the object data follows.
 *
 * The layout in the malloc heap is:
 *   [ObjectHeader][description + '\0' (size_desc bytes)][data (size_data)]
 * GetDescription() and GetData() compute pointers from this layout, so the
 * member order and sizes must not change without adjusting them.
 */
struct ObjectHeader {
  ObjectHeader() {
    txn_id = uint64_t(-1);   // -1 means "not part of a running transaction"
    size_data = 0;
    size_desc = 0;
    refcnt = 0;
    type = CVMCACHE_OBJECT_REGULAR;
    memset(&id, 0, sizeof(id));
  }

  // Returns a pointer to the null-terminated description that directly
  // follows the header, or NULL if no description was stored.
  char *GetDescription() {
    if (size_desc == 0)
      return NULL;
    return reinterpret_cast<char *>(this) + sizeof(ObjectHeader);
  }

  // Copies the description (including the trailing '\0') into the area
  // behind the header.  The caller must have allocated size_desc =
  // strlen(description) + 1 bytes; no-op for a NULL description.
  void SetDescription(char *description) {
    if (description == NULL)
      return;
    memcpy(reinterpret_cast<char *>(this) + sizeof(ObjectHeader),
           description, strlen(description) + 1);
  }

  // Returns a pointer to the object payload behind header and description.
  unsigned char *GetData() {
    return reinterpret_cast<unsigned char *>(this) +
           sizeof(ObjectHeader) + size_desc;
  }

  /**
   * Set during a running transaction so that we know where to look for
   * pointers when the memory block gets compacted. Once committed, this is
   * uint64_t(-1).
   */
  uint64_t txn_id;
  /**
   * Can be zero.
   */
  uint32_t size_data;
  /**
   * String length + 1 (null terminated) or zero if the description is NULL.
   */
  uint32_t size_desc;
  /**
   * During a transaction, neg_nbytes_written is used to track the number of
   * already written bytes (stored negated, see ram_write_txn).  On commit,
   * refcnt is set to 1.
   */
  union {
    int32_t refcnt;
    int32_t neg_nbytes_written;
  };
  cvmcache_object_type type;
  struct cvmcache_hash id;
};
96
97
98 /**
99 * Listings are generated and cached during the entire life time of a listing
100 * id. Not very memory efficient but we don't optimize for listings.
101 */
/**
 * Listings are generated and cached during the entire life time of a listing
 * id. Not very memory efficient but we don't optimize for listings.
 * pos is the cursor used by ram_listing_next to walk through elems.
 */
struct Listing {
  Listing() : pos(0) { }
  uint64_t pos;
  vector<struct cvmcache_object_info> elems;
};
107
108
109 /**
110 * Allows us to use a cvmcache_hash in (hash) maps.
111 */
112 struct ComparableHash {
113 ComparableHash() { memset(&hash, 0, sizeof(hash)); }
114 explicit ComparableHash(const struct cvmcache_hash &h) : hash(h) { }
115 bool operator ==(const ComparableHash &other) const {
116 return cvmcache_hash_cmp(const_cast<cvmcache_hash *>(&(this->hash)),
117 const_cast<cvmcache_hash *>(&(other.hash))) == 0;
118 }
119 bool operator !=(const ComparableHash &other) const {
120 return cvmcache_hash_cmp(const_cast<cvmcache_hash *>(&(this->hash)),
121 const_cast<cvmcache_hash *>(&(other.hash))) != 0;
122 }
123 bool operator <(const ComparableHash &other) const {
124 return cvmcache_hash_cmp(const_cast<cvmcache_hash *>(&(this->hash)),
125 const_cast<cvmcache_hash *>(&(other.hash))) < 0;
126 }
127 bool operator >(const ComparableHash &other) const {
128 return cvmcache_hash_cmp(const_cast<cvmcache_hash *>(&(this->hash)),
129 const_cast<cvmcache_hash *>(&(other.hash))) > 0;
130 }
131
132 struct cvmcache_hash hash;
133 };
134
135
136 namespace {
137
138 static inline uint32_t hasher_uint64(const uint64_t &key) {
139 return MurmurHash2(&key, sizeof(key), 0x07387a4f);
140 }
141
142 static inline uint32_t hasher_any(const ComparableHash &key) {
143 return (uint32_t) *(reinterpret_cast<const uint32_t *>(&key.hash));
144 }
145
146 } // anonymous namespace
147
148
/**
 * Used in the PluginRamCache when detaching nested catalogs.  Global
 * connection handle, initialized in main() via cvmcache_init().
 */
struct cvmcache_context *ctx;
153
154
155 /**
156 * Implements all the cache plugin callbacks. Singleton.
157 */
/**
 * Implements all the cache plugin callbacks. Singleton.  The static ram_*
 * methods are registered as C callbacks in main() and reach the singleton
 * through Me().  Objects live in a MallocHeap (storage_); lookup is done
 * through two LRU maps, objects_all_ and objects_volatile_ (the latter
 * holds only CVMCACHE_OBJECT_VOLATILE objects, which are evicted first).
 */
class PluginRamCache : public Callbackable<MallocHeap::BlockPtr> {
 public:
  /**
   * Creates the singleton.  The configured size is either a percentage of
   * physical memory ("N%") or a number of megabytes.
   */
  static PluginRamCache *Create(const string &mem_size_str) {
    assert(instance_ == NULL);

    uint64_t mem_size_bytes;
    if (HasSuffix(mem_size_str, "%", false)) {
      mem_size_bytes = platform_memsize() * String2Uint64(mem_size_str) / 100;
    } else {
      mem_size_bytes = String2Uint64(mem_size_str) * 1024 * 1024;
    }
    instance_ = new PluginRamCache(mem_size_bytes);
    return instance_;
  }

  static PluginRamCache *GetInstance() {
    assert(instance_ != NULL);
    return instance_;
  }

  ~PluginRamCache() {
    delete storage_;
    delete objects_all_;
    delete objects_volatile_;
    instance_ = NULL;
  }

  /**
   * Empties the breadcrumb map; triggered by SIGUSR2 (see main()).
   */
  void DropBreadcrumbs() {
    breadcrumbs_.clear();
  }

  /**
   * Changes the reference count of the given object by change_by.  Objects
   * with refcnt > 0 are pinned and accounted in cache_info_.pinned_bytes.
   * Returns CVMCACHE_STATUS_BADCOUNT if the count would drop below zero.
   */
  static int ram_chrefcnt(struct cvmcache_hash *id, int32_t change_by) {
    ComparableHash h(*id);
    ObjectHeader *object;
    if (!Me()->objects_all_->Lookup(h, &object))
      return CVMCACHE_STATUS_NOENTRY;

    // Keep the volatile LRU list in sync with the access order
    if (object->type == CVMCACHE_OBJECT_VOLATILE)
      Me()->objects_volatile_->Update(h);

    if (change_by == 0)
      return CVMCACHE_STATUS_OK;
    if ((object->refcnt + change_by) < 0)
      return CVMCACHE_STATUS_BADCOUNT;

    if (object->refcnt == 0) {
      // Transition unpinned -> pinned
      Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(object);
      Me()->CheckHighPinWatermark();
    }
    object->refcnt += change_by;
    if (object->refcnt == 0) {
      // Transition pinned -> unpinned
      Me()->cache_info_.pinned_bytes -= Me()->storage_->GetSize(object);
      Me()->in_danger_zone_ = Me()->IsInDangerZone();
    }
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Fills in size, type, pin status, and a strdup'ed copy of the
   * description for the given object.  The description copy is freed by the
   * library, not by the plugin.
   */
  static int ram_obj_info(
    struct cvmcache_hash *id,
    struct cvmcache_object_info *info)
  {
    ComparableHash h(*id);
    ObjectHeader *object;
    // NOTE(review): the third Lookup argument presumably suppresses the LRU
    // touch so that a stat does not count as a use -- confirm against lru.h
    if (!Me()->objects_all_->Lookup(h, &object, false))
      return CVMCACHE_STATUS_NOENTRY;

    info->size = object->size_data;
    info->type = object->type;
    info->pinned = object->refcnt > 0;
    info->description = (object->GetDescription() == NULL)
                        ? NULL
                        : strdup(object->GetDescription());
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Copies up to *size bytes starting at offset into buffer; *size is
   * updated to the number of bytes actually copied.  The object must exist
   * (pinned by the protocol before reading), hence the assert.
   */
  static int ram_pread(struct cvmcache_hash *id,
                       uint64_t offset,
                       uint32_t *size,
                       unsigned char *buffer)
  {
    ComparableHash h(*id);
    ObjectHeader *object;
    bool retval = Me()->objects_all_->Lookup(h, &object, false);
    assert(retval);
    if (offset > object->size_data)
      return CVMCACHE_STATUS_OUTOFBOUNDS;
    unsigned nbytes =
      std::min(*size, static_cast<uint32_t>(object->size_data - offset));
    memcpy(buffer, object->GetData() + offset, nbytes);
    *size = nbytes;
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Allocates heap space for a new object and registers the transaction.
   * If the final size is unknown, starts with 4kB and grows later in
   * ram_write_txn.
   */
  static int ram_start_txn(
    struct cvmcache_hash *id,
    uint64_t txn_id,
    struct cvmcache_object_info *info)
  {
    ObjectHeader object_header;
    object_header.txn_id = txn_id;
    if (info->size != CVMCACHE_SIZE_UNKNOWN)
      object_header.size_data = info->size;
    else
      object_header.size_data = 4096;
    if (info->description != NULL)
      object_header.size_desc = strlen(info->description) + 1;
    object_header.refcnt = 1;
    object_header.type = info->type;
    object_header.id = *id;

    uint32_t total_size = sizeof(object_header) +
                          object_header.size_desc + object_header.size_data;
    // May compact or shrink the heap; only pointers tracked by the LRU maps
    // and the transaction table survive (see OnBlockMove)
    Me()->TryFreeSpace(total_size);
    ObjectHeader *allocd_object = reinterpret_cast<ObjectHeader *>(
      Me()->storage_->Allocate(total_size,
                               &object_header, sizeof(object_header)));
    if (allocd_object == NULL)
      return CVMCACHE_STATUS_NOSPACE;

    allocd_object->SetDescription(info->description);
    Me()->transactions_.Insert(txn_id, allocd_object);
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Appends size bytes to the open transaction.  The write position is
   * tracked negated in neg_nbytes_written (union with refcnt, which starts
   * at 1 -- hence the reset of positive values on the first write).  Grows
   * the heap block by at least kObjectExpandFactor when it runs full.
   */
  static int ram_write_txn(
    uint64_t txn_id,
    unsigned char *buffer,
    uint32_t size)
  {
    ObjectHeader *txn_object;
    int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
    assert(retval);
    assert(size > 0);

    // First write: the union still carries refcnt == 1 from ram_start_txn
    if (txn_object->neg_nbytes_written > 0)
      txn_object->neg_nbytes_written = 0;
    if ((size - txn_object->neg_nbytes_written) > txn_object->size_data) {
      uint32_t current_size = Me()->storage_->GetSize(txn_object);
      uint32_t header_size = current_size - txn_object->size_data;
      uint32_t new_size = std::max(
        header_size + size - txn_object->neg_nbytes_written,
        uint32_t(current_size * kObjectExpandFactor));
      bool did_compact = Me()->TryFreeSpace(new_size);
      if (did_compact) {
        // Compaction may have moved the object; re-fetch the pointer
        retval = Me()->transactions_.Lookup(txn_id, &txn_object);
        assert(retval);
      }
      txn_object = reinterpret_cast<ObjectHeader *>(
        Me()->storage_->Expand(txn_object, new_size));
      if (txn_object == NULL)
        return CVMCACHE_STATUS_NOSPACE;
      txn_object->size_data = new_size - header_size;
      Me()->transactions_.Insert(txn_id, txn_object);
    }

    // Append at the current write position (-neg_nbytes_written)
    memcpy(txn_object->GetData() - txn_object->neg_nbytes_written,
           buffer, size);
    txn_object->neg_nbytes_written -= size;
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Finalizes a transaction: fixes up size_data to the bytes actually
   * written, sets refcnt to 1, and inserts the object into the LRU maps.
   * If the same object was committed concurrently, the new copy is dropped
   * and the existing one gains a reference.
   */
  static int ram_commit_txn(uint64_t txn_id) {
    Me()->TryFreeSpace(0);
    if (Me()->objects_all_->IsFull())
      return CVMCACHE_STATUS_NOSPACE;

    ObjectHeader *txn_object;
    int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
    assert(retval);

    Me()->transactions_.Erase(txn_id);
    ComparableHash h(txn_object->id);
    ObjectHeader *existing_object;
    if (Me()->objects_all_->Lookup(h, &existing_object)) {
      // Concurrent addition of same objects, drop the one at hand and
      // increase ref count of existing copy
      Me()->storage_->MarkFree(txn_object);
      if (existing_object->refcnt == 0)
        Me()->cache_info_.pinned_bytes +=
          Me()->storage_->GetSize(existing_object);
      existing_object->refcnt++;
    } else {
      txn_object->txn_id = uint64_t(-1);
      // No write ever happened: the union still holds refcnt == 1
      if (txn_object->neg_nbytes_written > 0)
        txn_object->neg_nbytes_written = 0;
      // Actual payload size is the (negated) number of bytes written
      txn_object->size_data = -(txn_object->neg_nbytes_written);
      txn_object->refcnt = 1;
      Me()->cache_info_.used_bytes += Me()->storage_->GetSize(txn_object);
      Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(txn_object);
      Me()->objects_all_->Insert(h, txn_object);
      if (txn_object->type == CVMCACHE_OBJECT_VOLATILE) {
        assert(!Me()->objects_volatile_->IsFull());
        Me()->objects_volatile_->Insert(h, txn_object);
      }
    }
    Me()->CheckHighPinWatermark();
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Drops an open transaction and releases its heap block.
   */
  static int ram_abort_txn(uint64_t txn_id) {
    ObjectHeader *txn_object = NULL;
    int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
    assert(retval);
    Me()->transactions_.Erase(txn_id);
    Me()->storage_->MarkFree(txn_object);
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Copies the current cache counters (size, used, pinned, no_shrink).
   */
  static int ram_info(struct cvmcache_info *info) {
    *info = Me()->cache_info_;
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Evicts unpinned objects until at most shrink_to bytes are used.
   * Returns PARTIAL if pinned objects prevent reaching the target.
   */
  static int ram_shrink(uint64_t shrink_to, uint64_t *used) {
    *used = Me()->cache_info_.used_bytes;
    if (*used <= shrink_to)
      return CVMCACHE_STATUS_OK;

    Me()->DoShrink(shrink_to);
    *used = Me()->cache_info_.used_bytes;
    return (*used <= shrink_to) ? CVMCACHE_STATUS_OK : CVMCACHE_STATUS_PARTIAL;
  }


  /**
   * Takes a snapshot of all objects of the given type into a new Listing
   * that is stored under lst_id until ram_listing_end.  Description strings
   * are strdup'ed; the library frees them.
   */
  static int ram_listing_begin(
    uint64_t lst_id,
    enum cvmcache_object_type type)
  {
    Listing *lst = new Listing();
    Me()->objects_all_->FilterBegin();
    while (Me()->objects_all_->FilterNext()) {
      ComparableHash h;
      ObjectHeader *object;
      Me()->objects_all_->FilterGet(&h, &object);
      if (object->type != type)
        continue;

      struct cvmcache_object_info item;
      item.id = object->id;
      item.size = object->size_data;
      item.type = type;
      item.pinned = object->refcnt != 0;
      item.description = (object->size_desc > 0)
                         ? strdup(object->GetDescription())
                         : NULL;
      lst->elems.push_back(item);
    }
    Me()->objects_all_->FilterEnd();

    Me()->listings_.Insert(lst_id, lst);
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Returns the next element of an open listing or OUTOFBOUNDS when the
   * snapshot is exhausted.
   */
  static int ram_listing_next(
    int64_t listing_id,
    struct cvmcache_object_info *item)
  {
    Listing *lst;
    bool retval = Me()->listings_.Lookup(listing_id, &lst);
    assert(retval);
    if (lst->pos >= lst->elems.size())
      return CVMCACHE_STATUS_OUTOFBOUNDS;
    *item = lst->elems[lst->pos];
    lst->pos++;
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Destroys an open listing snapshot.
   */
  static int ram_listing_end(int64_t listing_id) {
    Listing *lst;
    bool retval = Me()->listings_.Lookup(listing_id, &lst);
    assert(retval);

    // Don't free description strings, done by the library
    delete lst;
    Me()->listings_.Erase(listing_id);
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Stores (or overwrites) the breadcrumb for a repository.
   */
  static int ram_breadcrumb_store(
    const char *fqrn,
    const cvmcache_breadcrumb *breadcrumb)
  {
    Me()->breadcrumbs_[fqrn] = *breadcrumb;
    return CVMCACHE_STATUS_OK;
  }


  /**
   * Looks up the breadcrumb for a repository, NOENTRY if unknown.
   */
  static int ram_breadcrumb_load(
    const char *fqrn,
    cvmcache_breadcrumb *breadcrumb)
  {
    map<std::string, cvmcache_breadcrumb>::const_iterator itr =
      Me()->breadcrumbs_.find(fqrn);
    if (itr == Me()->breadcrumbs_.end())
      return CVMCACHE_STATUS_NOENTRY;
    *breadcrumb = itr->second;
    return CVMCACHE_STATUS_OK;
  }

 private:
  static const uint64_t kMinSize;  // 100 * 1024 * 1024;
  static const double kShrinkFactor;  // = 0.75;
  static const double kObjectExpandFactor;  // = 1.5;
  static const double kSlotFraction;  // = 0.04;
  static const double kDangerZoneThreshold;  // = 0.7

  static PluginRamCache *instance_;
  static PluginRamCache *Me() {
    return instance_;
  }

  /**
   * Splits the memory budget into the heap (1 - kSlotFraction) and the LRU
   * bookkeeping (kSlotFraction), and wires OnBlockMove as the heap's
   * compaction callback.
   */
  explicit PluginRamCache(uint64_t mem_size) {
    in_danger_zone_ = false;

    uint64_t heap_size = RoundUp8(
      std::max(kMinSize, uint64_t(mem_size * (1.0 - kSlotFraction))));
    memset(&cache_info_, 0, sizeof(cache_info_));
    cache_info_.size_bytes = heap_size;
    storage_ = new MallocHeap(
      heap_size, this->MakeCallback(&PluginRamCache::OnBlockMove, this));

    struct cvmcache_hash hash_empty;
    memset(&hash_empty, 0, sizeof(hash_empty));

    transactions_.Init(64, uint64_t(-1), hasher_uint64);
    listings_.Init(8, uint64_t(-1), hasher_uint64);

    // Slot budget is split between objects_all_ and objects_volatile_,
    // hence the factor 2.0
    double slot_size =
      lru::LruCache<ComparableHash, ObjectHeader *>::GetEntrySize();
    uint64_t num_slots = uint64_t((heap_size * kSlotFraction) /
                                  (2.0 * slot_size));
    const unsigned mask_64 = ~((1 << 6) - 1);

    LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "Allocating %" PRIu64
             "MB of memory for up to %" PRIu64 " objects",
             heap_size / (1024 * 1024), num_slots & mask_64);

    // Number of cache entries must be a multiple of 64
    objects_all_ = new lru::LruCache<ComparableHash, ObjectHeader *>(
      num_slots & mask_64,
      ComparableHash(hash_empty),
      hasher_any,
      perf::StatisticsTemplate("objects_all", &statistics_));
    objects_volatile_ = new lru::LruCache<ComparableHash, ObjectHeader *>(
      num_slots & mask_64,
      ComparableHash(hash_empty),
      hasher_any,
      perf::StatisticsTemplate("objects_volatile", &statistics_));
  }

  /**
   * Returns true if memory compaction took place and pointers might have
   * been invalidated.
   */
  bool TryFreeSpace(uint64_t bytes_required) {
    if (!objects_all_->IsFull() && storage_->HasSpaceFor(bytes_required))
      return false;

    // Free space occupied due to piecewise catalog storage
    if (!objects_all_->IsFull()) {
      LogCvmfs(kLogCache, kLogDebug, "compacting ram cache");
      storage_->Compact();
      if (storage_->HasSpaceFor(bytes_required))
        return true;
    }

    // Compaction alone was not enough: evict objects down to the smaller of
    // "enough room for the request" and 75% of capacity
    uint64_t shrink_to = std::min(
      storage_->capacity() - (bytes_required + 8),
      uint64_t(storage_->capacity() * kShrinkFactor));
    DoShrink(shrink_to);
    return true;
  }

  /**
   * Heap compaction callback: a block moved, so the stale pointer in the
   * LRU maps (committed objects) or the transaction table (open
   * transactions, identified by txn_id != -1) must be refreshed.
   */
  void OnBlockMove(const MallocHeap::BlockPtr &ptr) {
    assert(ptr.pointer);
    ObjectHeader *object = reinterpret_cast<ObjectHeader *>(ptr.pointer);
    ComparableHash h(object->id);
    if (object->txn_id == uint64_t(-1)) {
      bool retval = objects_all_->UpdateValue(h, object);
      assert(retval);
      if (object->type == CVMCACHE_OBJECT_VOLATILE) {
        retval = objects_volatile_->UpdateValue(h, object);
        assert(retval);
      }
    } else {
      // Insert overwrites the existing entry; the size check asserts that
      // the transaction was indeed already known
      uint64_t old_size = transactions_.size();
      transactions_.Insert(object->txn_id, object);
      assert(old_size == transactions_.size());
    }
  }


  /**
   * Evicts unpinned objects, volatile ones first, until the compacted heap
   * would fit in shrink_to bytes, then compacts the heap.
   */
  void DoShrink(uint64_t shrink_to) {
    ComparableHash h;
    ObjectHeader *object;

    LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
             "clean up cache until at most %lu KB is used", shrink_to / 1024);

    objects_volatile_->FilterBegin();
    while (objects_volatile_->FilterNext()) {
      objects_volatile_->FilterGet(&h, &object);
      if (object->refcnt != 0)
        continue;  // pinned objects are never evicted
      cache_info_.used_bytes -= storage_->GetSize(object);
      storage_->MarkFree(object);
      objects_volatile_->FilterDelete();
      objects_all_->Forget(h);
      if (storage_->compacted_bytes() <= shrink_to)
        break;
    }
    objects_volatile_->FilterEnd();

    objects_all_->FilterBegin();
    while ((storage_->compacted_bytes() > shrink_to) &&
           objects_all_->FilterNext())
    {
      objects_all_->FilterGet(&h, &object);
      if (object->refcnt != 0)
        continue;
      // Volatile objects were already handled (or pinned) above
      assert(object->type != CVMCACHE_OBJECT_VOLATILE);
      cache_info_.used_bytes -= storage_->GetSize(object);
      storage_->MarkFree(object);
      objects_all_->FilterDelete();
    }
    objects_all_->FilterEnd();

    storage_->Compact();
    cache_info_.no_shrink++;
  }

  /**
   * On crossing the pin watermark, asks the clients (once) to detach
   * nested catalogs in order to release pinned objects.
   */
  void CheckHighPinWatermark() {
    if (!Me()->in_danger_zone_ && Me()->IsInDangerZone()) {
      LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslog,
               "high watermark of pinned files");
      Me()->in_danger_zone_ = true;
      cvmcache_ask_detach(ctx);
    }
  }

  // True when more than kDangerZoneThreshold of the cache is pinned
  bool IsInDangerZone() {
    return (static_cast<double>(cache_info_.pinned_bytes) /
            static_cast<double>(cache_info_.size_bytes)) >
           kDangerZoneThreshold;
  }


  struct cvmcache_info cache_info_;
  perf::Statistics statistics_;
  // Open transactions by txn id; values move on heap compaction
  SmallHashDynamic<uint64_t, ObjectHeader *> transactions_;
  // Open listing snapshots by listing id
  SmallHashDynamic<uint64_t, Listing *> listings_;
  lru::LruCache<ComparableHash, ObjectHeader *> *objects_all_;
  lru::LruCache<ComparableHash, ObjectHeader *> *objects_volatile_;
  map<std::string, cvmcache_breadcrumb> breadcrumbs_;
  MallocHeap *storage_;
  bool in_danger_zone_;
};  // class PluginRamCache
625
PluginRamCache *PluginRamCache::instance_ = NULL;
// Out-of-class definitions of the tuning constants declared in
// PluginRamCache (values are also noted next to the declarations)
const uint64_t PluginRamCache::kMinSize = 100 * 1024 * 1024;
const double PluginRamCache::kShrinkFactor = 0.75;
const double PluginRamCache::kObjectExpandFactor = 1.5;
const double PluginRamCache::kSlotFraction = 0.04;
const double PluginRamCache::kDangerZoneThreshold = 0.7;
632
633
/**
 * Prints the expected command line invocation to stdout.
 */
static void Usage(const char *progname) {
  LogCvmfs(kLogCache, kLogStdout, "%s <config file>", progname);
}
637
638
639 /**
640 * For testing and debugging purposes, the cache manager drops its
641 * breadcrumb cache upon SIGUSR2 retrieval
642 */
/**
 * For testing and debugging purposes, the cache manager drops its
 * breadcrumb cache upon SIGUSR2 retrieval.  Free function (not the
 * class method of the same name), installed via sigaction in main().
 * NOTE(review): LogCvmfs and map::clear are not async-signal-safe; this is
 * presumably tolerated because the handler is a debug facility -- confirm.
 */
void DropBreadcrumbs(int sig, siginfo_t *siginfo, void *context) {
  LogCvmfs(kLogCache, kLogSyslog | kLogDebug, "dropping breadcrumbs");
  PluginRamCache::GetInstance()->DropBreadcrumbs();
}
647
648
/**
 * Entry point: reads the option file given as argv[1], creates the
 * singleton cache, registers all callbacks, and serves client requests
 * until termination.  With CVMFS_CACHE_PLUGIN_TEST set, the process
 * daemonizes via double fork and prints the daemon PID.
 */
int main(int argc, char **argv) {
  if (argc < 2) {
    Usage(argv[0]);
    return 1;
  }

  SetLogDebugFile("/dev/null");

  cvmcache_init_global();

  cvmcache_option_map *options = cvmcache_options_init();
  if (cvmcache_options_parse(options, argv[1]) != 0) {
    LogCvmfs(kLogCache, kLogStderr, "cannot parse options file %s", argv[1]);
    return 1;
  }
  // Optional debug log destination
  char *debug_log =
    cvmcache_options_get(options, "CVMFS_CACHE_PLUGIN_DEBUGLOG");
  if (debug_log != NULL) {
    SetLogDebugFile(debug_log);
    cvmcache_options_free(debug_log);
  }
  // Mandatory: where to listen for clients
  char *locator = cvmcache_options_get(options, "CVMFS_CACHE_PLUGIN_LOCATOR");
  if (locator == NULL) {
    LogCvmfs(kLogCache, kLogStderr, "CVMFS_CACHE_PLUGIN_LOCATOR missing");
    cvmcache_options_fini(options);
    return 1;
  }
  // Mandatory: cache size ("N%" of RAM or megabytes, see Create())
  char *mem_size = cvmcache_options_get(options, "CVMFS_CACHE_PLUGIN_SIZE");
  if (mem_size == NULL) {
    LogCvmfs(kLogCache, kLogStderr, "CVMFS_CACHE_PLUGIN_SIZE missing");
    cvmcache_options_fini(options);
    // NOTE(review): locator (strdup'ed by options_get) is not freed on this
    // path; harmless before exit but worth tidying
    return 1;
  }
  char *test_mode = cvmcache_options_get(options, "CVMFS_CACHE_PLUGIN_TEST");

  if (!test_mode)
    cvmcache_spawn_watchdog(NULL);

  PluginRamCache *plugin = PluginRamCache::Create(mem_size);

  // SIGUSR2 drops the breadcrumb cache (debug aid, see DropBreadcrumbs)
  struct sigaction sa;
  memset(&sa, 0, sizeof(sa));
  sa.sa_sigaction = DropBreadcrumbs;
  sa.sa_flags = SA_SIGINFO;
  sigfillset(&sa.sa_mask);
  int retval = sigaction(SIGUSR2, &sa, NULL);
  assert(retval == 0);

  struct cvmcache_callbacks callbacks;
  memset(&callbacks, 0, sizeof(callbacks));
  callbacks.cvmcache_chrefcnt = plugin->ram_chrefcnt;
  callbacks.cvmcache_obj_info = plugin->ram_obj_info;
  callbacks.cvmcache_pread = plugin->ram_pread;
  callbacks.cvmcache_start_txn = plugin->ram_start_txn;
  callbacks.cvmcache_write_txn = plugin->ram_write_txn;
  callbacks.cvmcache_commit_txn = plugin->ram_commit_txn;
  callbacks.cvmcache_abort_txn = plugin->ram_abort_txn;
  callbacks.cvmcache_info = plugin->ram_info;
  callbacks.cvmcache_shrink = plugin->ram_shrink;
  callbacks.cvmcache_listing_begin = plugin->ram_listing_begin;
  callbacks.cvmcache_listing_next = plugin->ram_listing_next;
  callbacks.cvmcache_listing_end = plugin->ram_listing_end;
  callbacks.cvmcache_breadcrumb_store = plugin->ram_breadcrumb_store;
  callbacks.cvmcache_breadcrumb_load = plugin->ram_breadcrumb_load;
  callbacks.capabilities = CVMCACHE_CAP_ALL_V2;

  ctx = cvmcache_init(&callbacks);
  retval = cvmcache_listen(ctx, locator);
  if (!retval) {
    LogCvmfs(kLogCache, kLogStderr, "failed to listen on %s", locator);
    return 1;
  }

  if (test_mode) {
    // Daemonize, print out PID: double fork so the grandchild (which keeps
    // running below) is reparented to init; the middle child prints the
    // grandchild's PID and exits; the original process waits and exits.
    pid_t pid;
    int statloc;
    if ((pid = fork()) == 0) {
      if ((pid = fork()) == 0) {
        // Grandchild: detach stdio and continue as the daemon
        int null_read = open("/dev/null", O_RDONLY);
        int null_write = open("/dev/null", O_WRONLY);
        assert((null_read >= 0) && (null_write >= 0));
        int retval = dup2(null_read, 0);
        assert(retval == 0);
        retval = dup2(null_write, 1);
        assert(retval == 1);
        retval = dup2(null_write, 2);
        assert(retval == 2);
        close(null_read);
        close(null_write);
      } else {
        assert(pid > 0);
        printf("%d\n", pid);
        fflush(stdout);
        fsync(1);
        _exit(0);
      }
    } else {
      assert(pid > 0);
      waitpid(pid, &statloc, 0);
      _exit(0);
    }
  }

  LogCvmfs(kLogCache, kLogStdout, "Listening for cvmfs clients on %s\n"
           "NOTE: this process needs to run as user cvmfs\n",
           locator);

  cvmcache_process_requests(ctx, 0);

  // In test mode the daemonized process just idles here forever
  if (test_mode)
    while (true) sleep(1);
  if (!cvmcache_is_supervised()) {
    // Interactive mode: simple stdin command loop until EOF (Ctrl+D)
    LogCvmfs(kLogCache, kLogStdout, "Press <Ctrl+D> to quit");
    LogCvmfs(kLogCache, kLogStdout,
             "Press <R Enter> to ask clients to release nested catalogs");
    while (true) {
      char buf;
      retval = read(fileno(stdin), &buf, 1);
      if (retval != 1)
        break;
      if (buf == 'R') {
        LogCvmfs(kLogCache, kLogStdout,
                 " ... asking clients to release nested catalogs");
        cvmcache_ask_detach(ctx);
      }
    }
    cvmcache_terminate(ctx);
  } else {
    LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
             "CernVM-FS RAM cache plugin started in supervised mode");
  }

  cvmcache_wait_for(ctx);
  LogCvmfs(kLogCache, kLogDebug | kLogStdout, " ... good bye");
  cvmcache_options_free(mem_size);
  cvmcache_options_free(locator);
  cvmcache_options_fini(options);
  cvmcache_terminate_watchdog();
  cvmcache_cleanup_global();
  return 0;
}
789