CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_filestats.cc
Go to the documentation of this file.
1 
5 #include "swissknife_filestats.h"
6 
7 #include <cassert>
8 
9 #include "hash.h"
10 #include "logging.h"
11 #include "util/posix.h"
12 #include "util/string.h"
13 
14 using namespace std; // NOLINT
15 
16 namespace swissknife {
17 
18 ParameterList CommandFileStats::GetParams() const {
19  ParameterList r;
20  r.push_back(Parameter::Mandatory(
21  'r', "repository URL (absolute local path or remote URL)"));
22  r.push_back(Parameter::Mandatory('o', "output database file"));
23  r.push_back(Parameter::Optional('n', "fully qualified repository name"));
24  r.push_back(Parameter::Optional('k', "repository master key(s) / dir"));
25  r.push_back(Parameter::Optional('l', "temporary directory"));
26  r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
27  return r;
28 }
29 
// Entry point of the command.  Reads the arguments declared in GetParams(),
// chooses an object fetcher depending on whether the repository URL is an
// HTTP URL, and hands the fetcher to Run().
//
// Returns 0 if Run() reported success and 1 otherwise (including the case
// where the remote connection could not be initialized).
30 int CommandFileStats::Main(const ArgumentList &args) {
31  shash::Any manual_root_hash;
   // 'r' and 'o' are dereferenced without a presence check; both are declared
   // as mandatory in GetParams().
32  const std::string &repo_url = *args.find('r')->second;
33  db_path_ = *args.find('o')->second;
34  const std::string &repo_name =
35  (args.count('n') > 0) ? *args.find('n')->second : "";
36  std::string repo_keys =
37  (args.count('k') > 0) ? *args.find('k')->second : "";
   // If 'k' names an existing directory, replace it by the ':'-joined list of
   // the "*.pub" files found in that directory.
38  if (DirectoryExists(repo_keys))
39  repo_keys = JoinStrings(FindFilesBySuffix(repo_keys, ".pub"), ":");
   // The temporary directory falls back to "/tmp" when 'l' is not given.
40  const std::string &tmp_dir =
41  (args.count('l') > 0) ? *args.find('l')->second : "/tmp";
   // NOTE(review): manual_root_hash is filled in here but is not referenced
   // again in the visible part of this function -- confirm whether the 'h'
   // option is meant to influence the traversal.
42  if (args.count('h') > 0) {
43  manual_root_hash = shash::MkFromHexPtr(shash::HexPtr(
44  *args.find('h')->second), shash::kSuffixCatalog);
45  }
46 
   // Directory that CatalogCallback() copies catalog databases into; note the
   // trailing slash.
47  tmp_db_path_ = tmp_dir + "/cvmfs_filestats/";
48  atomic_init32(&num_downloaded_);
49 
50  bool success = false;
51  if (IsHttpUrl(repo_url)) {
52  const bool follow_redirects = false;
53  if (!this->InitDownloadManager(follow_redirects) ||
54  !this->InitVerifyingSignatureManager(repo_keys)) {
55  LogCvmfs(kLogCatalog, kLogStderr, "Failed to init remote connection");
56  return 1;
57  }
58 
   // NOTE(review): the embedded listing numbers jump from 58 to 60; the line
   // that begins this fetcher declaration is not visible here.
60  history::SqliteHistory> fetcher(repo_name,
61  repo_url,
62  tmp_dir,
63  download_manager(),
64  signature_manager());
65  success = Run(&fetcher);
66  } else {
   // Anything that is not an HTTP URL is handled by the local fetcher.
67  LocalObjectFetcher<> fetcher(repo_url, tmp_dir);
68  success = Run(&fetcher);
69  }
70 
71  return (success) ? 0 : 1;
72 }
73 
// Creates the output database, starts the processing thread and traverses
// the catalogs through the given object fetcher.  CatalogCallback() is
// registered as the traversal listener; MainProcessing() runs in a separate
// pthread and is joined before this function returns.
//
// Returns the result of traversal.Traverse().
74 template <class ObjectFetcherT>
75 bool CommandFileStats::Run(ObjectFetcherT *object_fetcher)
76 {
77  atomic_init32(&finished_);
78 
   // Remove a previous output file (via its absolute path) before creating
   // the new database.
79  string abs_path = GetAbsolutePath(db_path_);
80  unlink(abs_path.c_str());
   // NOTE(review): db_ is dereferenced right away; whether Create() can
   // return NULL is not visible from here -- confirm.
81  db_ = FileStatsDatabase::Create(db_path_);
82  db_->InitStatements();
83 
   // NOTE(review): MkdirDeep() is called inside assert(); in a build with
   // NDEBUG defined the whole expression, including the call, is removed.
84  assert(MkdirDeep(tmp_db_path_, 0755));
85 
   // NOTE(review): the embedded listing numbers jump from 85 to 87; the
   // declaration of `params` is not visible here.
87  params.object_fetcher = object_fetcher;
88  CatalogTraversal<ObjectFetcherT> traversal(params);
89  traversal.RegisterListener(&CommandFileStats::CatalogCallback, this);
90 
   // Start the consumer thread before the traversal begins.
91  pthread_create(&thread_processing_, NULL, MainProcessing, this);
92 
93  bool ret = traversal.Traverse();
94 
   // MainProcessing() polls finished_; incrementing it after the traversal
   // lets that loop end once all copied catalogs have been processed.
95  atomic_inc32(&finished_);
96  pthread_join(thread_processing_, NULL);
97 
98  db_->DestroyStatements();
99 
100  return ret;
101 }
102 
// Traversal listener (registered in Run()).  Copies the database file of the
// reported catalog to "<tmp_db_path_><n>.db", where n is the current value of
// num_downloaded_ plus one, and then increments num_downloaded_.  The counter
// is bumped only after the copy, and MainProcessing() only opens files whose
// number is not larger than the counter.
//
// NOTE(review): the embedded listing numbers jump from 103 to 105; the
// parameter list (which introduces `data`) is not visible here.
// NOTE(review): the read of num_downloaded_ and its later increment are two
// separate operations; presumably this callback is never invoked
// concurrently -- confirm.
103 void CommandFileStats::CatalogCallback(
105  int32_t num = atomic_read32(&num_downloaded_);
106  string out_path = tmp_db_path_ + StringifyInt(num + 1) + ".db";
   // NOTE(review): CopyPath2Path() is called inside assert(); in a build with
   // NDEBUG defined the copy itself is compiled out.
107  assert(CopyPath2Path(data.catalog->database_path(), out_path));
108  atomic_inc32(&num_downloaded_);
109 }
110 
111 void *CommandFileStats::MainProcessing(void *data) {
112  CommandFileStats *repo_stats = static_cast<CommandFileStats *>(data);
113  int processed = 0;
114  int32_t downloaded = atomic_read32(&repo_stats->num_downloaded_);
115  int32_t fin = atomic_read32(&repo_stats->finished_);
116 
117  repo_stats->db_->BeginTransaction();
118  while (fin == 0 || processed < downloaded) {
119  if (processed < downloaded) {
120  LogCvmfs(kLogCatalog, kLogStdout, "Processing catalog %d", processed);
121  string db_path = repo_stats->tmp_db_path_ + "/" +
122  StringifyInt(processed + 1) + ".db";
123  repo_stats->ProcessCatalog(db_path);
124  ++processed;
125  }
126  downloaded = atomic_read32(&repo_stats->num_downloaded_);
127  fin = atomic_read32(&repo_stats->finished_);
128  }
129  repo_stats->db_->CommitTransaction();
130 
131  return NULL;
132 }
133 
134 
135 
// Reads one copied catalog database and records its statistics in db_:
// one row for the catalog itself, then symlinks, regular files and their
// objects, and finally chunked files with their chunks.
//
// NOTE(review): the embedded listing numbers skip 137, 138 and 140; the
// lines that declare and open `cat_db` are not visible here (only the
// `db_path,` argument line is).
136 void CommandFileStats::ProcessCatalog(string db_path) {
139  db_path,
141  cat_db->TakeFileOwnership();
142 
   // Register the catalog with its number of entries and its on-disk size.
143  int64_t file_size = GetFileSize(db_path);
144  sqlite::Sql *catalog_count = new sqlite::Sql(cat_db->sqlite_db(),
145  "SELECT count(*) FROM catalog;");
146  catalog_count->Execute();
   // NOTE(review): StoreCatalog() returns int64_t; the result is narrowed to
   // int here.
147  int cur_catalog_id = db_->StoreCatalog(catalog_count->RetrieveInt64(0),
148  file_size);
149  delete catalog_count;
150 
151  sqlite::Sql *catalog_list =
152  new sqlite::Sql(cat_db->sqlite_db(),
153  "SELECT hash, size, flags, symlink FROM catalog;");
154  sqlite::Sql *chunks_list =
155  new sqlite::Sql(cat_db->sqlite_db(),
156  "SELECT md5path_1, md5path_2, size, hash FROM chunks "
157  "ORDER BY md5path_1 ASC, md5path_2 ASC;");
158 
   // Pass 1: directory entries.  Entries with neither the link nor the file
   // flag set are skipped.
159  while (catalog_list->FetchRow()) {
160  const void *hash = catalog_list->RetrieveBlob(0);
161  int num_bytes = catalog_list->RetrieveBytes(0);
162  int64_t size = catalog_list->RetrieveInt64(1);
163  int flags = catalog_list->RetrieveInt(2);
164  if ((flags & catalog::SqlDirent::kFlagLink) ==
165  catalog::SqlDirent::kFlagLink) {
   // Symlinks: only the byte length of the symlink column is recorded.
166  int symlink_length = catalog_list->RetrieveBytes(3);
167  db_->StoreSymlink(symlink_length);
168  } else if ((flags & catalog::SqlDirent::kFlagFile) ==
169  catalog::SqlDirent::kFlagFile)
170  {
171  if ((flags & catalog::SqlDirent::kFlagFileChunk) !=
172  catalog::SqlDirent::kFlagFileChunk)
173  {
   // Regular, non-chunked file: store its object and link a new file row to
   // it.  NOTE(review): StoreObject() returns int64_t, narrowed to int here.
174  int object_id = db_->StoreObject(hash, num_bytes, size);
175  db_->StoreFile(cur_catalog_id, object_id);
176  } else {
177  // Bulk hashes in addition to chunks
   // The bulk object is stored but not linked to a file row here; the file
   // row of a chunked file is created in pass 2.
178  if (hash != NULL)
179  db_->StoreObject(hash, num_bytes, size);
180  }
181  }
182  }
183 
   // Pass 2: chunks, ordered by (md5path_1, md5path_2).  A new file row is
   // created whenever that pair differs from the previous row's pair.
   // NOTE(review): the pair is read with RetrieveInt() into int and the
   // "previous" pair starts at (0, 0); if the columns hold wider values the
   // comparison works on truncated numbers -- confirm.
184  int old_md5path_1 = 0, old_md5path_2 = 0;
185  int md5path_1 = 0, md5path_2 = 0;
186  int cur_file_id = 0;
187  while (chunks_list->FetchRow()) {
188  md5path_1 = chunks_list->RetrieveInt(0);
189  md5path_2 = chunks_list->RetrieveInt(1);
190  if (md5path_1 != old_md5path_1 || md5path_2 != old_md5path_2) {
   // NOTE(review): StoreChunkedFile() returns int64_t, narrowed to int here.
191  cur_file_id = db_->StoreChunkedFile(cur_catalog_id);
192  }
193  const void *hash = chunks_list->RetrieveBlob(3);
194  int num_bytes = chunks_list->RetrieveBytes(3);
195  int64_t size = chunks_list->RetrieveInt64(2);
196  db_->StoreChunk(hash, num_bytes, size, cur_file_id);
197  old_md5path_1 = md5path_1;
198  old_md5path_2 = md5path_2;
199  }
200 
   // Release the statements before the database object they belong to.
201  delete catalog_list;
202  delete chunks_list;
203  delete cat_db;
204 }
205 
206 float FileStatsDatabase::kLatestSchema = 1;
207 unsigned FileStatsDatabase::kLatestSchemaRevision = 1;
208 
209 bool FileStatsDatabase::CreateEmptyDatabase() {
210  bool ret = true;
211  ret &= sqlite::Sql(sqlite_db(),
212  "CREATE TABLE catalogs ("
213  "catalog_id INTEGER PRIMARY KEY,"
214  "num_entries INTEGER,"
215  "file_size INTEGER"
216  ");").Execute();
217  ret &= sqlite::Sql(sqlite_db(),
218  "CREATE TABLE objects ("
219  "object_id INTEGER PRIMARY KEY,"
220  "hash BLOB,"
221  "size INTEGER"
222  ");").Execute();
223  ret &= sqlite::Sql(sqlite_db(),
224  "CREATE INDEX idx_object_hash "
225  "ON objects (hash);").Execute();
226  ret &= sqlite::Sql(sqlite_db(),
227  "CREATE TABLE files ("
228  "file_id INTEGER PRIMARY KEY,"
229  "catalog_id INTEGER,"
230  "FOREIGN KEY (catalog_id) REFERENCES catalogs (catalog_id)"
231  ");").Execute();
232  ret &= sqlite::Sql(sqlite_db(),
233  "CREATE TABLE files_objects ("
234  "file_id INTEGER,"
235  "object_id INTEGER,"
236  "FOREIGN KEY (file_id) REFERENCES files (file_id),"
237  "FOREIGN KEY (object_id) REFERENCES objects (object_id));").Execute();
238  ret &= sqlite::Sql(sqlite_db(),
239  "CREATE INDEX idx_file_id ON files_objects (file_id);").Execute();
240  ret &= sqlite::Sql(sqlite_db(),
241  "CREATE INDEX idx_object_id ON files_objects (object_id);").Execute();
242  ret &= sqlite::Sql(sqlite_db(),
243  "CREATE TABLE symlinks ("
244  "length INTEGER);").Execute();
245  return ret;
246 }
247 
248 void FileStatsDatabase::InitStatements() {
249  query_insert_catalog = new sqlite::Sql(sqlite_db(),
250  "INSERT INTO catalogs (num_entries, file_size) VALUES (:num, :size);");
251  query_insert_object = new sqlite::Sql(sqlite_db(),
252  "INSERT INTO objects (hash, size) VALUES (:hash, :size);");
253  query_insert_file = new sqlite::Sql(sqlite_db(),
254  "INSERT INTO files (catalog_id) VALUES (:catalog);");
255  query_insert_file_object = new sqlite::Sql(sqlite_db(),
256  "INSERT INTO files_objects (file_id, object_id) VALUES (:file, :object);");
257  query_insert_symlink = new sqlite::Sql(sqlite_db(),
258  "INSERT INTO symlinks (length) VALUES(:length);");
259  query_lookup_object = new sqlite::Sql(sqlite_db(),
260  "SELECT object_id FROM objects WHERE hash = :hash;");
261 }
262 
263 void FileStatsDatabase::DestroyStatements() {
264  delete query_insert_catalog;
265  delete query_insert_object;
266  delete query_insert_file;
267  delete query_insert_file_object;
268  delete query_insert_symlink;
269  delete query_lookup_object;
270 }
271 
272 int64_t FileStatsDatabase::StoreCatalog(int64_t num_entries,
273  int64_t file_size) {
274  query_insert_catalog->Reset();
275  query_insert_catalog->BindInt64(1, num_entries);
276  query_insert_catalog->BindInt64(2, file_size);
277  query_insert_catalog->Execute();
278  return sqlite3_last_insert_rowid(sqlite_db());
279 }
280 
281 int64_t FileStatsDatabase::StoreFile(int64_t catalog_id, int64_t object_id) {
282  query_insert_file->Reset();
283  query_insert_file->BindInt64(1, catalog_id);
284  query_insert_file->Execute();
285  int file_id = sqlite3_last_insert_rowid(sqlite_db());
286 
287  query_insert_file_object->Reset();
288  query_insert_file_object->BindInt64(1, file_id);
289  query_insert_file_object->BindInt64(2, object_id);
290  query_insert_file_object->Execute();
291  return file_id;
292 }
293 
294 int64_t FileStatsDatabase::StoreChunkedFile(int64_t catalog_id) {
295  query_insert_file->Reset();
296  query_insert_file->BindInt64(1, catalog_id);
297  query_insert_file->Execute();
298  return sqlite3_last_insert_rowid(sqlite_db());
299 }
300 
301 int64_t FileStatsDatabase::StoreChunk(const void *hash, int hash_size,
302  int64_t size, int64_t file_id) {
303  int object_id = StoreObject(hash, hash_size, size);
304 
305  query_insert_file_object->Reset();
306  query_insert_file_object->BindInt64(1, file_id);
307  query_insert_file_object->BindInt64(2, object_id);
308  query_insert_file_object->Execute();
309  return sqlite3_last_insert_rowid(sqlite_db());
310 }
311 
312 int64_t FileStatsDatabase::StoreObject(const void *hash, int hash_size,
313  int64_t size) {
314  query_lookup_object->Reset();
315  query_lookup_object->BindBlob(1, hash, hash_size);
316  if (query_lookup_object->FetchRow()) {
317  return query_lookup_object->RetrieveInt(0);
318  } else {
319  query_insert_object->Reset();
320  query_insert_object->BindBlob(1, hash, hash_size);
321  query_insert_object->BindInt64(2, size);
322  query_insert_object->Execute();
323  return sqlite3_last_insert_rowid(sqlite_db());
324  }
325 }
326 
327 int64_t FileStatsDatabase::StoreSymlink(int64_t length) {
328  query_insert_symlink->Reset();
329  query_insert_symlink->BindInt64(1, length);
330  query_insert_symlink->Execute();
331  return sqlite3_last_insert_rowid(sqlite_db());
332 }
333 
334 } // namespace swissknife
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
CallbackPtr RegisterListener(typename BoundClosure< CatalogTraversalData< ObjectFetcherT::CatalogTN >, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
bool Execute()
Definition: sql.cc:42
std::string database_path() const
Definition: catalog.h:184
bool FetchRow()
Definition: sql.cc:62
static Publisher * Create(const SettingsPublisher &settings)
const void * RetrieveBlob(const int idx_column) const
Definition: sql.h:436
std::vector< Parameter > ParameterList
Definition: swissknife.h:71
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:318
void TakeFileOwnership()
Definition: sql_impl.h:340
bool IsHttpUrl(const std::string &path)
Definition: posix.cc:204
bool BeginTransaction() const
Definition: sql_impl.h:269
assert((mem||(size==0))&&"Out Of Memory")
static const int kFlagFileChunk
Definition: catalog_sql.h:186
static const int kFlagFile
Definition: catalog_sql.h:183
bool CopyPath2Path(const string &src, const string &dest)
Definition: compression.cc:63
static DerivedT * Open(const std::string &filename, const OpenMode open_mode)
Definition: sql_impl.h:73
static const int kFlagLink
Definition: catalog_sql.h:184
std::string GetAbsolutePath(const std::string &path)
Definition: posix.cc:196
const char kSuffixCatalog
Definition: hash.h:53
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
Definition: posix.cc:871
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
sqlite3_int64 RetrieveInt64(const int idx_column) const
Definition: sql.h:445
string StringifyInt(const int64_t value)
Definition: string.cc:78
sqlite3 * sqlite_db() const
Definition: sql.h:147
bool CommitTransaction() const
Definition: sql_impl.h:276
bool DirectoryExists(const std::string &path)
Definition: posix.cc:838
bool Traverse(const TraversalType type=Base::kBreadthFirst)
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
int64_t GetFileSize(const std::string &path)
Definition: posix.cc:826
int RetrieveInt(const int idx_column) const
Definition: sql.h:442
static void size_t size
Definition: smalloc.h:47
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
Definition: posix.cc:1149
int RetrieveBytes(const int idx_column) const
Definition: sql.h:433