16 namespace swissknife {
// Command-line parameters of this command: -r (repository URL) and -o (output
// database file) are mandatory; -n, -k, -l, -h and -@ are optional.
20 r.push_back(Parameter::Mandatory(
21 'r',
"repository URL (absolute local path or remote URL)"));
22 r.push_back(Parameter::Mandatory(
'o',
"output database file"));
23 r.push_back(Parameter::Optional(
'n',
"fully qualified repository name"));
24 r.push_back(Parameter::Optional(
'k',
"repository master key(s) / dir"));
25 r.push_back(Parameter::Optional(
'l',
"temporary directory"));
26 r.push_back(Parameter::Optional(
'h',
"root hash (other than trunk)"));
27 r.push_back(Parameter::Optional(
'@',
"proxy url"));
// Read the two mandatory arguments (-r, -o). They are dereferenced without a
// presence check, which relies on them being declared mandatory.
33 const std::string &repo_url = *args.find(
'r')->second;
34 db_path_ = *args.find(
'o')->second;
// Optional arguments: the supplied value is used when the option is present.
// The fallback branch of the -n and -l conditionals is not visible in this
// excerpt; -k and -@ fall back to the empty string.
35 const std::string &repo_name = (args.count(
'n') > 0) ? *args.find(
'n')->second
37 std::string repo_keys = (args.count(
'k') > 0) ? *args.find(
'k')->second :
"";
40 const std::string &tmp_dir = (args.count(
'l') > 0) ? *args.find(
'l')->second
// -h: root hash other than trunk (per the parameter help text).
42 if (args.count(
'h') > 0) {
// Path prefix that CatalogCallback uses to build per-catalog file names.
47 tmp_db_path_ = tmp_dir +
"/cvmfs_filestats/";
48 atomic_init32(&num_downloaded_);
// HTTP redirects are not followed; the proxy comes from -@ or is left empty.
52 const bool follow_redirects =
false;
53 const string proxy = (args.count(
'@') > 0) ? *args.find(
'@')->second :
"";
54 if (!this->InitDownloadManager(follow_redirects, proxy)
55 || !this->InitSignatureManager(repo_keys)) {
61 repo_name, repo_url, tmp_dir, download_manager(), signature_manager());
// NOTE(review): Run(&fetcher) appears twice; presumably once per fetcher type
// (remote URL vs. local path) in separate branches -- confirm in the full file.
62 success = Run(&fetcher);
65 success = Run(&fetcher);
// Process exit status: 0 on success, 1 on failure.
68 return (success) ? 0 : 1;
// Drives one statistics run with the given object fetcher. The visible steps
// are: reset the finished_ flag, unlink any file at abs_path, prepare the
// database statements, start the MainProcessing thread, raise finished_, join
// the thread and finally release the prepared statements.
71 template<
class ObjectFetcherT>
72 bool CommandFileStats::Run(ObjectFetcherT *object_fetcher) {
73 atomic_init32(&finished_);
76 unlink(abs_path.c_str());
78 db_->InitStatements();
// The worker thread receives `this` as its argument.
// NOTE(review): the return value of pthread_create looks unchecked here --
// confirm against the full file.
87 pthread_create(&thread_processing_, NULL, MainProcessing,
this);
// A non-zero finished_ lets the MainProcessing loop stop once every
// downloaded catalog has been processed.
91 atomic_inc32(&finished_);
92 pthread_join(thread_processing_, NULL);
94 db_->DestroyStatements();
// Called per catalog. Builds the file name
// <tmp_db_path_><num_downloaded_ + 1>.db and then increments num_downloaded_.
// NOTE(review): the counter is read and incremented in two separate atomic
// operations; presumably the callback runs on a single thread and the
// increment follows the (not visible) file copy -- confirm.
99 void CommandFileStats::CatalogCallback(
101 int32_t num = atomic_read32(&num_downloaded_);
102 string out_path = tmp_db_path_ +
StringifyInt(num + 1) +
".db";
104 atomic_inc32(&num_downloaded_);
// Thread entry point started by Run() via pthread_create. Polls the finished_
// flag of the owning CommandFileStats object.
107 void *CommandFileStats::MainProcessing(
void *data) {
111 int32_t fin = atomic_read32(&repo_stats->
finished_);
// Keep going until finished_ is set AND no downloaded catalog is left
// unprocessed.
114 while (fin == 0 || processed < downloaded) {
115 if (processed < downloaded) {
// Refresh the termination flag for the next loop iteration.
123 fin = atomic_read32(&repo_stats->
finished_);
// Imports one catalog database (db_path) into the statistics database: a
// catalogs row, then symlinks / regular-file objects from the `catalog` table,
// then chunked files from the `chunks` table.
131 void CommandFileStats::ProcessCatalog(
string db_path) {
139 "SELECT count(*) FROM catalog;");
// NOTE(review): StoreCatalog returns int64_t but the id is kept in an int.
141 int cur_catalog_id = db_->StoreCatalog(catalog_count->
RetrieveInt64(0),
143 delete catalog_count;
146 cat_db->
sqlite_db(),
"SELECT hash, size, flags, symlink FROM catalog;");
// Chunks are ordered by path hash so that all chunks of one file are adjacent.
149 "SELECT md5path_1, md5path_2, size, hash FROM chunks "
150 "ORDER BY md5path_1 ASC, md5path_2 ASC;");
// Dispatch on the entry flags: symlink, or regular file (chunked or not).
158 == catalog::SqlDirent::kFlagLink) {
160 db_->StoreSymlink(symlink_length);
162 == catalog::SqlDirent::kFlagFile) {
164 != catalog::SqlDirent::kFlagFileChunk) {
// Non-chunked file: store the object and link it to a new files row.
// NOTE(review): StoreObject returns int64_t; object_id is an int here.
165 int object_id = db_->StoreObject(hash, num_bytes, size);
166 db_->StoreFile(cur_catalog_id, object_id);
170 db_->StoreObject(hash, num_bytes, size);
// Group consecutive chunk rows by (md5path_1, md5path_2): a change in either
// value starts a new chunked file.
// NOTE(review): the path hash parts are read with RetrieveInt into int; if the
// columns hold 64-bit values this truncates and distinct paths could compare
// equal -- confirm the column width and consider RetrieveInt64/int64_t.
175 int old_md5path_1 = 0, old_md5path_2 = 0;
176 int md5path_1 = 0, md5path_2 = 0;
178 while (chunks_list->FetchRow()) {
179 md5path_1 = chunks_list->RetrieveInt(0);
180 md5path_2 = chunks_list->RetrieveInt(1);
181 if (md5path_1 != old_md5path_1 || md5path_2 != old_md5path_2) {
182 cur_file_id = db_->StoreChunkedFile(cur_catalog_id);
// Column 3 is the chunk hash (blob plus its byte length), column 2 its size.
184 const void *hash = chunks_list->RetrieveBlob(3);
185 int num_bytes = chunks_list->RetrieveBytes(3);
186 int64_t
size = chunks_list->RetrieveInt64(2);
187 db_->StoreChunk(hash, num_bytes, size, cur_file_id);
// Remember the current path hash for the change detection above.
188 old_md5path_1 = md5path_1;
189 old_md5path_2 = md5path_2;
// Schema version and schema revision of the statistics database (both 1).
197 float FileStatsDatabase::kLatestSchema = 1;
198 unsigned FileStatsDatabase::kLatestSchemaRevision = 1;
// Creates the schema of the statistics database. The visible statements create
// the tables catalogs, objects, files, files_objects and symlinks, plus the
// indices idx_object_hash (objects.hash), idx_file_id and idx_object_id (both
// on files_objects).
200 bool FileStatsDatabase::CreateEmptyDatabase() {
203 "CREATE TABLE catalogs ("
204 "catalog_id INTEGER PRIMARY KEY,"
205 "num_entries INTEGER,"
210 "CREATE TABLE objects ("
211 "object_id INTEGER PRIMARY KEY,"
217 "CREATE INDEX idx_object_hash "
218 "ON objects (hash);")
221 "CREATE TABLE files ("
222 "file_id INTEGER PRIMARY KEY,"
223 "catalog_id INTEGER,"
224 "FOREIGN KEY (catalog_id) REFERENCES catalogs (catalog_id)"
228 "CREATE TABLE files_objects ("
231 "FOREIGN KEY (file_id) REFERENCES files (file_id),"
232 "FOREIGN KEY (object_id) REFERENCES objects (object_id));")
235 "CREATE INDEX idx_file_id ON files_objects (file_id);")
238 "CREATE INDEX idx_object_id ON files_objects (object_id);")
241 "CREATE TABLE symlinks ("
// Prepares the SQL statements used by the Store*() methods: inserts into
// catalogs, objects, files, files_objects and symlinks, plus a lookup of
// object_id by hash. Statements are released again in DestroyStatements().
247 void FileStatsDatabase::InitStatements() {
250 "INSERT INTO catalogs (num_entries, file_size) VALUES (:num, :size);");
252 sqlite_db(),
"INSERT INTO objects (hash, size) VALUES (:hash, :size);");
254 sqlite_db(),
"INSERT INTO files (catalog_id) VALUES (:catalog);");
257 "INSERT INTO files_objects (file_id, object_id) VALUES (:file, "
260 sqlite_db(),
"INSERT INTO symlinks (length) VALUES(:length);");
262 sqlite_db(),
"SELECT object_id FROM objects WHERE hash = :hash;");
// Deletes the six prepared statement objects. The pointers are not reset in
// the visible code, so the Store*() methods must not be used afterwards
// unless InitStatements() is called again.
265 void FileStatsDatabase::DestroyStatements() {
266 delete query_insert_catalog;
267 delete query_insert_object;
268 delete query_insert_file;
269 delete query_insert_file_object;
270 delete query_insert_symlink;
271 delete query_lookup_object;
// Inserts one row into `catalogs` with the given entry count and file size and
// returns the row id SQLite assigned to it.
// NOTE(review): the result of Execute() is not checked in the visible code.
274 int64_t FileStatsDatabase::StoreCatalog(int64_t num_entries,
276 query_insert_catalog->Reset();
277 query_insert_catalog->BindInt64(1, num_entries);
278 query_insert_catalog->BindInt64(2, file_size);
279 query_insert_catalog->Execute();
280 return sqlite3_last_insert_rowid(sqlite_db());
// Inserts a `files` row for catalog_id and links it to object_id through a
// `files_objects` row. The return statement is not visible in this excerpt.
283 int64_t FileStatsDatabase::StoreFile(int64_t catalog_id, int64_t object_id) {
284 query_insert_file->Reset();
285 query_insert_file->BindInt64(1, catalog_id);
286 query_insert_file->Execute();
// NOTE(review): sqlite3_last_insert_rowid() yields a 64-bit row id; storing it
// in an int narrows it.
287 int file_id = sqlite3_last_insert_rowid(sqlite_db());
289 query_insert_file_object->Reset();
290 query_insert_file_object->BindInt64(1, file_id);
291 query_insert_file_object->BindInt64(2, object_id);
292 query_insert_file_object->Execute();
// Inserts a `files` row for catalog_id without linking any object and returns
// its row id; ProcessCatalog() passes that id to StoreChunk() for each chunk.
296 int64_t FileStatsDatabase::StoreChunkedFile(int64_t catalog_id) {
297 query_insert_file->Reset();
298 query_insert_file->BindInt64(1, catalog_id);
299 query_insert_file->Execute();
300 return sqlite3_last_insert_rowid(sqlite_db());
// Stores one chunk: obtains the object id for (hash, size) via StoreObject()
// and inserts a `files_objects` row linking it to file_id. Returns the last
// insert row id reported by SQLite after that link insert.
303 int64_t FileStatsDatabase::StoreChunk(
const void *hash,
int hash_size,
304 int64_t
size, int64_t file_id) {
// NOTE(review): StoreObject returns int64_t; object_id is narrowed to int.
305 int object_id = StoreObject(hash, hash_size, size);
307 query_insert_file_object->Reset();
308 query_insert_file_object->BindInt64(1, file_id);
309 query_insert_file_object->BindInt64(2, object_id);
310 query_insert_file_object->Execute();
311 return sqlite3_last_insert_rowid(sqlite_db());
// Returns the object id for the given hash. An existing `objects` row with the
// same hash is reused; otherwise a new row (hash, size) is inserted and its
// row id returned.
314 int64_t FileStatsDatabase::StoreObject(
const void *hash,
int hash_size,
316 query_lookup_object->Reset();
317 query_lookup_object->BindBlob(1, hash, hash_size);
318 if (query_lookup_object->FetchRow()) {
// NOTE(review): the id is read with RetrieveInt (int) although the function
// returns int64_t and new ids come from sqlite3_last_insert_rowid().
319 return query_lookup_object->RetrieveInt(0);
// Hash not known yet: insert a new object row.
321 query_insert_object->Reset();
322 query_insert_object->BindBlob(1, hash, hash_size);
323 query_insert_object->BindInt64(2, size);
324 query_insert_object->Execute();
325 return sqlite3_last_insert_rowid(sqlite_db());
// Inserts one row into `symlinks` holding the given length and returns the row
// id SQLite assigned to it.
329 int64_t FileStatsDatabase::StoreSymlink(int64_t length) {
330 query_insert_symlink->Reset();
331 query_insert_symlink->BindInt64(1, length);
332 query_insert_symlink->Execute();
333 return sqlite3_last_insert_rowid(sqlite_db());
CallbackPtr RegisterListener(typename BoundClosure< CatalogTraversalData< ObjectFetcherT::CatalogTN >, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, ClosureDataT data)
std::string database_path() const
static Publisher * Create(const SettingsPublisher &settings)
const void * RetrieveBlob(const int idx_column) const
std::vector< Parameter > ParameterList
string JoinStrings(const vector< string > &strings, const string &joint)
bool IsHttpUrl(const std::string &path)
bool BeginTransaction() const
ObjectFetcherT * object_fetcher
assert((mem||(size==0))&&"Out Of Memory")
static const int kFlagFileChunk
static const int kFlagFile
bool CopyPath2Path(const string &src, const string &dest)
static DerivedT * Open(const std::string &filename, const OpenMode open_mode)
atomic_int32 num_downloaded_
static const int kFlagLink
std::string GetAbsolutePath(const std::string &path)
const char kSuffixCatalog
bool MkdirDeep(const std::string &path, const mode_t mode, bool verify_writable)
sqlite3_int64 RetrieveInt64(const int idx_column) const
string StringifyInt(const int64_t value)
sqlite3 * sqlite_db() const
bool CommitTransaction() const
bool DirectoryExists(const std::string &path)
bool Traverse(const TraversalType type=Base::kBreadthFirst)
void ProcessCatalog(string db_path)
std::map< char, SharedPtr< std::string > > ArgumentList
Any MkFromHexPtr(const HexPtr hex, const char suffix)
int64_t GetFileSize(const std::string &path)
int RetrieveInt(const int idx_column) const
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
int RetrieveBytes(const int idx_column) const
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)