7 #define __STDC_FORMAT_MACROS
14 #include <sys/types.h>
45 txn_id = uint64_t(-1);
50 memset(&
id, 0,
sizeof(
id));
56 return reinterpret_cast<char *
>(
this) +
sizeof(
ObjectHeader);
60 if (description == NULL)
62 memcpy(reinterpret_cast<char *>(
this) +
sizeof(
ObjectHeader),
63 description, strlen(description) + 1);
67 return reinterpret_cast<unsigned char *
>(
this) +
105 vector<struct cvmcache_object_info>
elems;
117 const_cast<cvmcache_hash *>(&(other.
hash))) == 0;
121 const_cast<cvmcache_hash *>(&(other.
hash))) != 0;
125 const_cast<cvmcache_hash *>(&(other.
hash))) < 0;
129 const_cast<cvmcache_hash *>(&(other.
hash))) > 0;
143 return (uint32_t) *(
reinterpret_cast<const uint32_t *
>(&key.
hash));
161 assert(instance_ == NULL);
163 uint64_t mem_size_bytes;
164 if (
HasSuffix(mem_size_str,
"%",
false)) {
174 assert(instance_ != NULL);
181 delete objects_volatile_;
186 breadcrumbs_.clear();
192 if (!Me()->objects_all_->Lookup(h, &
object))
196 Me()->objects_volatile_->Update(h);
200 if ((object->
refcnt + change_by) < 0)
203 if (object->
refcnt == 0) {
204 Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(
object);
205 Me()->CheckHighPinWatermark();
207 object->refcnt += change_by;
208 if (object->
refcnt == 0) {
209 Me()->cache_info_.pinned_bytes -= Me()->storage_->GetSize(
object);
210 Me()->in_danger_zone_ = Me()->IsInDangerZone();
222 if (!Me()->objects_all_->Lookup(h, &
object,
false))
225 info->
size =
object->size_data;
226 info->
type =
object->type;
227 info->
pinned =
object->refcnt > 0;
228 info->
description = (
object->GetDescription() == NULL)
238 unsigned char *buffer)
242 bool retval = Me()->objects_all_->Lookup(h, &
object,
false);
247 std::min(*size, static_cast<uint32_t>(object->
size_data - offset));
248 memcpy(buffer, object->
GetData() + offset, nbytes);
260 object_header.
txn_id = txn_id;
269 object_header.
id = *id;
271 uint32_t total_size =
sizeof(object_header) +
273 Me()->TryFreeSpace(total_size);
275 Me()->storage_->Allocate(total_size,
276 &object_header,
sizeof(object_header)));
277 if (allocd_object == NULL)
281 Me()->transactions_.Insert(txn_id, allocd_object);
288 unsigned char *buffer,
292 int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
299 uint32_t current_size = Me()->storage_->GetSize(txn_object);
300 uint32_t header_size = current_size - txn_object->
size_data;
301 uint32_t new_size = std::max(
303 uint32_t(current_size * kObjectExpandFactor));
304 bool did_compact = Me()->TryFreeSpace(new_size);
306 retval = Me()->transactions_.Lookup(txn_id, &txn_object);
310 Me()->storage_->Expand(txn_object, new_size));
311 if (txn_object == NULL)
313 txn_object->
size_data = new_size - header_size;
314 Me()->transactions_.Insert(txn_id, txn_object);
325 Me()->TryFreeSpace(0);
326 if (Me()->objects_all_->IsFull())
330 int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
333 Me()->transactions_.Erase(txn_id);
336 if (Me()->objects_all_->Lookup(h, &existing_object)) {
339 Me()->storage_->MarkFree(txn_object);
340 if (existing_object->
refcnt == 0)
341 Me()->cache_info_.pinned_bytes +=
342 Me()->storage_->GetSize(existing_object);
343 existing_object->
refcnt++;
345 txn_object->
txn_id = uint64_t(-1);
350 Me()->cache_info_.used_bytes += Me()->storage_->GetSize(txn_object);
351 Me()->cache_info_.pinned_bytes += Me()->storage_->GetSize(txn_object);
352 Me()->objects_all_->Insert(h, txn_object);
354 assert(!Me()->objects_volatile_->IsFull());
355 Me()->objects_volatile_->Insert(h, txn_object);
358 Me()->CheckHighPinWatermark();
365 int retval = Me()->transactions_.Lookup(txn_id, &txn_object);
367 Me()->transactions_.Erase(txn_id);
368 Me()->storage_->MarkFree(txn_object);
374 *info = Me()->cache_info_;
380 *used = Me()->cache_info_.used_bytes;
381 if (*used <= shrink_to)
384 Me()->DoShrink(shrink_to);
385 *used = Me()->cache_info_.used_bytes;
395 Me()->objects_all_->FilterBegin();
396 while (Me()->objects_all_->FilterNext()) {
399 Me()->objects_all_->FilterGet(&h, &
object);
400 if (object->
type != type)
404 item.
id =
object->id;
405 item.
size =
object->size_data;
407 item.
pinned =
object->refcnt != 0;
411 lst->
elems.push_back(item);
413 Me()->objects_all_->FilterEnd();
415 Me()->listings_.Insert(lst_id, lst);
425 bool retval = Me()->listings_.Lookup(listing_id, &lst);
437 bool retval = Me()->listings_.Lookup(listing_id, &lst);
442 Me()->listings_.Erase(listing_id);
451 Me()->breadcrumbs_[fqrn] = *breadcrumb;
460 map<std::string, cvmcache_breadcrumb>::const_iterator itr =
461 Me()->breadcrumbs_.find(fqrn);
462 if (itr == Me()->breadcrumbs_.end())
464 *breadcrumb = itr->second;
480 in_danger_zone_ =
false;
483 std::max(kMinSize, uint64_t(mem_size * (1.0 - kSlotFraction))));
484 memset(&cache_info_, 0,
sizeof(cache_info_));
485 cache_info_.size_bytes = heap_size;
490 memset(&hash_empty, 0,
sizeof(hash_empty));
497 uint64_t num_slots = uint64_t((heap_size * kSlotFraction) /
499 const unsigned mask_64 = ~((1 << 6) - 1);
502 "MB of memory for up to %" PRIu64
" objects",
503 heap_size / (1024 * 1024), num_slots & mask_64);
523 if (!objects_all_->IsFull() && storage_->HasSpaceFor(bytes_required))
527 if (!objects_all_->IsFull()) {
530 if (storage_->HasSpaceFor(bytes_required))
534 uint64_t shrink_to = std::min(
535 storage_->capacity() - (bytes_required + 8),
536 uint64_t(storage_->capacity() * kShrinkFactor));
545 if (object->
txn_id == uint64_t(-1)) {
546 bool retval = objects_all_->UpdateValue(h,
object);
549 retval = objects_volatile_->UpdateValue(h,
object);
553 uint64_t old_size = transactions_.size();
554 transactions_.Insert(object->
txn_id,
object);
555 assert(old_size == transactions_.size());
565 "clean up cache until at most %lu KB is used", shrink_to / 1024);
567 objects_volatile_->FilterBegin();
568 while (objects_volatile_->FilterNext()) {
569 objects_volatile_->FilterGet(&h, &
object);
572 cache_info_.used_bytes -= storage_->GetSize(
object);
573 storage_->MarkFree(
object);
574 objects_volatile_->FilterDelete();
575 objects_all_->Forget(h);
576 if (storage_->compacted_bytes() <= shrink_to)
579 objects_volatile_->FilterEnd();
581 objects_all_->FilterBegin();
582 while ((storage_->compacted_bytes() > shrink_to) &&
583 objects_all_->FilterNext())
585 objects_all_->FilterGet(&h, &
object);
589 cache_info_.used_bytes -= storage_->GetSize(
object);
590 storage_->MarkFree(
object);
591 objects_all_->FilterDelete();
593 objects_all_->FilterEnd();
596 cache_info_.no_shrink++;
600 if (!Me()->in_danger_zone_ && Me()->IsInDangerZone()) {
602 "high watermark of pinned files");
603 Me()->in_danger_zone_ =
true;
609 return (static_cast<double>(cache_info_.pinned_bytes) /
610 static_cast<double>(cache_info_.size_bytes)) >
611 kDangerZoneThreshold;
634 static void Usage(
const char *progname) {
649 int main(
int argc,
char **argv) {
666 if (debug_log != NULL) {
671 if (locator == NULL) {
677 if (mem_size == NULL) {
689 memset(&sa, 0,
sizeof(sa));
691 sa.sa_flags = SA_SIGINFO;
692 sigfillset(&sa.sa_mask);
693 int retval = sigaction(SIGUSR2, &sa, NULL);
697 memset(&callbacks, 0,
sizeof(callbacks));
725 if ((pid = fork()) == 0) {
726 if ((pid = fork()) == 0) {
727 int null_read = open(
"/dev/null", O_RDONLY);
728 int null_write = open(
"/dev/null", O_WRONLY);
729 assert((null_read >= 0) && (null_write >= 0));
730 int retval = dup2(null_read, 0);
732 retval = dup2(null_write, 1);
734 retval = dup2(null_write, 2);
747 waitpid(pid, &statloc, 0);
753 "NOTE: this process needs to run as user cvmfs\n",
758 while (
true) sleep(1);
762 "Press <R Enter> to ask clients to release nested catalogs");
765 retval = read(fileno(stdin), &buf, 1);
770 " ... asking clients to release nested catalogs");
777 "CernVM-FS RAM cache plugin started in supervised mode");
static int ram_commit_txn(uint64_t txn_id)
static int ram_pread(struct cvmcache_hash *id, uint64_t offset, uint32_t *size, unsigned char *buffer)
int(* cvmcache_chrefcnt)(struct cvmcache_hash *id, int32_t change_by)
static int ram_listing_next(int64_t listing_id, struct cvmcache_object_info *item)
static int ram_write_txn(uint64_t txn_id, unsigned char *buffer, uint32_t size)
struct cvmcache_context * ctx
struct cvmcache_hash hash
static int ram_chrefcnt(struct cvmcache_hash *id, int32_t change_by)
int(* cvmcache_info)(struct cvmcache_info *info)
SmallHashDynamic< uint64_t, ObjectHeader * > transactions_
static void Usage(const char *progname)
void cvmcache_terminate(struct cvmcache_context *ctx)
perf::Statistics statistics_
int cvmcache_listen(struct cvmcache_context *ctx, char *locator)
int(* cvmcache_listing_next)(int64_t lst_id, struct cvmcache_object_info *item)
void cvmcache_cleanup_global()
static const double kObjectExpandFactor
static int ram_listing_begin(uint64_t lst_id, enum cvmcache_object_type type)
void cvmcache_terminate_watchdog()
perf::Statistics * statistics_
static PluginRamCache * Create(const string &mem_size_str)
assert((mem||(size==0))&&"Out Of Memory")
void OnBlockMove(const MallocHeap::BlockPtr &ptr)
int cvmcache_is_supervised()
static uint32_t hasher_any(const ComparableHash &key)
int(* cvmcache_pread)(struct cvmcache_hash *id, uint64_t offset, uint32_t *size, unsigned char *buffer)
static double GetEntrySize()
static PluginRamCache * GetInstance()
int(* cvmcache_breadcrumb_load)(const char *fqrn, cvmcache_breadcrumb *breadcrumb)
#define SetLogDebugFile(filename)
static int ram_info(struct cvmcache_info *info)
void cvmcache_wait_for(struct cvmcache_context *ctx)
SmallHashDynamic< uint64_t, Listing * > listings_
void CheckHighPinWatermark()
static int ram_abort_txn(uint64_t txn_id)
void cvmcache_ask_detach(struct cvmcache_context *ctx)
vector< struct cvmcache_object_info > elems
int(* cvmcache_write_txn)(uint64_t txn_id, unsigned char *buffer, uint32_t size)
bool HasSuffix(const std::string &str, const std::string &suffix, const bool ignore_case)
PluginRamCache(uint64_t mem_size)
static int ram_start_txn(struct cvmcache_hash *id, uint64_t txn_id, struct cvmcache_object_info *info)
int cvmcache_hash_cmp(struct cvmcache_hash *a, struct cvmcache_hash *b)
bool operator<(SharedPtr< T > const &a, SharedPtr< U > const &b)
static const double kShrinkFactor
lru::LruCache< ComparableHash, ObjectHeader * > * objects_all_
bool operator!=(const cvmcache_hash &a, const cvmcache_hash &b)
void cvmcache_options_free(char *value)
static uint64_t RoundUp8(const uint64_t size)
void cvmcache_spawn_watchdog(const char *crash_dump_file)
static const double kDangerZoneThreshold
void cvmcache_options_fini(cvmcache_option_map *opts)
enum cvmcache_object_type type
void DoShrink(uint64_t shrink_to)
static const double kSlotFraction
static PluginRamCache * Me()
struct cvmcache_context * cvmcache_init(struct cvmcache_callbacks *callbacks)
static int ram_listing_end(int64_t listing_id)
static uint32_t hasher_uint64(const uint64_t &key)
lru::LruCache< ComparableHash, ObjectHeader * > * objects_volatile_
bool TryFreeSpace(uint64_t bytes_required)
void cvmcache_init_global()
static const uint64_t kMinSize
static int ram_shrink(uint64_t shrink_to, uint64_t *used)
int cvmcache_options_parse(cvmcache_option_map *opts, const char *path)
void DropBreadcrumbs(int sig, siginfo_t *siginfo, void *context)
int(* cvmcache_shrink)(uint64_t shrink_to, uint64_t *used)
#define CVMCACHE_SIZE_UNKNOWN
bool operator==(const cvmcache_hash &a, const cvmcache_hash &b)
static int ram_breadcrumb_store(const char *fqrn, const cvmcache_breadcrumb *breadcrumb)
uint64_t String2Uint64(const string &value)
int(* cvmcache_listing_begin)(uint64_t lst_id, enum cvmcache_object_type type)
void cvmcache_process_requests(struct cvmcache_context *ctx, unsigned nworkers)
int(* cvmcache_breadcrumb_store)(const char *fqrn, const cvmcache_breadcrumb *breadcrumb)
map< std::string, cvmcache_breadcrumb > breadcrumbs_
char * cvmcache_options_get(cvmcache_option_map *opts, const char *key)
int(* cvmcache_commit_txn)(uint64_t txn_id)
int(* cvmcache_start_txn)(struct cvmcache_hash *id, uint64_t txn_id, struct cvmcache_object_info *info)
int(* cvmcache_listing_end)(int64_t lst_id)
int(* cvmcache_abort_txn)(uint64_t txn_id)
ComparableHash(const struct cvmcache_hash &h)
static int ram_obj_info(struct cvmcache_hash *id, struct cvmcache_object_info *info)
static PluginRamCache * instance_
int(* cvmcache_obj_info)(struct cvmcache_hash *id, struct cvmcache_object_info *info)
uint32_t MurmurHash2(const void *key, int len, uint32_t seed)
cvmcache_option_map * cvmcache_options_init()
static int ram_breadcrumb_load(const char *fqrn, cvmcache_breadcrumb *breadcrumb)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)