CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
nfs_maps_leveldb.cc
Go to the documentation of this file.
1 
17 #include "nfs_maps_leveldb.h"
18 
19 #include <unistd.h>
20 
21 #include <cassert>
22 #include <cstddef>
23 
24 #include "leveldb/cache.h"
25 #include "leveldb/db.h"
26 #include "leveldb/filter_policy.h"
27 #include "statistics.h"
28 #include "util/concurrency.h"
29 #include "util/exception.h"
30 #include "util/logging.h"
31 #include "util/pointer.h"
32 #include "util/posix.h"
33 #include "util/smalloc.h"
34 
35 using namespace std; // NOLINT
36 
37 
39  : leveldb::EnvWrapper(leveldb::Env::Default()), maps_(maps) {
40  atomic_init32(&num_bg_threads_);
41 }
42 
43 
44 void NfsMapsLeveldb::ForkAwareEnv::StartThread(void (*f)(void *), void *a) {
45  if (maps_->spawned_) {
46  leveldb::Env::Default()->StartThread(f, a);
47  return;
48  }
50  "single threaded leveldb::StartThread called");
51  // Unclear how to handle this because caller assumes that thread is started
52 }
53 
54 
55 void NfsMapsLeveldb::ForkAwareEnv::Schedule(void (*function)(void *),
56  void *arg) {
57  if (maps_->spawned_) {
58  leveldb::Env::Default()->Schedule(function, arg);
59  return;
60  }
61  LogCvmfs(kLogNfsMaps, kLogDebug, "single threaded leveldb::Schedule called");
62  FuncArg *funcarg = new FuncArg();
63  funcarg->function = function;
64  funcarg->arg = arg;
65  funcarg->env = this;
66  atomic_inc32(&num_bg_threads_);
67  pthread_t bg_thread;
68  int retval = pthread_create(&bg_thread, NULL, MainFakeThread, funcarg);
69  assert(retval == 0);
70  retval = pthread_detach(bg_thread);
71  assert(retval == 0);
72 }
73 
74 
76  while (atomic_read32(&num_bg_threads_) > 0)
77  SafeSleepMs(100);
78 }
79 
80 
85  SafeSleepMs(micros / 1000);
86 }
87 
88 
90  FuncArg *funcarg = reinterpret_cast<FuncArg *>(data);
91  funcarg->function(funcarg->arg);
92  atomic_dec32(&(funcarg->env->num_bg_threads_));
93  delete funcarg;
94  return NULL;
95 }
96 
97 
98 //------------------------------------------------------------------------------
99 
100 
101 NfsMapsLeveldb *NfsMapsLeveldb::Create(const string &leveldb_dir,
102  const uint64_t root_inode,
103  const bool rebuild,
104  perf::Statistics *statistics) {
105  assert(root_inode > 0);
107  maps->n_db_added_ = statistics->Register("nfs.leveldb.n_added",
108  "total number of issued inode");
109 
110  maps->root_inode_ = root_inode;
111  maps->fork_aware_env_ = new ForkAwareEnv(maps.weak_ref());
112  leveldb::Status status;
113  leveldb::Options leveldb_options;
114  leveldb_options.create_if_missing = true;
115  leveldb_options.env = maps->fork_aware_env_;
116 
117  // Remove previous database traces
118  if (rebuild) {
120  "rebuilding NFS maps, might result in stale entries");
121  const bool retval = RemoveTree(leveldb_dir + "/inode2path") &&
122  RemoveTree(leveldb_dir + "/path2inode");
123  if (!retval) {
124  LogCvmfs(kLogNfsMaps, kLogDebug, "failed to remove previous databases");
125  return NULL;
126  }
127  }
128 
129  // Open databases
130  maps->cache_inode2path_ = leveldb::NewLRUCache(32 * 1024 * 1024);
131  leveldb_options.block_cache = maps->cache_inode2path_;
132  maps->filter_inode2path_ = leveldb::NewBloomFilterPolicy(10);
133  leveldb_options.filter_policy = maps->filter_inode2path_;
134  status = leveldb::DB::Open(leveldb_options, leveldb_dir + "/inode2path",
135  &maps->db_inode2path_);
136  if (!status.ok()) {
137  LogCvmfs(kLogNfsMaps, kLogDebug, "failed to create inode2path db: %s",
138  status.ToString().c_str());
139  return NULL;
140  }
141  LogCvmfs(kLogNfsMaps, kLogDebug, "inode2path opened");
142 
143  // Hashes and inodes, no compression here
144  leveldb_options.compression = leveldb::kNoCompression;
145  // Random order, small block size to not trash caches
146  leveldb_options.block_size = 512;
147  maps->cache_path2inode_ = leveldb::NewLRUCache(8 * 1024 * 1024);
148  leveldb_options.block_cache = maps->cache_path2inode_;
149  maps->filter_path2inode_ = leveldb::NewBloomFilterPolicy(10);
150  leveldb_options.filter_policy = maps->filter_path2inode_;
151  status = leveldb::DB::Open(leveldb_options, leveldb_dir + "/path2inode",
152  &maps->db_path2inode_);
153  if (!status.ok()) {
154  LogCvmfs(kLogNfsMaps, kLogDebug, "failed to create path2inode db: %s",
155  status.ToString().c_str());
156  return NULL;
157  }
158  LogCvmfs(kLogNfsMaps, kLogDebug, "path2inode opened");
159 
160  // Fetch highest issued inode
161  maps->seq_ = maps->FindInode(shash::Md5(shash::AsciiPtr("?seq")));
162  LogCvmfs(kLogNfsMaps, kLogDebug, "Sequence number is %" PRIu64, maps->seq_);
163  if (maps->seq_ == 0) {
164  maps->seq_ = maps->root_inode_;
165  // Insert root inode
166  const PathString root_path;
167  maps->GetInode(root_path);
168  }
169 
171 
172  return maps.Release();
173 }
174 
175 void NfsMapsLeveldb::SetInodeResidue(unsigned residue_class,
176  unsigned remainder) {
177  const MutexLockGuard lock_guard(lock_);
178  if (residue_class < 2) {
180  inode_remainder_ = 0;
181  } else {
182  inode_residue_class_ = residue_class;
183  inode_remainder_ = remainder % residue_class;
186  }
187 }
188 
189 
190 uint64_t NfsMapsLeveldb::FindInode(const shash::Md5 &path) {
191  leveldb::Status status;
192  const leveldb::Slice key(reinterpret_cast<const char *>(path.digest),
193  path.GetDigestSize());
194  string result;
195 
196  status = db_path2inode_->Get(leveldb::ReadOptions(), key, &result);
197  if (!status.ok() && !status.IsNotFound()) {
198  PANIC(kLogSyslogErr, "failed to read from path2inode db (path %s): %s",
199  path.ToString().c_str(), status.ToString().c_str());
200  }
201 
202  if (status.IsNotFound()) {
203  LogCvmfs(kLogNfsMaps, kLogDebug, "path %s not found",
204  path.ToString().c_str());
205  return 0;
206  } else {
207  const uint64_t *inode = reinterpret_cast<const uint64_t *>(result.data());
208  LogCvmfs(kLogNfsMaps, kLogDebug, "path %s maps to inode %" PRIu64,
209  path.ToString().c_str(), *inode);
210  return *inode;
211  }
212 }
213 
214 
215 uint64_t NfsMapsLeveldb::GetInode(const PathString &path) {
216  const shash::Md5 md5_path(path.GetChars(), path.GetLength());
217  uint64_t inode = FindInode(md5_path);
218  if (inode != 0)
219  return inode;
220 
221  const MutexLockGuard m(lock_);
222  // Search again to avoid race
223  inode = FindInode(md5_path);
224  if (inode != 0) {
225  return inode;
226  }
227 
228  // Issue new inode
229  inode = seq_;
231  PutPath2Inode(md5_path, inode);
232  PutInode2Path(inode, path);
234  return inode;
235 }
236 
237 
244 bool NfsMapsLeveldb::GetPath(const uint64_t inode, PathString *path) {
245  leveldb::Status status;
246  const leveldb::Slice key(reinterpret_cast<const char *>(&inode),
247  sizeof(inode));
248  string result;
249 
250  status = db_inode2path_->Get(leveldb::ReadOptions(), key, &result);
251  if (status.IsNotFound()) {
253  "failed to find inode %" PRIu64 " in NFS maps, returning ESTALE",
254  inode);
255  return false;
256  }
257  if (!status.ok()) {
259  "failed to read from inode2path db inode %" PRIu64 ": %s", inode,
260  status.ToString().c_str());
261  }
262 
263  path->Assign(result.data(), result.length());
264  LogCvmfs(kLogNfsMaps, kLogDebug, "inode %" PRIu64 " maps to path %s", inode,
265  path->c_str());
266  return true;
267 }
268 
269 
271  string stats;
272 
273  db_inode2path_->GetProperty(leveldb::Slice("leveldb.stats"), &stats);
274  stats += "inode --> path database:\n" + stats + "\n";
275 
276  db_path2inode_->GetProperty(leveldb::Slice("leveldb.stats"), &stats);
277  stats += "path --> inode database:\n" + stats + "\n";
278 
279  return stats;
280 }
281 
282 
284  : db_inode2path_(NULL)
285  , db_path2inode_(NULL)
286  , cache_inode2path_(NULL)
287  , cache_path2inode_(NULL)
288  , filter_inode2path_(NULL)
289  , filter_path2inode_(NULL)
290  , fork_aware_env_(NULL)
291  , root_inode_(0)
292  , seq_(0)
293  , lock_(NULL)
294  , spawned_(false)
296  , inode_remainder_(0)
297  , n_db_added_(NULL) {
298  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
299  const int retval = pthread_mutex_init(lock_, NULL);
300  assert(retval == 0);
301 }
302 
303 
306 
307  delete db_path2inode_;
308  delete cache_path2inode_;
309  delete filter_path2inode_;
310  LogCvmfs(kLogNfsMaps, kLogDebug, "path2inode closed");
311  delete db_inode2path_;
312  delete cache_inode2path_;
313  delete filter_inode2path_;
314  LogCvmfs(kLogNfsMaps, kLogDebug, "inode2path closed");
315  delete fork_aware_env_;
316  pthread_mutex_destroy(lock_);
317  free(lock_);
318 }
319 
320 
321 void NfsMapsLeveldb::PutInode2Path(const uint64_t inode,
322  const PathString &path) {
323  leveldb::Status status;
324  const leveldb::Slice key(reinterpret_cast<const char *>(&inode),
325  sizeof(inode));
326  const leveldb::Slice value(path.GetChars(), path.GetLength());
327 
328  status = db_inode2path_->Put(leveldb::WriteOptions(), key, value);
329  if (!status.ok()) {
331  "failed to write inode2path entry (%" PRIu64 " --> %s): %s", inode,
332  path.c_str(), status.ToString().c_str());
333  }
334  LogCvmfs(kLogNfsMaps, kLogDebug, "stored inode %" PRIu64 " --> path %s",
335  inode, path.c_str());
336 }
337 
338 
340  const uint64_t inode) {
341  leveldb::Status status;
342  const leveldb::Slice key(reinterpret_cast<const char *>(path.digest),
343  path.GetDigestSize());
344  const leveldb::Slice value(reinterpret_cast<const char *>(&inode),
345  sizeof(inode));
346 
347  status = db_path2inode_->Put(leveldb::WriteOptions(), key, value);
348  if (!status.ok()) {
350  "failed to write path2inode entry (%s --> %" PRIu64 "): %s",
351  path.ToString().c_str(), inode, status.ToString().c_str());
352  }
353  LogCvmfs(kLogNfsMaps, kLogDebug, "stored path %s --> inode %" PRIu64,
354  path.ToString().c_str(), inode);
355 }
Counter * Register(const std::string &name, const std::string &desc)
Definition: statistics.cc:163
virtual bool GetPath(const uint64_t inode, PathString *path)
void(* function)(void *)
pthread_mutex_t * lock_
virtual void SetInodeResidue(unsigned residue_class, unsigned remainder)
const leveldb::FilterPolicy * filter_inode2path_
#define PANIC(...)
Definition: exception.h:29
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
void Assign(const char *chars, const unsigned length)
Definition: shortstring.h:61
void Open()
uint64_t root_inode_
static NfsMapsLeveldb * Create(const std::string &leveldb_dir, const uint64_t root_inode, const bool rebuild, perf::Statistics *statistics)
uint64_t FindInode(const shash::Md5 &path)
virtual uint64_t GetInode(const PathString &path)
perf::Counter * n_db_added_
assert((mem||(size==0))&&"Out Of Memory")
leveldb::DB * db_path2inode_
unsigned char digest[digest_size_]
Definition: hash.h:121
unsigned inode_residue_class_
void PutInode2Path(const uint64_t inode, const PathString &path)
ForkAwareEnv(NfsMapsLeveldb *maps)
static void * MainFakeThread(void *data)
unsigned GetDigestSize() const
Definition: hash.h:164
virtual std::string GetStatistics()
leveldb::DB * db_inode2path_
leveldb::Cache * cache_path2inode_
unsigned inode_remainder_
void Schedule(void(*function)(void *), void *arg)
void PutPath2Inode(const shash::Md5 &path, const uint64_t inode)
virtual ~NfsMapsLeveldb()
void StartThread(void(*f)(void *), void *a)
void Inc(class Counter *counter)
Definition: statistics.h:50
bool RemoveTree(const std::string &path)
Definition: posix.cc:1101
void SleepForMicroseconds(int micros)
Definition: mutex.h:42
const leveldb::FilterPolicy * filter_path2inode_
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2025
unsigned GetLength() const
Definition: shortstring.h:131
ForkAwareEnv * fork_aware_env_
const char * c_str() const
Definition: shortstring.h:143
const char * GetChars() const
Definition: shortstring.h:123
leveldb::Cache * cache_inode2path_
class ForkAwareEnv * env
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545