CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_filestats.cc
Go to the documentation of this file.
1 
5 #include "swissknife_filestats.h"
6 
7 #include <cassert>
8 
9 #include "crypto/hash.h"
10 #include "util/logging.h"
11 #include "util/posix.h"
12 #include "util/string.h"
13 
14 using namespace std; // NOLINT
15 
16 namespace swissknife {
17 
18 ParameterList CommandFileStats::GetParams() const {
19  ParameterList r;
20  r.push_back(Parameter::Mandatory(
21  'r', "repository URL (absolute local path or remote URL)"));
22  r.push_back(Parameter::Mandatory('o', "output database file"));
23  r.push_back(Parameter::Optional('n', "fully qualified repository name"));
24  r.push_back(Parameter::Optional('k', "repository master key(s) / dir"));
25  r.push_back(Parameter::Optional('l', "temporary directory"));
26  r.push_back(Parameter::Optional('h', "root hash (other than trunk)"));
27  r.push_back(Parameter::Optional('@', "proxy url"));
28  return r;
29 }
30 
// Entry point of the command.  Reads the options declared in GetParams(),
// picks an object fetcher depending on whether the repository URL is an HTTP
// URL, and hands it to Run().
//
// Options read here: 'r' (repository URL) and 'o' (output database path) are
// dereferenced unconditionally; 'n', 'k', 'l', 'h' and '@' are only read when
// present.  'l' falls back to "/tmp", the others to the empty string.
//
// Returns 0 if Run() reports success and 1 otherwise (including a failed
// download/signature manager initialization for HTTP repositories).
//
// NOTE(review): the result of args.find('r') / args.find('o') is dereferenced
// without an end() check -- presumably the caller enforces mandatory
// parameters; confirm.
// NOTE(review): manual_root_hash is filled from '-h' but is not referenced
// again anywhere in the visible body, so the option appears to have no effect.
31 int CommandFileStats::Main(const ArgumentList &args) {
32  shash::Any manual_root_hash;
33  const std::string &repo_url = *args.find('r')->second;
34  db_path_ = *args.find('o')->second;
35  const std::string &repo_name = (args.count('n') > 0) ? *args.find('n')->second
36  : "";
37  std::string repo_keys = (args.count('k') > 0) ? *args.find('k')->second : "";
// If '-k' names an existing directory, replace it by the ':'-joined list of
// the "*.pub" files found in that directory.
38  if (DirectoryExists(repo_keys))
39  repo_keys = JoinStrings(FindFilesBySuffix(repo_keys, ".pub"), ":");
40  const std::string &tmp_dir = (args.count('l') > 0) ? *args.find('l')->second
41  : "/tmp";
42  if (args.count('h') > 0) {
43  manual_root_hash = shash::MkFromHexPtr(
44  shash::HexPtr(*args.find('h')->second), shash::kSuffixCatalog);
45  }
46 
// Scratch directory into which CatalogCallback() copies the catalog files.
// Note the trailing slash; MainProcessing() adds another "/" when it builds
// file names.
47  tmp_db_path_ = tmp_dir + "/cvmfs_filestats/";
48  atomic_init32(&num_downloaded_);
49 
50  bool success = false;
51  if (IsHttpUrl(repo_url)) {
52  const bool follow_redirects = false;
53  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
54  if (!this->InitDownloadManager(follow_redirects, proxy)
55  || !this->InitSignatureManager(repo_keys)) {
56  LogCvmfs(kLogCatalog, kLogStderr, "Failed to init remote connection");
57  return 1;
58  }
59 
// NOTE(review): the line that opens the declaration of `fetcher` for the HTTP
// case is missing from this listing (the embedded numbering jumps from 59 to
// 61); only its constructor arguments are visible below.
61  repo_name, repo_url, tmp_dir, download_manager(), signature_manager());
62  success = Run(&fetcher);
63  } else {
64  LocalObjectFetcher<> fetcher(repo_url, tmp_dir);
65  success = Run(&fetcher);
66  }
67 
68  return (success) ? 0 : 1;
69 }
70 
// Drives one statistics run for the given object fetcher:
//   1. removes any file at the absolute form of db_path_ and creates a fresh
//      FileStatsDatabase there,
//   2. creates the scratch directory tmp_db_path_,
//   3. registers CatalogCallback() as listener of a catalog traversal,
//   4. starts MainProcessing() in a separate pthread,
//   5. runs the traversal, then increments finished_ and joins the thread.
// Returns the result of traversal.Traverse().
//
// NOTE(review): unlink() is applied to the absolute path while Create() gets
// db_path_ as given; the result of unlink() is ignored.
// NOTE(review): db_ is dereferenced right after Create() without a NULL
// check -- confirm whether Create() can fail by returning NULL.
// NOTE(review): the return value of pthread_create() is ignored, yet
// pthread_join() is called unconditionally.
71 template<class ObjectFetcherT>
72 bool CommandFileStats::Run(ObjectFetcherT *object_fetcher) {
73  atomic_init32(&finished_);
74 
75  string abs_path = GetAbsolutePath(db_path_);
76  unlink(abs_path.c_str());
77  db_ = FileStatsDatabase::Create(db_path_);
78  db_->InitStatements();
79 
// NOTE(review): MkdirDeep() is evaluated inside assert().  When the file is
// compiled with NDEBUG the whole expression is removed and the scratch
// directory is never created.  The call should be hoisted out of the assert.
80  assert(MkdirDeep(tmp_db_path_, 0755));
81 
// NOTE(review): the declaration of `params` is missing from this listing (the
// embedded numbering jumps from 81 to 83).
83  params.object_fetcher = object_fetcher;
84  CatalogTraversal<ObjectFetcherT> traversal(params);
85  traversal.RegisterListener(&CommandFileStats::CatalogCallback, this);
86 
// The consumer thread is started before the traversal so that catalogs can be
// processed while further ones are still being fetched.
87  pthread_create(&thread_processing_, NULL, MainProcessing, this);
88 
89  bool ret = traversal.Traverse();
90 
// Tell MainProcessing() that no more catalogs will arrive; its loop ends once
// finished_ is non-zero and all copied catalogs have been processed.
91  atomic_inc32(&finished_);
92  pthread_join(thread_processing_, NULL);
93 
94  db_->DestroyStatements();
95 
96  return ret;
97 }
98 
// Traversal listener (registered in Run()).  Copies the database file of the
// catalog referenced by `data` to "<tmp_db_path_><N>.db", where N is the
// current value of num_downloaded_ plus one, and then increments
// num_downloaded_.  The counter is bumped only after the copy call returns;
// MainProcessing() uses that counter to decide which numbered files to open.
//
// NOTE(review): the parameter list line is missing from this listing (the
// embedded numbering jumps from 99 to 101); the body uses a parameter named
// `data` that exposes `catalog->database_path()`.
// NOTE(review): CopyPath2Path() is evaluated inside assert().  When compiled
// with NDEBUG no catalog is copied at all, while num_downloaded_ is still
// incremented.  The call should be hoisted out of the assert.
99 void CommandFileStats::CatalogCallback(
101  int32_t num = atomic_read32(&num_downloaded_);
102  string out_path = tmp_db_path_ + StringifyInt(num + 1) + ".db";
103  assert(CopyPath2Path(data.catalog->database_path(), out_path));
104  atomic_inc32(&num_downloaded_);
105 }
106 
107 void *CommandFileStats::MainProcessing(void *data) {
108  CommandFileStats *repo_stats = static_cast<CommandFileStats *>(data);
109  int processed = 0;
110  int32_t downloaded = atomic_read32(&repo_stats->num_downloaded_);
111  int32_t fin = atomic_read32(&repo_stats->finished_);
112 
113  repo_stats->db_->BeginTransaction();
114  while (fin == 0 || processed < downloaded) {
115  if (processed < downloaded) {
116  LogCvmfs(kLogCatalog, kLogStdout, "Processing catalog %d", processed);
117  string db_path = repo_stats->tmp_db_path_ + "/"
118  + StringifyInt(processed + 1) + ".db";
119  repo_stats->ProcessCatalog(db_path);
120  ++processed;
121  }
122  downloaded = atomic_read32(&repo_stats->num_downloaded_);
123  fin = atomic_read32(&repo_stats->finished_);
124  }
125  repo_stats->db_->CommitTransaction();
126 
127  return NULL;
128 }
129 
130 
// Reads one copied catalog database at `db_path` and records its content in
// the statistics database db_:
//   - one row in `catalogs` (entry count of the `catalog` table plus the size
//     of the database file),
//   - for every entry flagged as symlink: the byte length of its `symlink`
//     column,
//   - for every entry flagged as regular file: its content object and, for
//     non-chunked files, a file record linked to that object,
//   - for every row of the `chunks` table: the chunk object, linked to one
//     file record per distinct (md5path_1, md5path_2) pair.
//
// NOTE(review): the lines that declare and open `cat_db` are missing from
// this listing (the embedded numbering jumps from 131 to 135), so how the
// database is opened and whether a failed open is handled cannot be seen here.
// NOTE(review): the three sqlite::Sql objects and cat_db are managed with raw
// new/delete.
131 void CommandFileStats::ProcessCatalog(string db_path) {
// presumably lets cat_db remove the temporary file when it is destroyed --
// TODO confirm against TakeFileOwnership()
135  cat_db->TakeFileOwnership();
136 
137  int64_t file_size = GetFileSize(db_path);
138  sqlite::Sql *catalog_count = new sqlite::Sql(cat_db->sqlite_db(),
139  "SELECT count(*) FROM catalog;");
140  catalog_count->Execute();
// NOTE(review): StoreCatalog() returns int64_t; the id is narrowed to int.
141  int cur_catalog_id = db_->StoreCatalog(catalog_count->RetrieveInt64(0),
142  file_size);
143  delete catalog_count;
144 
145  sqlite::Sql *catalog_list = new sqlite::Sql(
146  cat_db->sqlite_db(), "SELECT hash, size, flags, symlink FROM catalog;");
147  sqlite::Sql *chunks_list = new sqlite::Sql(
148  cat_db->sqlite_db(),
149  "SELECT md5path_1, md5path_2, size, hash FROM chunks "
150  "ORDER BY md5path_1 ASC, md5path_2 ASC;");
151 
// Pass 1: directory entries.  Column order: hash, size, flags, symlink.
152  while (catalog_list->FetchRow()) {
153  const void *hash = catalog_list->RetrieveBlob(0);
154  int num_bytes = catalog_list->RetrieveBytes(0);
155  int64_t size = catalog_list->RetrieveInt64(1);
156  int flags = catalog_list->RetrieveInt(2);
// The symlink test comes first; an entry matching it is never treated as file.
157  if ((flags & catalog::SqlDirent::kFlagLink)
158  == catalog::SqlDirent::kFlagLink) {
159  int symlink_length = catalog_list->RetrieveBytes(3);
160  db_->StoreSymlink(symlink_length);
// NOTE(review): the first half of the nested chunk-flag condition is missing
// from this listing (the embedded numbering jumps from 162 to 164); the
// visible remainder compares against kFlagFileChunk with "!=".
161  } else if ((flags & catalog::SqlDirent::kFlagFile)
162  == catalog::SqlDirent::kFlagFile) {
164  != catalog::SqlDirent::kFlagFileChunk) {
// NOTE(review): StoreObject() returns int64_t; the id is narrowed to int.
165  int object_id = db_->StoreObject(hash, num_bytes, size);
166  db_->StoreFile(cur_catalog_id, object_id);
167  } else {
168  // Bulk hashes in addition to chunks
169  if (hash != NULL)
170  db_->StoreObject(hash, num_bytes, size);
171  }
172  }
173  }
174 
// Pass 2: file chunks.  Rows arrive ordered by (md5path_1, md5path_2); a new
// file record is created whenever that pair differs from the previous row.
// NOTE(review): the path halves are read with RetrieveInt() (int) -- if the
// columns hold 64-bit values only a truncated value is compared; confirm.
// NOTE(review): the "previous" pair starts at (0, 0), so a first row whose
// retrieved pair is (0, 0) would be attached to cur_file_id == 0.
175  int old_md5path_1 = 0, old_md5path_2 = 0;
176  int md5path_1 = 0, md5path_2 = 0;
// NOTE(review): StoreChunkedFile() returns int64_t; narrowed to int here.
177  int cur_file_id = 0;
178  while (chunks_list->FetchRow()) {
179  md5path_1 = chunks_list->RetrieveInt(0);
180  md5path_2 = chunks_list->RetrieveInt(1);
181  if (md5path_1 != old_md5path_1 || md5path_2 != old_md5path_2) {
182  cur_file_id = db_->StoreChunkedFile(cur_catalog_id);
183  }
184  const void *hash = chunks_list->RetrieveBlob(3);
185  int num_bytes = chunks_list->RetrieveBytes(3);
186  int64_t size = chunks_list->RetrieveInt64(2);
187  db_->StoreChunk(hash, num_bytes, size, cur_file_id);
188  old_md5path_1 = md5path_1;
189  old_md5path_2 = md5path_2;
190  }
191 
// The statements are deleted before the database object they were created on.
192  delete catalog_list;
193  delete chunks_list;
194  delete cat_db;
195 }
196 
197 float FileStatsDatabase::kLatestSchema = 1;
198 unsigned FileStatsDatabase::kLatestSchemaRevision = 1;
199 
200 bool FileStatsDatabase::CreateEmptyDatabase() {
201  bool ret = true;
202  ret &= sqlite::Sql(sqlite_db(),
203  "CREATE TABLE catalogs ("
204  "catalog_id INTEGER PRIMARY KEY,"
205  "num_entries INTEGER,"
206  "file_size INTEGER"
207  ");")
208  .Execute();
209  ret &= sqlite::Sql(sqlite_db(),
210  "CREATE TABLE objects ("
211  "object_id INTEGER PRIMARY KEY,"
212  "hash BLOB,"
213  "size INTEGER"
214  ");")
215  .Execute();
216  ret &= sqlite::Sql(sqlite_db(),
217  "CREATE INDEX idx_object_hash "
218  "ON objects (hash);")
219  .Execute();
220  ret &= sqlite::Sql(sqlite_db(),
221  "CREATE TABLE files ("
222  "file_id INTEGER PRIMARY KEY,"
223  "catalog_id INTEGER,"
224  "FOREIGN KEY (catalog_id) REFERENCES catalogs (catalog_id)"
225  ");")
226  .Execute();
227  ret &= sqlite::Sql(sqlite_db(),
228  "CREATE TABLE files_objects ("
229  "file_id INTEGER,"
230  "object_id INTEGER,"
231  "FOREIGN KEY (file_id) REFERENCES files (file_id),"
232  "FOREIGN KEY (object_id) REFERENCES objects (object_id));")
233  .Execute();
234  ret &= sqlite::Sql(sqlite_db(),
235  "CREATE INDEX idx_file_id ON files_objects (file_id);")
236  .Execute();
237  ret &= sqlite::Sql(sqlite_db(),
238  "CREATE INDEX idx_object_id ON files_objects (object_id);")
239  .Execute();
240  ret &= sqlite::Sql(sqlite_db(),
241  "CREATE TABLE symlinks ("
242  "length INTEGER);")
243  .Execute();
244  return ret;
245 }
246 
247 void FileStatsDatabase::InitStatements() {
248  query_insert_catalog = new sqlite::Sql(
249  sqlite_db(),
250  "INSERT INTO catalogs (num_entries, file_size) VALUES (:num, :size);");
251  query_insert_object = new sqlite::Sql(
252  sqlite_db(), "INSERT INTO objects (hash, size) VALUES (:hash, :size);");
253  query_insert_file = new sqlite::Sql(
254  sqlite_db(), "INSERT INTO files (catalog_id) VALUES (:catalog);");
255  query_insert_file_object = new sqlite::Sql(
256  sqlite_db(),
257  "INSERT INTO files_objects (file_id, object_id) VALUES (:file, "
258  ":object);");
259  query_insert_symlink = new sqlite::Sql(
260  sqlite_db(), "INSERT INTO symlinks (length) VALUES(:length);");
261  query_lookup_object = new sqlite::Sql(
262  sqlite_db(), "SELECT object_id FROM objects WHERE hash = :hash;");
263 }
264 
265 void FileStatsDatabase::DestroyStatements() {
266  delete query_insert_catalog;
267  delete query_insert_object;
268  delete query_insert_file;
269  delete query_insert_file_object;
270  delete query_insert_symlink;
271  delete query_lookup_object;
272 }
273 
274 int64_t FileStatsDatabase::StoreCatalog(int64_t num_entries,
275  int64_t file_size) {
276  query_insert_catalog->Reset();
277  query_insert_catalog->BindInt64(1, num_entries);
278  query_insert_catalog->BindInt64(2, file_size);
279  query_insert_catalog->Execute();
280  return sqlite3_last_insert_rowid(sqlite_db());
281 }
282 
283 int64_t FileStatsDatabase::StoreFile(int64_t catalog_id, int64_t object_id) {
284  query_insert_file->Reset();
285  query_insert_file->BindInt64(1, catalog_id);
286  query_insert_file->Execute();
287  int file_id = sqlite3_last_insert_rowid(sqlite_db());
288 
289  query_insert_file_object->Reset();
290  query_insert_file_object->BindInt64(1, file_id);
291  query_insert_file_object->BindInt64(2, object_id);
292  query_insert_file_object->Execute();
293  return file_id;
294 }
295 
296 int64_t FileStatsDatabase::StoreChunkedFile(int64_t catalog_id) {
297  query_insert_file->Reset();
298  query_insert_file->BindInt64(1, catalog_id);
299  query_insert_file->Execute();
300  return sqlite3_last_insert_rowid(sqlite_db());
301 }
302 
303 int64_t FileStatsDatabase::StoreChunk(const void *hash, int hash_size,
304  int64_t size, int64_t file_id) {
305  int object_id = StoreObject(hash, hash_size, size);
306 
307  query_insert_file_object->Reset();
308  query_insert_file_object->BindInt64(1, file_id);
309  query_insert_file_object->BindInt64(2, object_id);
310  query_insert_file_object->Execute();
311  return sqlite3_last_insert_rowid(sqlite_db());
312 }
313 
314 int64_t FileStatsDatabase::StoreObject(const void *hash, int hash_size,
315  int64_t size) {
316  query_lookup_object->Reset();
317  query_lookup_object->BindBlob(1, hash, hash_size);
318  if (query_lookup_object->FetchRow()) {
319  return query_lookup_object->RetrieveInt(0);
320  } else {
321  query_insert_object->Reset();
322  query_insert_object->BindBlob(1, hash, hash_size);
323  query_insert_object->BindInt64(2, size);
324  query_insert_object->Execute();
325  return sqlite3_last_insert_rowid(sqlite_db());
326  }
327 }
328 
329 int64_t FileStatsDatabase::StoreSymlink(int64_t length) {
330  query_insert_symlink->Reset();
331  query_insert_symlink->BindInt64(1, length);
332  query_insert_symlink->Execute();
333  return sqlite3_last_insert_rowid(sqlite_db());
334 }
335 
336 } // namespace swissknife
CallbackPtr RegisterListener(typename BoundClosure< CatalogTraversalData< ObjectFetcherT::CatalogTN >, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
bool Execute()
Definition: sql.cc:41
std::string database_path() const
Definition: catalog.h:180
bool FetchRow()
Definition: sql.cc:61
static Publisher * Create(const SettingsPublisher &settings)
const void * RetrieveBlob(const int idx_column) const
Definition: sql.h:431
std::vector< Parameter > ParameterList
Definition: swissknife.h:71
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:356
void TakeFileOwnership()
Definition: sql_impl.h:334
bool IsHttpUrl(const std::string &path)
Definition: posix.cc:167
bool BeginTransaction() const
Definition: sql_impl.h:268
assert((mem||(size==0))&&"Out Of Memory")
static const int kFlagFileChunk
Definition: catalog_sql.h:181
static const int kFlagFile
Definition: catalog_sql.h:178
bool CopyPath2Path(const string &src, const string &dest)
Definition: compression.cc:66
static DerivedT * Open(const std::string &filename, const OpenMode open_mode)
Definition: sql_impl.h:73
static const int kFlagLink
Definition: catalog_sql.h:179
std::string GetAbsolutePath(const std::string &path)
Definition: posix.cc:159
const char kSuffixCatalog
Definition: hash.h:54
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
Definition: posix.cc:855
sqlite3_int64 RetrieveInt64(const int idx_column) const
Definition: sql.h:445
string StringifyInt(const int64_t value)
Definition: string.cc:77
sqlite3 * sqlite_db() const
Definition: sql.h:145
bool CommitTransaction() const
Definition: sql_impl.h:274
bool DirectoryExists(const std::string &path)
Definition: posix.cc:824
bool Traverse(const TraversalType type=Base::kBreadthFirst)
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:82
int64_t GetFileSize(const std::string &path)
Definition: posix.cc:812
int RetrieveInt(const int idx_column) const
Definition: sql.h:442
static void size_t size
Definition: smalloc.h:54
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
Definition: posix.cc:1128
int RetrieveBytes(const int idx_column) const
Definition: sql.h:428
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545