CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_filestats.cc
Go to the documentation of this file.
1 
5 #include "swissknife_filestats.h"
6 
7 #include <cassert>
8 
9 #include "crypto/hash.h"
10 #include "util/logging.h"
11 #include "util/posix.h"
12 #include "util/string.h"
13 
14 using namespace std; // NOLINT
15 
16 namespace swissknife {
17 
18 ParameterList CommandFileStats::GetParams() const {
19  ParameterList r;
20  r.push_back(Parameter::Mandatory(
21  'r', "repository URL (absolute local path or remote URL)"));
22  r.push_back(Parameter::Mandatory('o', "output database file"));
23  r.push_back(Parameter::Optional('n', "fully qualified repository name"));
24  r.push_back(Parameter::Optional('k', "repository master key(s) / dir"));
25  r.push_back(Parameter::Optional('l', "temporary directory"));
26  r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
27  r.push_back(Parameter::Optional('@', "proxy url"));
28  return r;
29 }
30 
31 int CommandFileStats::Main(const ArgumentList &args) {
32  shash::Any manual_root_hash;
33  const std::string &repo_url = *args.find('r')->second;
34  db_path_ = *args.find('o')->second;
35  const std::string &repo_name =
36  (args.count('n') > 0) ? *args.find('n')->second : "";
37  std::string repo_keys =
38  (args.count('k') > 0) ? *args.find('k')->second : "";
39  if (DirectoryExists(repo_keys))
40  repo_keys = JoinStrings(FindFilesBySuffix(repo_keys, ".pub"), ":");
41  const std::string &tmp_dir =
42  (args.count('l') > 0) ? *args.find('l')->second : "/tmp";
43  if (args.count('h') > 0) {
44  manual_root_hash = shash::MkFromHexPtr(shash::HexPtr(
45  *args.find('h')->second), shash::kSuffixCatalog);
46  }
47 
48  tmp_db_path_ = tmp_dir + "/cvmfs_filestats/";
49  atomic_init32(&num_downloaded_);
50 
51  bool success = false;
52  if (IsHttpUrl(repo_url)) {
53  const bool follow_redirects = false;
54  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
55  if (!this->InitDownloadManager(follow_redirects, proxy) ||
56  !this->InitVerifyingSignatureManager(repo_keys)) {
57  LogCvmfs(kLogCatalog, kLogStderr, "Failed to init remote connection");
58  return 1;
59  }
60 
62  history::SqliteHistory> fetcher(repo_name,
63  repo_url,
64  tmp_dir,
65  download_manager(),
66  signature_manager());
67  success = Run(&fetcher);
68  } else {
69  LocalObjectFetcher<> fetcher(repo_url, tmp_dir);
70  success = Run(&fetcher);
71  }
72 
73  return (success) ? 0 : 1;
74 }
75 
76 template <class ObjectFetcherT>
77 bool CommandFileStats::Run(ObjectFetcherT *object_fetcher)
78 {
79  atomic_init32(&finished_);
80 
81  string abs_path = GetAbsolutePath(db_path_);
82  unlink(abs_path.c_str());
83  db_ = FileStatsDatabase::Create(db_path_);
84  db_->InitStatements();
85 
86  assert(MkdirDeep(tmp_db_path_, 0755));
87 
89  params.object_fetcher = object_fetcher;
90  CatalogTraversal<ObjectFetcherT> traversal(params);
91  traversal.RegisterListener(&CommandFileStats::CatalogCallback, this);
92 
93  pthread_create(&thread_processing_, NULL, MainProcessing, this);
94 
95  bool ret = traversal.Traverse();
96 
97  atomic_inc32(&finished_);
98  pthread_join(thread_processing_, NULL);
99 
100  db_->DestroyStatements();
101 
102  return ret;
103 }
104 
105 void CommandFileStats::CatalogCallback(
107  int32_t num = atomic_read32(&num_downloaded_);
108  string out_path = tmp_db_path_ + StringifyInt(num + 1) + ".db";
109  assert(CopyPath2Path(data.catalog->database_path(), out_path));
110  atomic_inc32(&num_downloaded_);
111 }
112 
113 void *CommandFileStats::MainProcessing(void *data) {
114  CommandFileStats *repo_stats = static_cast<CommandFileStats *>(data);
115  int processed = 0;
116  int32_t downloaded = atomic_read32(&repo_stats->num_downloaded_);
117  int32_t fin = atomic_read32(&repo_stats->finished_);
118 
119  repo_stats->db_->BeginTransaction();
120  while (fin == 0 || processed < downloaded) {
121  if (processed < downloaded) {
122  LogCvmfs(kLogCatalog, kLogStdout, "Processing catalog %d", processed);
123  string db_path = repo_stats->tmp_db_path_ + "/" +
124  StringifyInt(processed + 1) + ".db";
125  repo_stats->ProcessCatalog(db_path);
126  ++processed;
127  }
128  downloaded = atomic_read32(&repo_stats->num_downloaded_);
129  fin = atomic_read32(&repo_stats->finished_);
130  }
131  repo_stats->db_->CommitTransaction();
132 
133  return NULL;
134 }
135 
136 
137 
138 void CommandFileStats::ProcessCatalog(string db_path) {
141  db_path,
143  cat_db->TakeFileOwnership();
144 
145  int64_t file_size = GetFileSize(db_path);
146  sqlite::Sql *catalog_count = new sqlite::Sql(cat_db->sqlite_db(),
147  "SELECT count(*) FROM catalog;");
148  catalog_count->Execute();
149  int cur_catalog_id = db_->StoreCatalog(catalog_count->RetrieveInt64(0),
150  file_size);
151  delete catalog_count;
152 
153  sqlite::Sql *catalog_list =
154  new sqlite::Sql(cat_db->sqlite_db(),
155  "SELECT hash, size, flags, symlink FROM catalog;");
156  sqlite::Sql *chunks_list =
157  new sqlite::Sql(cat_db->sqlite_db(),
158  "SELECT md5path_1, md5path_2, size, hash FROM chunks "
159  "ORDER BY md5path_1 ASC, md5path_2 ASC;");
160 
161  while (catalog_list->FetchRow()) {
162  const void *hash = catalog_list->RetrieveBlob(0);
163  int num_bytes = catalog_list->RetrieveBytes(0);
164  int64_t size = catalog_list->RetrieveInt64(1);
165  int flags = catalog_list->RetrieveInt(2);
166  if ((flags & catalog::SqlDirent::kFlagLink) ==
167  catalog::SqlDirent::kFlagLink) {
168  int symlink_length = catalog_list->RetrieveBytes(3);
169  db_->StoreSymlink(symlink_length);
170  } else if ((flags & catalog::SqlDirent::kFlagFile) ==
171  catalog::SqlDirent::kFlagFile)
172  {
173  if ((flags & catalog::SqlDirent::kFlagFileChunk) !=
174  catalog::SqlDirent::kFlagFileChunk)
175  {
176  int object_id = db_->StoreObject(hash, num_bytes, size);
177  db_->StoreFile(cur_catalog_id, object_id);
178  } else {
179  // Bulk hashes in addition to chunks
180  if (hash != NULL)
181  db_->StoreObject(hash, num_bytes, size);
182  }
183  }
184  }
185 
186  int old_md5path_1 = 0, old_md5path_2 = 0;
187  int md5path_1 = 0, md5path_2 = 0;
188  int cur_file_id = 0;
189  while (chunks_list->FetchRow()) {
190  md5path_1 = chunks_list->RetrieveInt(0);
191  md5path_2 = chunks_list->RetrieveInt(1);
192  if (md5path_1 != old_md5path_1 || md5path_2 != old_md5path_2) {
193  cur_file_id = db_->StoreChunkedFile(cur_catalog_id);
194  }
195  const void *hash = chunks_list->RetrieveBlob(3);
196  int num_bytes = chunks_list->RetrieveBytes(3);
197  int64_t size = chunks_list->RetrieveInt64(2);
198  db_->StoreChunk(hash, num_bytes, size, cur_file_id);
199  old_md5path_1 = md5path_1;
200  old_md5path_2 = md5path_2;
201  }
202 
203  delete catalog_list;
204  delete chunks_list;
205  delete cat_db;
206 }
207 
208 float FileStatsDatabase::kLatestSchema = 1;
209 unsigned FileStatsDatabase::kLatestSchemaRevision = 1;
210 
211 bool FileStatsDatabase::CreateEmptyDatabase() {
212  bool ret = true;
213  ret &= sqlite::Sql(sqlite_db(),
214  "CREATE TABLE catalogs ("
215  "catalog_id INTEGER PRIMARY KEY,"
216  "num_entries INTEGER,"
217  "file_size INTEGER"
218  ");").Execute();
219  ret &= sqlite::Sql(sqlite_db(),
220  "CREATE TABLE objects ("
221  "object_id INTEGER PRIMARY KEY,"
222  "hash BLOB,"
223  "size INTEGER"
224  ");").Execute();
225  ret &= sqlite::Sql(sqlite_db(),
226  "CREATE INDEX idx_object_hash "
227  "ON objects (hash);").Execute();
228  ret &= sqlite::Sql(sqlite_db(),
229  "CREATE TABLE files ("
230  "file_id INTEGER PRIMARY KEY,"
231  "catalog_id INTEGER,"
232  "FOREIGN KEY (catalog_id) REFERENCES catalogs (catalog_id)"
233  ");").Execute();
234  ret &= sqlite::Sql(sqlite_db(),
235  "CREATE TABLE files_objects ("
236  "file_id INTEGER,"
237  "object_id INTEGER,"
238  "FOREIGN KEY (file_id) REFERENCES files (file_id),"
239  "FOREIGN KEY (object_id) REFERENCES objects (object_id));").Execute();
240  ret &= sqlite::Sql(sqlite_db(),
241  "CREATE INDEX idx_file_id ON files_objects (file_id);").Execute();
242  ret &= sqlite::Sql(sqlite_db(),
243  "CREATE INDEX idx_object_id ON files_objects (object_id);").Execute();
244  ret &= sqlite::Sql(sqlite_db(),
245  "CREATE TABLE symlinks ("
246  "length INTEGER);").Execute();
247  return ret;
248 }
249 
250 void FileStatsDatabase::InitStatements() {
251  query_insert_catalog = new sqlite::Sql(sqlite_db(),
252  "INSERT INTO catalogs (num_entries, file_size) VALUES (:num, :size);");
253  query_insert_object = new sqlite::Sql(sqlite_db(),
254  "INSERT INTO objects (hash, size) VALUES (:hash, :size);");
255  query_insert_file = new sqlite::Sql(sqlite_db(),
256  "INSERT INTO files (catalog_id) VALUES (:catalog);");
257  query_insert_file_object = new sqlite::Sql(sqlite_db(),
258  "INSERT INTO files_objects (file_id, object_id) VALUES (:file, :object);");
259  query_insert_symlink = new sqlite::Sql(sqlite_db(),
260  "INSERT INTO symlinks (length) VALUES(:length);");
261  query_lookup_object = new sqlite::Sql(sqlite_db(),
262  "SELECT object_id FROM objects WHERE hash = :hash;");
263 }
264 
265 void FileStatsDatabase::DestroyStatements() {
266  delete query_insert_catalog;
267  delete query_insert_object;
268  delete query_insert_file;
269  delete query_insert_file_object;
270  delete query_insert_symlink;
271  delete query_lookup_object;
272 }
273 
274 int64_t FileStatsDatabase::StoreCatalog(int64_t num_entries,
275  int64_t file_size) {
276  query_insert_catalog->Reset();
277  query_insert_catalog->BindInt64(1, num_entries);
278  query_insert_catalog->BindInt64(2, file_size);
279  query_insert_catalog->Execute();
280  return sqlite3_last_insert_rowid(sqlite_db());
281 }
282 
283 int64_t FileStatsDatabase::StoreFile(int64_t catalog_id, int64_t object_id) {
284  query_insert_file->Reset();
285  query_insert_file->BindInt64(1, catalog_id);
286  query_insert_file->Execute();
287  int file_id = sqlite3_last_insert_rowid(sqlite_db());
288 
289  query_insert_file_object->Reset();
290  query_insert_file_object->BindInt64(1, file_id);
291  query_insert_file_object->BindInt64(2, object_id);
292  query_insert_file_object->Execute();
293  return file_id;
294 }
295 
296 int64_t FileStatsDatabase::StoreChunkedFile(int64_t catalog_id) {
297  query_insert_file->Reset();
298  query_insert_file->BindInt64(1, catalog_id);
299  query_insert_file->Execute();
300  return sqlite3_last_insert_rowid(sqlite_db());
301 }
302 
303 int64_t FileStatsDatabase::StoreChunk(const void *hash, int hash_size,
304  int64_t size, int64_t file_id) {
305  int object_id = StoreObject(hash, hash_size, size);
306 
307  query_insert_file_object->Reset();
308  query_insert_file_object->BindInt64(1, file_id);
309  query_insert_file_object->BindInt64(2, object_id);
310  query_insert_file_object->Execute();
311  return sqlite3_last_insert_rowid(sqlite_db());
312 }
313 
314 int64_t FileStatsDatabase::StoreObject(const void *hash, int hash_size,
315  int64_t size) {
316  query_lookup_object->Reset();
317  query_lookup_object->BindBlob(1, hash, hash_size);
318  if (query_lookup_object->FetchRow()) {
319  return query_lookup_object->RetrieveInt(0);
320  } else {
321  query_insert_object->Reset();
322  query_insert_object->BindBlob(1, hash, hash_size);
323  query_insert_object->BindInt64(2, size);
324  query_insert_object->Execute();
325  return sqlite3_last_insert_rowid(sqlite_db());
326  }
327 }
328 
329 int64_t FileStatsDatabase::StoreSymlink(int64_t length) {
330  query_insert_symlink->Reset();
331  query_insert_symlink->BindInt64(1, length);
332  query_insert_symlink->Execute();
333  return sqlite3_last_insert_rowid(sqlite_db());
334 }
335 
336 } // namespace swissknife
CallbackPtr RegisterListener(typename BoundClosure< CatalogTraversalData< ObjectFetcherT::CatalogTN >, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
bool Execute()
Definition: sql.cc:42
std::string database_path() const
Definition: catalog.h:184
bool FetchRow()
Definition: sql.cc:62
static Publisher * Create(const SettingsPublisher &settings)
const void * RetrieveBlob(const int idx_column) const
Definition: sql.h:436
std::vector< Parameter > ParameterList
Definition: swissknife.h:71
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:325
void TakeFileOwnership()
Definition: sql_impl.h:342
bool IsHttpUrl(const std::string &path)
Definition: posix.cc:168
bool BeginTransaction() const
Definition: sql_impl.h:271
assert((mem||(size==0))&&"Out Of Memory")
static const int kFlagFileChunk
Definition: catalog_sql.h:186
static const int kFlagFile
Definition: catalog_sql.h:183
bool CopyPath2Path(const string &src, const string &dest)
Definition: compression.cc:63
static DerivedT * Open(const std::string &filename, const OpenMode open_mode)
Definition: sql_impl.h:73
static const int kFlagLink
Definition: catalog_sql.h:184
std::string GetAbsolutePath(const std::string &path)
Definition: posix.cc:160
const char kSuffixCatalog
Definition: hash.h:54
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
Definition: posix.cc:846
sqlite3_int64 RetrieveInt64(const int idx_column) const
Definition: sql.h:445
string StringifyInt(const int64_t value)
Definition: string.cc:78
sqlite3 * sqlite_db() const
Definition: sql.h:147
bool CommitTransaction() const
Definition: sql_impl.h:278
bool DirectoryExists(const std::string &path)
Definition: posix.cc:813
bool Traverse(const TraversalType type=Base::kBreadthFirst)
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:83
int64_t GetFileSize(const std::string &path)
Definition: posix.cc:801
int RetrieveInt(const int idx_column) const
Definition: sql.h:442
static void size_t size
Definition: smalloc.h:54
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
Definition: posix.cc:1124
int RetrieveBytes(const int idx_column) const
Definition: sql.h:433
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528