7 #define __STDC_FORMAT_MACROS
14 #include <sys/types.h>
45 txn_id = uint64_t(-1);
50 memset(&
id, 0,
sizeof(
id));
56 return reinterpret_cast<char *
>(
this) +
sizeof(
ObjectHeader);
60 if (description == NULL)
62 memcpy(reinterpret_cast<char *>(
this) +
sizeof(
ObjectHeader), description,
63 strlen(description) + 1);
67 return reinterpret_cast<unsigned char *
>(
this) +
sizeof(
ObjectHeader)
105 vector<struct cvmcache_object_info>
elems;
117 const_cast<cvmcache_hash *>(&(other.
hash)))
122 const_cast<cvmcache_hash *>(&(other.
hash)))
127 const_cast<cvmcache_hash *>(&(other.
hash)))
132 const_cast<cvmcache_hash *>(&(other.
hash)))
147 return (uint32_t) * (
reinterpret_cast<const uint32_t *
>(&key.
hash));
165 assert(instance_ == NULL);
167 uint64_t mem_size_bytes;
168 if (
HasSuffix(mem_size_str,
"%",
false)) {
178 assert(instance_ != NULL);
185 delete objects_volatile_;
194 if (!Me()->objects_all_->Lookup(h, &
object))
198 Me()->objects_volatile_->Update(h);
202 if ((object->
refcnt + change_by) < 0)
205 if (object->
refcnt == 0) {
206 Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(
object);
207 Me()->CheckHighPinWatermark();
209 object->refcnt += change_by;
210 if (object->
refcnt == 0) {
211 Me()->cache_info_.pinned_bytes -= Me()->storage_->GetSize(
object);
212 Me()->in_danger_zone_ = Me()->IsInDangerZone();
222 if (!Me()->objects_all_->Lookup(h, &
object,
false))
225 info->
size =
object->size_data;
226 info->
type =
object->type;
227 info->
pinned =
object->refcnt > 0;
228 info->
description = (
object->GetDescription() == NULL)
238 unsigned char *buffer) {
241 bool retval = Me()->objects_all_->Lookup(h, &
object,
false);
245 unsigned nbytes = std::min(
246 *size, static_cast<uint32_t>(object->
size_data - offset));
247 memcpy(buffer, object->
GetData() + offset, nbytes);
257 object_header.
txn_id = txn_id;
266 object_header.
id = *id;
268 uint32_t total_size =
sizeof(object_header) + object_header.
size_desc
270 Me()->TryFreeSpace(total_size);
272 Me()->storage_->Allocate(total_size, &object_header,
273 sizeof(object_header)));
274 if (allocd_object == NULL)
278 Me()->transactions_.Insert(txn_id, allocd_object);
284 unsigned char *buffer,
287 int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
294 uint32_t current_size = Me()->storage_->GetSize(txn_object);
295 uint32_t header_size = current_size - txn_object->
size_data;
296 uint32_t new_size = std::max(
298 uint32_t(current_size * kObjectExpandFactor));
299 bool did_compact = Me()->TryFreeSpace(new_size);
301 retval = Me()->transactions_.Lookup(txn_id, &txn_object);
305 Me()->storage_->Expand(txn_object, new_size));
306 if (txn_object == NULL)
308 txn_object->
size_data = new_size - header_size;
309 Me()->transactions_.Insert(txn_id, txn_object);
320 Me()->TryFreeSpace(0);
321 if (Me()->objects_all_->IsFull())
325 int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
328 Me()->transactions_.Erase(txn_id);
331 if (Me()->objects_all_->Lookup(h, &existing_object)) {
334 Me()->storage_->MarkFree(txn_object);
335 if (existing_object->
refcnt == 0)
336 Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(
338 existing_object->
refcnt++;
340 txn_object->
txn_id = uint64_t(-1);
345 Me()->cache_info_.used_bytes += Me()->storage_->GetSize(txn_object);
346 Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(txn_object);
347 Me()->objects_all_->Insert(h, txn_object);
349 assert(!Me()->objects_volatile_->IsFull());
350 Me()->objects_volatile_->Insert(h, txn_object);
353 Me()->CheckHighPinWatermark();
360 int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
362 Me()->transactions_.Erase(txn_id);
363 Me()->storage_->MarkFree(txn_object);
369 *info = Me()->cache_info_;
375 *used = Me()->cache_info_.used_bytes;
376 if (*used <= shrink_to)
379 Me()->DoShrink(shrink_to);
380 *used = Me()->cache_info_.used_bytes;
388 Me()->objects_all_->FilterBegin();
389 while (Me()->objects_all_->FilterNext()) {
392 Me()->objects_all_->FilterGet(&h, &
object);
393 if (object->
type != type)
397 item.
id =
object->id;
398 item.
size =
object->size_data;
400 item.
pinned =
object->refcnt != 0;
404 lst->
elems.push_back(item);
406 Me()->objects_all_->FilterEnd();
408 Me()->listings_.Insert(lst_id, lst);
416 bool retval = Me()->listings_.Lookup(listing_id, &lst);
428 bool retval = Me()->listings_.Lookup(listing_id, &lst);
433 Me()->listings_.Erase(listing_id);
440 Me()->breadcrumbs_[fqrn] = *breadcrumb;
447 map<std::string, cvmcache_breadcrumb>::const_iterator
448 itr = Me()->breadcrumbs_.find(fqrn);
449 if (itr == Me()->breadcrumbs_.end())
451 *breadcrumb = itr->second;
465 in_danger_zone_ =
false;
468 std::max(kMinSize, uint64_t(mem_size * (1.0 - kSlotFraction))));
469 memset(&cache_info_, 0,
sizeof(cache_info_));
470 cache_info_.size_bytes = heap_size;
475 memset(&hash_empty, 0,
sizeof(hash_empty));
482 uint64_t num_slots = uint64_t((heap_size * kSlotFraction)
483 / (2.0 * slot_size));
484 const unsigned mask_64 = ~((1 << 6) - 1);
487 "Allocating %" PRIu64
"MB of memory for up to %" PRIu64
" objects",
488 heap_size / (1024 * 1024), num_slots & mask_64);
493 ComparableHash(hash_empty),
498 ComparableHash(hash_empty),
508 if (!objects_all_->IsFull() && storage_->HasSpaceFor(bytes_required))
512 if (!objects_all_->IsFull()) {
515 if (storage_->HasSpaceFor(bytes_required))
519 uint64_t shrink_to = std::min(
520 storage_->capacity() - (bytes_required + 8),
521 uint64_t(storage_->capacity() * kShrinkFactor));
530 if (object->
txn_id == uint64_t(-1)) {
531 bool retval = objects_all_->UpdateValue(h,
object);
534 retval = objects_volatile_->UpdateValue(h,
object);
538 uint64_t old_size = transactions_.size();
539 transactions_.Insert(object->
txn_id,
object);
540 assert(old_size == transactions_.size());
550 "clean up cache until at most %lu KB is used", shrink_to / 1024);
552 objects_volatile_->FilterBegin();
553 while (objects_volatile_->FilterNext()) {
554 objects_volatile_->FilterGet(&h, &
object);
557 cache_info_.used_bytes -= storage_->GetSize(
object);
558 storage_->MarkFree(
object);
559 objects_volatile_->FilterDelete();
560 objects_all_->Forget(h);
561 if (storage_->compacted_bytes() <= shrink_to)
564 objects_volatile_->FilterEnd();
566 objects_all_->FilterBegin();
567 while ((storage_->compacted_bytes() > shrink_to)
568 && objects_all_->FilterNext()) {
569 objects_all_->FilterGet(&h, &
object);
573 cache_info_.used_bytes -= storage_->GetSize(
object);
574 storage_->MarkFree(
object);
575 objects_all_->FilterDelete();
577 objects_all_->FilterEnd();
580 cache_info_.no_shrink++;
584 if (!Me()->in_danger_zone_ && Me()->IsInDangerZone()) {
586 "high watermark of pinned files");
587 Me()->in_danger_zone_ =
true;
593 return (static_cast<double>(cache_info_.pinned_bytes)
594 / static_cast<double>(cache_info_.size_bytes))
595 > kDangerZoneThreshold;
618 static void Usage(
const char *progname) {
633 int main(
int argc,
char **argv) {
649 "CVMFS_CACHE_PLUGIN_DEBUGLOG");
650 if (debug_log != NULL) {
655 if (locator == NULL) {
661 if (mem_size == NULL) {
673 memset(&sa, 0,
sizeof(sa));
675 sa.sa_flags = SA_SIGINFO;
676 sigfillset(&sa.sa_mask);
677 int retval = sigaction(SIGUSR2, &sa, NULL);
681 memset(&callbacks, 0,
sizeof(callbacks));
709 if ((pid = fork()) == 0) {
710 if ((pid = fork()) == 0) {
711 int null_read = open(
"/dev/null", O_RDONLY);
712 int null_write = open(
"/dev/null", O_WRONLY);
713 assert((null_read >= 0) && (null_write >= 0));
714 int retval = dup2(null_read, 0);
716 retval = dup2(null_write, 1);
718 retval = dup2(null_write, 2);
731 waitpid(pid, &statloc, 0);
737 "Listening for cvmfs clients on %s\n"
738 "NOTE: this process needs to run as user cvmfs\n",
748 "Press <R Enter> to ask clients to release nested catalogs");
751 retval = read(fileno(stdin), &buf, 1);
756 " ... asking clients to release nested catalogs");
763 "CernVM-FS RAM cache plugin started in supervised mode");
static int ram_commit_txn(uint64_t txn_id)
static int ram_pread(struct cvmcache_hash *id, uint64_t offset, uint32_t *size, unsigned char *buffer)
int(* cvmcache_chrefcnt)(struct cvmcache_hash *id, int32_t change_by)
static int ram_listing_next(int64_t listing_id, struct cvmcache_object_info *item)
static int ram_write_txn(uint64_t txn_id, unsigned char *buffer, uint32_t size)
struct cvmcache_context * ctx
struct cvmcache_hash hash
static int ram_chrefcnt(struct cvmcache_hash *id, int32_t change_by)
int(* cvmcache_info)(struct cvmcache_info *info)
SmallHashDynamic< uint64_t, ObjectHeader * > transactions_
static void Usage(const char *progname)
void cvmcache_terminate(struct cvmcache_context *ctx)
perf::Statistics statistics_
int cvmcache_listen(struct cvmcache_context *ctx, char *locator)
int(* cvmcache_listing_next)(int64_t lst_id, struct cvmcache_object_info *item)
void cvmcache_cleanup_global()
static const double kObjectExpandFactor
static int ram_listing_begin(uint64_t lst_id, enum cvmcache_object_type type)
void cvmcache_terminate_watchdog()
perf::Statistics * statistics_
static PluginRamCache * Create(const string &mem_size_str)
assert((mem||(size==0))&&"Out Of Memory")
void OnBlockMove(const MallocHeap::BlockPtr &ptr)
bool operator!=(const ComparableHash &other) const
int cvmcache_is_supervised()
static uint32_t hasher_any(const ComparableHash &key)
int(* cvmcache_pread)(struct cvmcache_hash *id, uint64_t offset, uint32_t *size, unsigned char *buffer)
static PluginRamCache * GetInstance()
int(* cvmcache_breadcrumb_load)(const char *fqrn, cvmcache_breadcrumb *breadcrumb)
#define SetLogDebugFile(filename)
static int ram_info(struct cvmcache_info *info)
void cvmcache_wait_for(struct cvmcache_context *ctx)
SmallHashDynamic< uint64_t, Listing * > listings_
void CheckHighPinWatermark()
static int ram_abort_txn(uint64_t txn_id)
void cvmcache_ask_detach(struct cvmcache_context *ctx)
vector< struct cvmcache_object_info > elems
int(* cvmcache_write_txn)(uint64_t txn_id, unsigned char *buffer, uint32_t size)
bool operator==(const ComparableHash &other) const
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
PluginRamCache(uint64_t mem_size)
static int ram_start_txn(struct cvmcache_hash *id, uint64_t txn_id, struct cvmcache_object_info *info)
int cvmcache_hash_cmp(struct cvmcache_hash *a, struct cvmcache_hash *b)
static const double kShrinkFactor
lru::LruCache< ComparableHash, ObjectHeader * > * objects_all_
void cvmcache_options_free(char *value)
static uint64_t RoundUp8(const uint64_t size)
void cvmcache_spawn_watchdog(const char *crash_dump_file)
static const double kDangerZoneThreshold
void cvmcache_options_fini(cvmcache_option_map *opts)
enum cvmcache_object_type type
void DoShrink(uint64_t shrink_to)
static const double kSlotFraction
static PluginRamCache * Me()
struct cvmcache_context * cvmcache_init(struct cvmcache_callbacks *callbacks)
static int ram_listing_end(int64_t listing_id)
static uint32_t hasher_uint64(const uint64_t &key)
lru::LruCache< ComparableHash, ObjectHeader * > * objects_volatile_
bool TryFreeSpace(uint64_t bytes_required)
void cvmcache_init_global()
static const uint64_t kMinSize
static int ram_shrink(uint64_t shrink_to, uint64_t *used)
int cvmcache_options_parse(cvmcache_option_map *opts, const char *path)
void DropBreadcrumbs(int sig, siginfo_t *siginfo, void *context)
int(* cvmcache_shrink)(uint64_t shrink_to, uint64_t *used)
#define CVMCACHE_SIZE_UNKNOWN
static int ram_breadcrumb_store(const char *fqrn, const cvmcache_breadcrumb *breadcrumb)
uint64_t String2Uint64(const string &value)
int(* cvmcache_listing_begin)(uint64_t lst_id, enum cvmcache_object_type type)
bool operator>(const ComparableHash &other) const
void cvmcache_process_requests(struct cvmcache_context *ctx, unsigned nworkers)
int(* cvmcache_breadcrumb_store)(const char *fqrn, const cvmcache_breadcrumb *breadcrumb)
map< std::string, cvmcache_breadcrumb > breadcrumbs_
char * cvmcache_options_get(cvmcache_option_map *opts, const char *key)
int(* cvmcache_commit_txn)(uint64_t txn_id)
int(* cvmcache_start_txn)(struct cvmcache_hash *id, uint64_t txn_id, struct cvmcache_object_info *info)
int(* cvmcache_listing_end)(int64_t lst_id)
int(* cvmcache_abort_txn)(uint64_t txn_id)
ComparableHash(const struct cvmcache_hash &h)
static int ram_obj_info(struct cvmcache_hash *id, struct cvmcache_object_info *info)
static PluginRamCache * instance_
int(* cvmcache_obj_info)(struct cvmcache_hash *id, struct cvmcache_object_info *info)
uint32_t MurmurHash2(const void *key, int len, uint32_t seed)
cvmcache_option_map * cvmcache_options_init()
static int ram_breadcrumb_load(const char *fqrn, cvmcache_breadcrumb *breadcrumb)
bool operator<(const ComparableHash &other) const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)