CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
nfs_maps_leveldb.cc
Go to the documentation of this file.
1 
17 #include "nfs_maps_leveldb.h"
18 
19 #include <unistd.h>
20 
21 #include <cassert>
22 #include <cstddef>
23 
24 #include "leveldb/cache.h"
25 #include "leveldb/db.h"
26 #include "leveldb/filter_policy.h"
27 #include "logging.h"
28 #include "smalloc.h"
29 #include "statistics.h"
30 #include "util/exception.h"
31 #include "util/pointer.h"
32 #include "util/posix.h"
33 #include "util_concurrency.h"
34 
35 using namespace std; // NOLINT
36 
37 
39  : leveldb::EnvWrapper(leveldb::Env::Default())
40  , maps_(maps)
41 {
42  atomic_init32(&num_bg_threads_);
43 }
44 
45 
46 void NfsMapsLeveldb::ForkAwareEnv::StartThread(void (*f)(void*), void* a) {
47  if (maps_->spawned_) {
48  leveldb::Env::Default()->StartThread(f, a);
49  return;
50  }
52  "single threaded leveldb::StartThread called");
53  // Unclear how to handle this because caller assumes that thread is started
54 }
55 
56 
57 void NfsMapsLeveldb::ForkAwareEnv::Schedule(void (*function)(void*), void* arg)
58 {
59  if (maps_->spawned_) {
60  leveldb::Env::Default()->Schedule(function, arg);
61  return;
62  }
64  "single threaded leveldb::Schedule called");
65  FuncArg *funcarg = new FuncArg();
66  funcarg->function = function;
67  funcarg->arg = arg;
68  funcarg->env = this;
69  atomic_inc32(&num_bg_threads_);
70  pthread_t bg_thread;
71  int retval = pthread_create(&bg_thread, NULL, MainFakeThread, funcarg);
72  assert(retval == 0);
73  retval = pthread_detach(bg_thread);
74  assert(retval == 0);
75 }
76 
77 
79  while (atomic_read32(&num_bg_threads_) > 0)
80  SafeSleepMs(100);
81 }
82 
83 
88  SafeSleepMs(micros/1000);
89 }
90 
91 
93  FuncArg *funcarg = reinterpret_cast<FuncArg *>(data);
94  funcarg->function(funcarg->arg);
95  atomic_dec32(&(funcarg->env->num_bg_threads_));
96  delete funcarg;
97  return NULL;
98 }
99 
100 
101 //------------------------------------------------------------------------------
102 
103 
105  const string &leveldb_dir,
106  const uint64_t root_inode,
107  const bool rebuild,
108  perf::Statistics *statistics)
109 {
110  assert(root_inode > 0);
112  maps->n_db_added_ = statistics->Register(
113  "nfs.leveldb.n_added", "total number of issued inode");
114 
115  maps->root_inode_ = root_inode;
116  maps->fork_aware_env_ = new ForkAwareEnv(maps.weak_ref());
117  leveldb::Status status;
118  leveldb::Options leveldb_options;
119  leveldb_options.create_if_missing = true;
120  leveldb_options.env = maps->fork_aware_env_;
121 
122  // Remove previous database traces
123  if (rebuild) {
125  "rebuilding NFS maps, might result in stale entries");
126  bool retval = RemoveTree(leveldb_dir + "/inode2path") &&
127  RemoveTree(leveldb_dir + "/path2inode");
128  if (!retval) {
129  LogCvmfs(kLogNfsMaps, kLogDebug, "failed to remove previous databases");
130  return NULL;
131  }
132  }
133 
134  // Open databases
135  maps->cache_inode2path_ = leveldb::NewLRUCache(32 * 1024*1024);
136  leveldb_options.block_cache = maps->cache_inode2path_;
137  maps->filter_inode2path_ = leveldb::NewBloomFilterPolicy(10);
138  leveldb_options.filter_policy = maps->filter_inode2path_;
139  status = leveldb::DB::Open(leveldb_options, leveldb_dir + "/inode2path",
140  &maps->db_inode2path_);
141  if (!status.ok()) {
142  LogCvmfs(kLogNfsMaps, kLogDebug, "failed to create inode2path db: %s",
143  status.ToString().c_str());
144  return NULL;
145  }
146  LogCvmfs(kLogNfsMaps, kLogDebug, "inode2path opened");
147 
148  // Hashes and inodes, no compression here
149  leveldb_options.compression = leveldb::kNoCompression;
150  // Random order, small block size to not trash caches
151  leveldb_options.block_size = 512;
152  maps->cache_path2inode_ = leveldb::NewLRUCache(8 * 1024*1024);
153  leveldb_options.block_cache = maps->cache_path2inode_;
154  maps->filter_path2inode_ = leveldb::NewBloomFilterPolicy(10);
155  leveldb_options.filter_policy = maps->filter_path2inode_;
156  status = leveldb::DB::Open(leveldb_options, leveldb_dir + "/path2inode",
157  &maps->db_path2inode_);
158  if (!status.ok()) {
159  LogCvmfs(kLogNfsMaps, kLogDebug, "failed to create path2inode db: %s",
160  status.ToString().c_str());
161  return NULL;
162  }
163  LogCvmfs(kLogNfsMaps, kLogDebug, "path2inode opened");
164 
165  // Fetch highest issued inode
166  maps->seq_ = maps->FindInode(shash::Md5(shash::AsciiPtr("?seq")));
167  LogCvmfs(kLogNfsMaps, kLogDebug, "Sequence number is %" PRIu64, maps->seq_);
168  if (maps->seq_ == 0) {
169  maps->seq_ = maps->root_inode_;
170  // Insert root inode
171  PathString root_path;
172  maps->GetInode(root_path);
173  }
174 
176 
177  return maps.Release();
178 }
179 
180 void NfsMapsLeveldb::SetInodeResidue(unsigned residue_class, unsigned remainder)
181 {
182  MutexLockGuard lock_guard(lock_);
183  if (residue_class < 2) {
185  inode_remainder_ = 0;
186  } else {
187  inode_residue_class_ = residue_class;
188  inode_remainder_ = remainder % residue_class;
189  seq_ = ((seq_ / inode_residue_class_) + 1)
191  }
192 }
193 
194 
195 uint64_t NfsMapsLeveldb::FindInode(const shash::Md5 &path) {
196  leveldb::Status status;
197  leveldb::Slice key(reinterpret_cast<const char *>(path.digest),
198  path.GetDigestSize());
199  string result;
200 
201  status = db_path2inode_->Get(leveldb::ReadOptions(), key, &result);
202  if (!status.ok() && !status.IsNotFound()) {
203  PANIC(kLogSyslogErr, "failed to read from path2inode db (path %s): %s",
204  path.ToString().c_str(), status.ToString().c_str());
205  }
206 
207  if (status.IsNotFound()) {
208  LogCvmfs(kLogNfsMaps, kLogDebug, "path %s not found",
209  path.ToString().c_str());
210  return 0;
211  } else {
212  const uint64_t *inode = reinterpret_cast<const uint64_t *>(result.data());
213  LogCvmfs(kLogNfsMaps, kLogDebug, "path %s maps to inode %" PRIu64,
214  path.ToString().c_str(), *inode);
215  return *inode;
216  }
217 }
218 
219 
220 uint64_t NfsMapsLeveldb::GetInode(const PathString &path) {
221  const shash::Md5 md5_path(path.GetChars(), path.GetLength());
222  uint64_t inode = FindInode(md5_path);
223  if (inode != 0)
224  return inode;
225 
227  // Search again to avoid race
228  inode = FindInode(md5_path);
229  if (inode != 0) {
230  return inode;
231  }
232 
233  // Issue new inode
234  inode = seq_;
236  PutPath2Inode(md5_path, inode);
237  PutInode2Path(inode, path);
239  return inode;
240 }
241 
242 
249 bool NfsMapsLeveldb::GetPath(const uint64_t inode, PathString *path) {
250  leveldb::Status status;
251  leveldb::Slice key(reinterpret_cast<const char *>(&inode), sizeof(inode));
252  string result;
253 
254  status = db_inode2path_->Get(leveldb::ReadOptions(), key, &result);
255  if (status.IsNotFound()) {
257  "failed to find inode %" PRIu64 " in NFS maps, returning ESTALE",
258  inode);
259  return false;
260  }
261  if (!status.ok()) {
263  "failed to read from inode2path db inode %" PRIu64 ": %s", inode,
264  status.ToString().c_str());
265  }
266 
267  path->Assign(result.data(), result.length());
268  LogCvmfs(kLogNfsMaps, kLogDebug, "inode %" PRIu64 " maps to path %s",
269  inode, path->c_str());
270  return true;
271 }
272 
273 
275  string stats;
276 
277  db_inode2path_->GetProperty(leveldb::Slice("leveldb.stats"), &stats);
278  stats += "inode --> path database:\n" + stats + "\n";
279 
280  db_path2inode_->GetProperty(leveldb::Slice("leveldb.stats"), &stats);
281  stats += "path --> inode database:\n" + stats + "\n";
282 
283  return stats;
284 }
285 
286 
288  : db_inode2path_(NULL)
289  , db_path2inode_(NULL)
290  , cache_inode2path_(NULL)
291  , cache_path2inode_(NULL)
292  , filter_inode2path_(NULL)
293  , filter_path2inode_(NULL)
294  , fork_aware_env_(NULL)
295  , root_inode_(0)
296  , seq_(0)
297  , lock_(NULL)
298  , spawned_(false)
300  , inode_remainder_(0)
301  , n_db_added_(NULL)
302 {
303  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
304  int retval = pthread_mutex_init(lock_, NULL);
305  assert(retval == 0);
306 }
307 
308 
311 
312  delete db_path2inode_;
313  delete cache_path2inode_;
314  delete filter_path2inode_;
315  LogCvmfs(kLogNfsMaps, kLogDebug, "path2inode closed");
316  delete db_inode2path_;
317  delete cache_inode2path_;
318  delete filter_inode2path_;
319  LogCvmfs(kLogNfsMaps, kLogDebug, "inode2path closed");
320  delete fork_aware_env_;
321  pthread_mutex_destroy(lock_);
322  free(lock_);
323 }
324 
325 
327  const uint64_t inode,
328  const PathString &path)
329 {
330  leveldb::Status status;
331  leveldb::Slice key(reinterpret_cast<const char *>(&inode), sizeof(inode));
332  leveldb::Slice value(path.GetChars(), path.GetLength());
333 
334  status = db_inode2path_->Put(leveldb::WriteOptions(), key, value);
335  if (!status.ok()) {
337  "failed to write inode2path entry (%" PRIu64 " --> %s): %s", inode,
338  path.c_str(), status.ToString().c_str());
339  }
340  LogCvmfs(kLogNfsMaps, kLogDebug, "stored inode %" PRIu64 " --> path %s",
341  inode, path.c_str());
342 }
343 
344 
346  const shash::Md5 &path,
347  const uint64_t inode)
348 {
349  leveldb::Status status;
350  leveldb::Slice key(reinterpret_cast<const char *>(path.digest),
351  path.GetDigestSize());
352  leveldb::Slice value(reinterpret_cast<const char *>(&inode), sizeof(inode));
353 
354  status = db_path2inode_->Put(leveldb::WriteOptions(), key, value);
355  if (!status.ok()) {
357  "failed to write path2inode entry (%s --> %" PRIu64 "): %s",
358  path.ToString().c_str(), inode, status.ToString().c_str());
359  }
360  LogCvmfs(kLogNfsMaps, kLogDebug, "stored path %s --> inode %" PRIu64,
361  path.ToString().c_str(), inode);
362 }
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
Counter * Register(const std::string &name, const std::string &desc)
Definition: statistics.cc:139
virtual bool GetPath(const uint64_t inode, PathString *path)
void(* function)(void *)
pthread_mutex_t * lock_
virtual void SetInodeResidue(unsigned residue_class, unsigned remainder)
const leveldb::FilterPolicy * filter_inode2path_
#define PANIC(...)
Definition: exception.h:26
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:248
void Assign(const char *chars, const unsigned length)
Definition: shortstring.h:53
void Open()
uint64_t root_inode_
static NfsMapsLeveldb * Create(const std::string &leveldb_dir, const uint64_t root_inode, const bool rebuild, perf::Statistics *statistics)
uint64_t FindInode(const shash::Md5 &path)
virtual uint64_t GetInode(const PathString &path)
perf::Counter * n_db_added_
assert((mem||(size==0))&&"Out Of Memory")
leveldb::DB * db_path2inode_
unsigned char digest[digest_size_]
Definition: hash.h:123
unsigned inode_residue_class_
void PutInode2Path(const uint64_t inode, const PathString &path)
ForkAwareEnv(NfsMapsLeveldb *maps)
static void * MainFakeThread(void *data)
unsigned GetDigestSize() const
Definition: hash.h:167
virtual std::string GetStatistics()
leveldb::DB * db_inode2path_
leveldb::Cache * cache_path2inode_
unsigned inode_remainder_
void Schedule(void(*function)(void *), void *arg)
void PutPath2Inode(const shash::Md5 &path, const uint64_t inode)
virtual ~NfsMapsLeveldb()
void StartThread(void(*f)(void *), void *a)
void Inc(class Counter *counter)
Definition: statistics.h:50
bool RemoveTree(const std::string &path)
Definition: posix.cc:1120
void SleepForMicroseconds(int micros)
Definition: mutex.h:42
const leveldb::FilterPolicy * filter_path2inode_
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1918
unsigned GetLength() const
Definition: shortstring.h:104
ForkAwareEnv * fork_aware_env_
const char * c_str() const
Definition: shortstring.h:118
const char * GetChars() const
Definition: shortstring.h:96
leveldb::Cache * cache_inode2path_
class ForkAwareEnv * env