GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_ingestsql.cc
Date: 2025-06-22 02:36:02
            Exec   Total   Coverage
Lines:         0     808       0.0%
Branches:      0     558       0.0%

Line Branch Exec Source
1 #include "swissknife_ingestsql.h"
2
3 #include <fcntl.h>
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <sys/time.h>
7 #include <sys/resource.h>
8 #include <sys/types.h>
9 #include <pwd.h>
10 #include <grp.h>
11
12 #include <csignal>
13 #include <cstdlib>
14 #include <fstream>
15 #include <sstream>
16 #include <unordered_map>
17 #include <unordered_set>
18 #include <stack>
19
20 #include "acl.h"
21 #include "catalog_mgr_rw.h"
22 #include "curl/curl.h"
23 #include "gateway_util.h"
24 #include "swissknife_lease_curl.h"
25 #include "swissknife_lease_json.h"
26 #include "swissknife_sync.h"
27 #include "upload.h"
28 #include "util/logging.h"
29 #include "catalog_downloader.h"
30 #include "shortstring.h"
31
32 #define CHECK_SQLITE_ERROR(ret, expected) do { \
33 if ((ret) != (expected)) { \
34 LogCvmfs(kLogCvmfs, kLogStderr, "SQLite error: %d", (ret)); \
35 assert(0); \
36 } \
37 } while (0)
38
39 #define CUSTOM_ASSERT(check, msg, ...) do { \
40 if (!(check)) { \
41 LogCvmfs(kLogCvmfs, kLogStderr, msg, ##__VA_ARGS__); \
42 assert(0); \
43 } \
44 } while (0)
45
46 #define SHOW_PROGRESS(item, freq, curr, total) do { \
47 if ((curr) % (freq) == 0 || (curr) == (total)) { \
48 LogCvmfs(kLogCvmfs, kLogStdout, "Processed %d/%d %s", (curr), total, item); \
49 } \
50 } while (0)
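// Note: these macros fail hard via assert(); Main() installs on_signal() for SIGABRT,
// so an assertion failure also cancels any held gateway lease before the process exits.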
51
52
53
54 static const unsigned kExternalChunkSize = 24 * 1024 * 1024;
55 static const unsigned kInternalChunkSize = 6 * 1024 * 1024;
56 static const unsigned kDefaultLeaseBusyRetryInterval = 10;
57 static const unsigned kLeaseRefreshInterval = 90; // seconds
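// The chunk-size constants are the fixed chunk sizes assumed when reconstructing chunk
// offsets from the comma-separated hash lists in load_files(); the lease is refreshed
// at most once per kLeaseRefreshInterval by refresh_lease().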
58
59
60 static bool g_lease_acquired = false;
61 static string g_gateway_url;
62 static string g_gateway_key_id;
63 static string g_gateway_secret;
64 static string g_session_token;
65 static string g_session_token_file;
66 static string g_s3_file;
67 static time_t g_last_lease_refresh=0;
68 static bool g_stop_refresh = false;
69 static int64_t g_priority=0;
70 static bool g_add_missing_catalogs = false;
71 static string get_lease_from_paths(vector<string> paths);
72 static vector<string> get_all_dirs_from_sqlite(vector<string>& sqlite_db_vec,
73 bool include_additions,
74 bool include_deletions);
75 static string get_parent(const string& path);
76 static string get_basename(const string& path);
77
78 static XattrList marshal_xattrs(const char *acl);
79 static string sanitise_name(const char *name_cstr, bool allow_leading_slash);
80 static void on_signal(int sig);
81 static string acquire_lease(const string& key_id, const string& secret, const string& lease_path,
82 const string& repo_service_url, bool force_cancel_lease, uint64_t *current_revision, string &current_root_hash,
83 unsigned int refresh_interval);
84 static void cancel_lease();
85 static void refresh_lease();
86 static vector<string> get_file_list(string& path);
87 static int check_hash(const char *hash);
88 static void recursively_delete_directory(PathString& path, catalog::WritableCatalogManager &catalog_manager);
89 static void create_empty_database( string& filename );
90 static void relax_db_locking(sqlite3 *db);
91 static bool check_prefix(const std::string &path , const std::string &prefix);
92
93 static bool isDatabaseMarkedComplete(const char *dbfile);
94 static void setDatabaseMarkedComplete(const char *dbfile);
95
96 extern "C" void* lease_refresh_thread(void *payload);
97
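// Strips leading slashes and aborts (via CUSTOM_ASSERT) on invalid names: empty names,
// "//", "." or ".." components, or a trailing slash (except a bare "/" when
// allow_leading_slash is set).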
98 static string sanitise_name(const char *name_cstr,
99 bool allow_leading_slash = false) {
100 int reason = 0;
101 const char *c = name_cstr;
102 while(*c == '/') {c++;} // strip any leading slashes
103 string const name = string(c);
104 bool ok = true;
105
106 if (!allow_leading_slash && HasPrefix(name, "/", true)) {
107 reason=1;
108 ok = false;
109 }
110 if (HasSuffix(name, "/", true)) {
111 if (!(allow_leading_slash &&
112 name.size() == 1)) { // account for the case where name=="/"
113 reason=2;
114 ok = false;
115 }
116 }
117 if (name.find("//") != string::npos) {
118 reason=3;
119 ok = false;
120 }
121 if (HasPrefix(name, "./", true) || HasPrefix(name, "../", true)) {
122 reason=4;
123 ok = false;
124 }
125 if (HasSuffix(name, "/.", true) || HasSuffix(name, "/..", true)) {
126 reason=5;
127 ok = false;
128 }
129 if (name.find("/./") != string::npos || name.find("/../") != string::npos) {
130 reason=6;
131 ok = false;
132 }
133 if (name == "") {
134 reason=7;
135 ok = false;
136 }
137 CUSTOM_ASSERT(ok, "Name [%s] is invalid (reason %d)", name.c_str(), reason);
138 return string(name);
139 }
140
141 static string get_parent(const string& path) {
142 size_t const found = path.find_last_of('/');
143 if (found == string::npos) {
144 return string("");
145 }
146 return path.substr(0, found);
147 }
148
149 static string get_basename(const string& path) {
150 const size_t found = path.find_last_of('/');
151 if (found == string::npos) {
152 return path;
153 }
154 return path.substr(found + 1);
155 }
156
157 // this is copied from MakeRelativePath
158 static string MakeCatalogPath(const std::string &relative_path) {
159 return (relative_path == "") ? "" : "/" + relative_path;
160 }
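// e.g. MakeCatalogPath("a/b") == "/a/b", MakeCatalogPath("") == ""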
161
162 static string acquire_lease(const string& key_id, const string& secret, const string& lease_path,
163 const string& repo_service_url, bool force_cancel_lease, uint64_t *current_revision, string &current_root_hash,
164 unsigned int refresh_interval) {
165 const CURLcode ret = curl_global_init(CURL_GLOBAL_ALL);
166 CUSTOM_ASSERT(ret == CURLE_OK, "failed to init curl");
167
168 string gateway_metadata_str;
169 char *gateway_metadata = getenv("CVMFS_GATEWAY_METADATA");
170 if (gateway_metadata != NULL) gateway_metadata_str = gateway_metadata;
171
172 while (true) {
173 CurlBuffer buffer;
174 if (MakeAcquireRequest(key_id, secret, lease_path, repo_service_url,
175 &buffer, gateway_metadata_str)) {
176 string session_token;
177
178 const LeaseReply rep = ParseAcquireReplyWithRevision(buffer, &session_token, current_revision, current_root_hash);
179 switch (rep) {
180 case kLeaseReplySuccess:
181 g_lease_acquired = true;
182 g_last_lease_refresh = time(NULL);
183 return session_token;
184 break;
185 case kLeaseReplyBusy:
186 if (force_cancel_lease) {
187 LogCvmfs(kLogCvmfs, kLogStderr, "Lease busy, forcing cancellation (TODO)");
188 }
189 LogCvmfs(kLogCvmfs, kLogStderr, "Lease busy, retrying in %d sec",
190 refresh_interval);
191 sleep(refresh_interval);
192 break;
193 default:
194 LogCvmfs(kLogCvmfs, kLogStderr, "Error acquiring lease: %s. Retrying in %d sec",
195 buffer.data.c_str(), refresh_interval);
196 sleep(refresh_interval);
197 }
198 } else {
199 LogCvmfs(kLogCvmfs, kLogStderr, "Error making lease acquisition request. Retrying in %d sec", refresh_interval);
200 sleep(refresh_interval);
201 }
202 }
203 assert(false);
204 return "";
205 }
206
207 static uint64_t make_commit_on_gateway( const std::string &old_root_hash, const std::string &new_root_hash, int64_t priority) {
208 CurlBuffer buffer;
209 char priorityStr[100];
210 (void)sprintf(priorityStr, "%" PRId64, priority); // return value check skipped: the 100-byte buffer is more than enough for any int64_t
211 buffer.data="";
212
213 const std::string payload = "{\n\"old_root_hash\": \"" + old_root_hash + "\",\n\"new_root_hash\": \""+new_root_hash+"\",\n\"priority\": "+priorityStr+"}";
214
215 return MakeEndRequest("POST", g_gateway_key_id, g_gateway_secret,
216 g_session_token, g_gateway_url, payload, &buffer, true);
217 }
218
219 static void refresh_lease() {
220 CurlBuffer buffer;
221 buffer.data="";
222 if ((time(NULL) - g_last_lease_refresh) < kLeaseRefreshInterval) { return; }
223
224 if (MakeEndRequest("PATCH", g_gateway_key_id, g_gateway_secret,
225 g_session_token, g_gateway_url, "", &buffer,false)) {
226 const int ret = ParseDropReply(buffer);
227 if (kLeaseReplySuccess == ret) {
228 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Lease refreshed");
229 g_last_lease_refresh=time(NULL);
230 } else {
231 LogCvmfs(kLogCvmfs, kLogStderr, "Lease refresh failed: %d", ret);
232 }
233 } else {
234 LogCvmfs(kLogCvmfs, kLogStderr, "Lease refresh request failed");
235 if(buffer.data == "Method Not Allowed\n") {
236 g_last_lease_refresh=time(NULL);
237 LogCvmfs(kLogCvmfs, kLogStderr, "This gateway does not support lease refresh");
238 }
239
240 }
241 }
242
243
244 static void cancel_lease() {
245 CurlBuffer buffer;
246 if (MakeEndRequest("DELETE", g_gateway_key_id, g_gateway_secret,
247 g_session_token, g_gateway_url, "", &buffer, false)) {
248 const int ret = ParseDropReply(buffer);
249 if (kLeaseReplySuccess == ret) {
250 LogCvmfs(kLogCvmfs, kLogStdout, "Lease cancelled");
251 } else {
252 LogCvmfs(kLogCvmfs, kLogStderr, "Lease cancellation failed: %d", ret);
253 }
254 } else {
255 LogCvmfs(kLogCvmfs, kLogStderr, "Lease cancellation request failed");
256 }
257 g_stop_refresh=true;
258 }
259
260 static void on_signal(int sig) {
261 (void)signal(sig, SIG_DFL);
262 if (g_lease_acquired) {
263 LogCvmfs(kLogCvmfs, kLogStdout, "Cancelling lease");
264 cancel_lease();
265 unlink(g_session_token_file.c_str());
266 }
267 if (sig == SIGINT || sig == SIGTERM) exit(1);
268 }
269
270 static vector<string> get_all_dirs_from_sqlite(vector<string>& sqlite_db_vec,
271 bool include_additions,
272 bool include_deletions) {
273 int ret;
274 vector<string> paths;
275
276 for (vector<string>::iterator it = sqlite_db_vec.begin();
277 it != sqlite_db_vec.end(); it++) {
278 sqlite3 *db;
279 ret = sqlite3_open_v2((*it).c_str(), &db, SQLITE_OPEN_READONLY, NULL);
280 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
281 relax_db_locking(db);
282
283 vector<string> tables;
284 if (include_additions) {
285 tables.push_back("dirs");
286 tables.push_back("links");
287 tables.push_back("files");
288 }
289 if (include_deletions) {
290 tables.push_back("deletions");
291 }
292
293 // get all the paths from the DB
294 for (vector<string>::iterator it = tables.begin(); it != tables.end();
295 it++) {
296 sqlite3_stmt *stmt;
297 const string query = "SELECT name FROM " + *it;
298 ret = sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, NULL);
299 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
300 while (sqlite3_step(stmt) == SQLITE_ROW) {
301 const char *name = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
302 const string names = sanitise_name(name);
303 if (*it=="dirs") {
304 paths.push_back(names);
305 } else {
306 paths.push_back(get_parent(names));
307 }
308 }
309 ret = sqlite3_finalize(stmt);
310 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
311 }
312 ret = sqlite3_close_v2(db);
313 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
314 }
315 return paths;
316 }
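// Note: for the files/links/deletions tables the parent directory of each entry is
// collected, for dirs the entry itself; the result feeds get_lease_from_paths() when
// no lease path is given on the command line.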
317
318 static int get_db_schema_revision(sqlite3 *db, const std::string &db_name = "") {
319 sqlite3_stmt *stmt;
320 std::ostringstream stmt_str;
321 stmt_str << "SELECT value FROM " << db_name << "properties WHERE key = 'schema_revision'";
322 int ret = sqlite3_prepare_v2(db, stmt_str.str().c_str(), -1, &stmt, NULL);
323 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
324
325 ret = sqlite3_step(stmt);
326 // if the table exists, it must have a schema_revision row
327 CHECK_SQLITE_ERROR(ret, SQLITE_ROW);
328 const std::string schema_revision_str(reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0)));
329 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
330 return std::stoi(schema_revision_str);
331 }
332
333 static int get_row_count(sqlite3 *db, const std::string &table_name) {
334 sqlite3_stmt *stmt;
335 std::ostringstream stmt_str;
336 stmt_str << "SELECT COUNT(*) FROM " << table_name;
337 int ret = sqlite3_prepare_v2(db, stmt_str.str().c_str(), -1, &stmt, NULL);
338 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
339
340 ret = sqlite3_step(stmt);
341 CHECK_SQLITE_ERROR(ret, SQLITE_ROW);
342 const std::string count_str(reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0)));
343 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
344 return std::stoi(count_str);
345 }
346
347 static int calculate_print_frequency(int total) {
348 int base = 1000;
349 while (base * 50 < total) base *= 10;
350 return base;
351 }
352
353 // compute the longest common parent path of the given paths, for use as the lease path
354 static string get_lease_from_paths(vector<string> paths) {
355 CUSTOM_ASSERT(!paths.empty(), "no paths are provided");
356
357 // the paths are expected to be relative
358 // (best checked elsewhere, as it is not a requirement specific to this function)
359 auto lease = PathString(paths.at(0));
360 for (auto it = paths.begin() + 1; it != paths.end(); ++it) {
361 auto path = PathString(*it);
362 // shrink the lease path until it is a parent of "path"
363 while (!IsSubPath(lease, path)) {
364 auto i = lease.GetLength() - 1;
365 for (; i >= 0; --i) {
366 if (lease.GetChars()[i] == '/' || i == 0) {
367 lease.Truncate(i);
368 break;
369 }
370 }
371 }
372 if (lease.IsEmpty()) break; // early stop if lease is already at the root
373 }
374
375 auto prefix = "/" + lease.ToString();
376
377 LogCvmfs(kLogCvmfs, kLogStdout, "Longest prefix is %s", prefix.c_str());
378 return prefix;
379 }
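// e.g. {"sw/x86_64/gcc", "sw/x86_64/clang"} yields the lease path "/sw/x86_64"
// (illustrative paths).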
380
381 static XattrList marshal_xattrs(const char *acl_string) {
382 XattrList aclobj;
383
384 if (acl_string == NULL || acl_string[0] == '\0') {
385 return aclobj;
386 }
387
388 bool equiv_mode;
389 size_t binary_size;
390 char *binary_acl;
391 const int ret = acl_from_text_to_xattr_value(string(acl_string), binary_acl, binary_size, equiv_mode);
392 if (ret) {
393 LogCvmfs(kLogCvmfs, kLogStderr, "failure of acl_from_text_to_xattr_value(%s)", acl_string);
394 assert(0); // TODO(vavolkl): incorporate error handling other than asserting
395 return aclobj;
396 }
397 if (!equiv_mode) {
398 CUSTOM_ASSERT(aclobj.Set("system.posix_acl_access", string(binary_acl, binary_size)), "failed to set system.posix_acl_access (ACL size %ld)", binary_size);
399 free(binary_acl);
400 }
401
402 return aclobj;
403 }
404
405 std::unordered_map<string, string> load_config(const string& config_file) {
406 std::unordered_map<string, string> config_map;
407 ifstream input(config_file);
408 if (!input) {
409 LogCvmfs(kLogCvmfs, kLogStderr, "could not open config file %s", config_file.c_str());
410 return config_map;
411 }
412 vector<string> lines;
413 for (string line; getline(input, line);) {
414 lines.push_back(line);
415 }
416
417 for (auto it = lines.begin(); it != lines.end(); it++) {
418 const string l = *it;
419 const size_t p = l.find('=', 0);
420 if (p != string::npos) {
421 const string key = l.substr(0, p);
422 string val = l.substr(p + 1);
423 // trim surrounding double quotes
424 if (val.size() >= 2 && val.front() == '"' && val.back() == '"') {
425 val = val.substr(1, val.length() - 2);
426 }
427 config_map[key] = val;
428 }
429 }
430
431 return config_map;
432 }
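// The config file is parsed as simple KEY=VALUE lines with optional surrounding double
// quotes on the value, e.g. CVMFS_GATEWAY="http://gateway.example.org:4929/api/v1"
// (illustrative value); Main() reads CVMFS_GATEWAY, CVMFS_STRATUM0 and CVMFS_HTTP_PROXY.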
433
434 string retrieve_config(std::unordered_map<string, string> &config_map, const string& key) {
435 auto kv = config_map.find(key);
436 CUSTOM_ASSERT(kv != config_map.end(), "Parameter %s not found in config", key.c_str());
437 return kv->second;
438 }
439
440 static vector<string> get_file_list(string& path) {
441 vector<string> paths;
442 const char *cpath = path.c_str();
443 struct stat st;
444 const int ret = stat(cpath, &st);
445 CUSTOM_ASSERT(ret == 0, "failed to stat file %s", cpath);
446
447 if (S_ISDIR(st.st_mode)) {
448 DIR *d;
449 struct dirent *dir;
450 d = opendir(cpath);
451 if (d) {
452 while ((dir = readdir(d)) != NULL) {
453 const char *t = strrchr(dir->d_name, '.');
454 if (t && !strcmp(t, ".db")) {
455 paths.push_back(path + "/" + dir->d_name);
456 }
457 }
458 closedir(d);
459 }
460 } else {
461 paths.push_back(path);
462 }
463 return paths;
464 }
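// If `path` is a directory, every "*.db" file directly inside it is returned
// (non-recursive); otherwise the path itself is returned as a single-element list.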
465
466 extern bool g_log_with_time;
467
468 int swissknife::IngestSQL::Main(const swissknife::ArgumentList &args) {
469
470 // the catalog code uses assert() liberally.
471 // install ABRT signal handler to catch an abort and cancel lease
472 if (
473 signal(SIGABRT, &on_signal) == SIG_ERR
474 || signal(SIGINT, &on_signal) == SIG_ERR
475 || signal(SIGTERM, &on_signal) == SIG_ERR
476 ) {
477 LogCvmfs(kLogCvmfs, kLogStdout, "Setting signal handlers failed");
478 exit(1);
479 }
480
481 const bool enable_corefiles = (args.find('c') != args.end());
482 if( !enable_corefiles ) {
483 struct rlimit rlim;
484 rlim.rlim_cur = rlim.rlim_max = 0;
485 setrlimit( RLIMIT_CORE, &rlim );
486 }
487
488
489 if (args.find('n') != args.end()) {
490 create_empty_database( *args.find('n')->second);
491 exit(0);
492 }
493
494 //TODO(@vvolkl): add 'B' option to wait_for_update
495 //TODO(@vvolkl): add 'T' option for ttl
496
497
498 if (args.find('P') != args.end()) {
499 const char *arg = (*args.find('P')->second).c_str();
500 char* at_null_terminator_if_number;
501 g_priority = strtoll(arg, &at_null_terminator_if_number, 10);
502 if (*at_null_terminator_if_number != '\0') {
503 LogCvmfs(kLogCvmfs, kLogStderr, "Priority parameter value '%s' parsing failed", arg);
504 return 1;
505 }
506 } else {
507 g_priority = -time(NULL);
508 }
509
510
511 unsigned int lease_busy_retry_interval = kDefaultLeaseBusyRetryInterval;
512 if (args.find('r') != args.end()) {
513 lease_busy_retry_interval = atoi((*args.find('r')->second).c_str());
514 }
515
516 string dir_temp = "";
517 const char *env_tmpdir;
518 if (args.find('t') != args.end()) {
519 dir_temp = MakeCanonicalPath(*args.find('t')->second);
520 } else if ((env_tmpdir = getenv("TMPDIR"))) {
521 dir_temp = MakeCanonicalPath(env_tmpdir);
522 } else {
523 LogCvmfs(kLogCvmfs, kLogStderr, "-t or TMPDIR required");
524 return 1;
525 }
526
527 string kConfigDir("/etc/cvmfs/gateway-client/");
528 if (args.find('C') != args.end()) {
529 kConfigDir = MakeCanonicalPath(*args.find('C')->second);
530 kConfigDir += "/";
531 LogCvmfs(kLogCvmfs, kLogStdout, "Overriding configuration dir prefix to %s", kConfigDir.c_str() );
532 }
533
534 // mandatory arguments
535 string const repo_name = *args.find('N')->second;
536 string sqlite_db_path = *args.find('D')->second;
537
538 vector<string> sqlite_db_vec = get_file_list(sqlite_db_path);
539
540 // optional arguments
541 bool const allow_deletions = (args.find('d') != args.end());
542 bool const force_cancel_lease = (args.find('x') != args.end());
543 bool const allow_additions = !allow_deletions || (args.find('a') != args.end());
544 g_add_missing_catalogs = ( args.find('z') != args.end());
545 bool const check_completed_graft_property = ( args.find('Z') != args.end());
546 if (args.find('v') != args.end()) {
547 SetLogVerbosity(kLogVerbose);
548 }
549
550 if(check_completed_graft_property) {
551 if(sqlite_db_vec.size()!=1) {
552 LogCvmfs(kLogCvmfs, kLogStderr, "-Z requires a single DB file");
553 exit(1);
554 }
555 if(isDatabaseMarkedComplete(sqlite_db_vec[0].c_str())) {
556 LogCvmfs(kLogCvmfs, kLogStderr, "DB file is already marked as completed_graft");
557 exit(0);
558 } else {
559 LogCvmfs(kLogCvmfs, kLogStderr, "DB file is not marked as completed_graft");
560 }
561 }
562
563 string const config_file = kConfigDir + repo_name + "/config";
564 string stratum0;
565 string proxy;
566
567 string additional_prefix="";
568 bool has_additional_prefix=false;
569 if (args.find('p') != args.end()) {
570 additional_prefix=*args.find('p')->second;
571 additional_prefix = sanitise_name(additional_prefix.c_str(), true);
572 if( additional_prefix.back() != '/' ) {
573 additional_prefix += "/";
574 }
575 has_additional_prefix= true;
576 LogCvmfs(kLogCvmfs, kLogStdout, "Adding additional prefix %s to lease and all paths", additional_prefix.c_str() );
577 // now we are confident that any additional prefix has no leading / and does have a trailing /
578 }
579 auto config_map = load_config(config_file);
580
581 if (args.find('g') != args.end()) {
582 g_gateway_url = *args.find('g')->second;
583 } else {
584 g_gateway_url = retrieve_config(config_map, "CVMFS_GATEWAY");
585 }
586 if (args.find('w') != args.end()) {
587 stratum0 = *args.find('w')->second;
588 } else {
589 stratum0 = retrieve_config(config_map, "CVMFS_STRATUM0");
590 }
591
592 if (args.find('@') != args.end()) {
593 proxy = *args.find('@')->second;
594 } else {
595 proxy = retrieve_config(config_map, "CVMFS_HTTP_PROXY");
596 }
597
598 string lease_path="";
599 //bool lease_autodetected = false;
600 if (args.find('l') != args.end()) {
601 lease_path = *args.find('l')->second;
602 } else {
603 // lease path wasn't specified, so try to autodetect it
604 vector<string> const paths = get_all_dirs_from_sqlite(
605 sqlite_db_vec, allow_additions, allow_deletions);
606 if (paths.size() == 0) {
607 LogCvmfs(kLogCvmfs, kLogStdout, "Database is empty, nothing to do");
608 return 0; // treat it as a success
609 }
610 lease_path = get_lease_from_paths(paths);
611 //lease_autodetected = true;
612 }
613
614 if (has_additional_prefix) {
615 if (lease_path == "/" ) { lease_path = "/" + additional_prefix; }
616 else {
617 if ( lease_path.substr(0,1)=="/" ) {
618 lease_path = "/" + additional_prefix + lease_path.substr(1, lease_path.size()-1);
619 } else {
620 lease_path = "/" + additional_prefix + lease_path; // prefix is certain to have a trailing /
621 }
622 }
623 }
624 if (lease_path.substr(0,1)!="/") { lease_path = "/" + lease_path; }
625 LogCvmfs(kLogCvmfs, kLogStdout, "Lease path is %s", lease_path.c_str());
626
627
628 string public_keys = kConfigDir + repo_name + "/pubkey";
629 string key_file = kConfigDir + repo_name + "/gatewaykey";
630 string s3_file = kConfigDir + repo_name + "/s3.conf";
631
632 if( args.find('k') != args.end()) {
633 public_keys = *args.find('k')->second;
634 }
635 if( args.find('s') != args.end()) {
636 key_file = *args.find('s')->second;
637 }
638 if( args.find('3') != args.end()) {
639 s3_file = *args.find('3')->second;
640 }
641
642 CUSTOM_ASSERT(access(public_keys.c_str(), R_OK) == 0, "%s is not readable", public_keys.c_str());
643 CUSTOM_ASSERT(access(key_file.c_str(), R_OK) == 0, "%s is not readable", key_file.c_str());
644
645 // string spooler_definition_string = string("gw,,") + g_gateway_url;
646 // create a spooler that will upload to S3
647 string const spooler_definition_string = string("S3,") + dir_temp + "," + repo_name + "@" + s3_file;
648
649 // load gateway lease
650 if (!gateway::ReadKeys(key_file, &g_gateway_key_id, &g_gateway_secret)) {
651 LogCvmfs(kLogCvmfs, kLogStderr, "gateway::ReadKeys failed");
652 return 1;
653 }
654
655 uint64_t current_revision=0;
656 std::string current_root_hash="";
657
658 // acquire lease and save token to a file in the tmpdir
659 LogCvmfs(kLogCvmfs, kLogStdout, "Acquiring gateway lease on %s",
660 lease_path.c_str());
661 g_session_token = acquire_lease(g_gateway_key_id, g_gateway_secret,
662 repo_name + lease_path, g_gateway_url, force_cancel_lease,
663 &current_revision, current_root_hash, lease_busy_retry_interval);
664
665
666 char *_tmpfile = strdup( (dir_temp + "/gateway_session_token_XXXXXX").c_str() );
667 int const temp_fd = mkstemp(_tmpfile);
668 g_session_token_file = string(_tmpfile);
669 free(_tmpfile);
670
671 FILE *fout=fdopen(temp_fd, "wb");
672 CUSTOM_ASSERT(fout!=NULL, "failed to open session token file %s for writing", g_session_token_file.c_str());
673 fputs(g_session_token.c_str(), fout);
674 fclose(fout);
675
676 // now start the lease refresh thread
677 pthread_t lease_thread;
678 if ( 0 != pthread_create( &lease_thread, NULL, lease_refresh_thread, NULL ) ) {
679 LogCvmfs(kLogCvmfs, kLogStderr, "Unable to start lease refresh thread");
680 cancel_lease();
681 return 1;
682 }
683
684 // now initialise the various bits we need
685
686 upload::SpoolerDefinition spooler_definition(
687 spooler_definition_string, shash::kSha1, zlib::kZlibDefault, false, true,
688 SyncParameters::kDefaultMinFileChunkSize, SyncParameters::kDefaultAvgFileChunkSize,
689 SyncParameters::kDefaultMaxFileChunkSize, g_session_token_file, key_file);
690
691 if (args.find('q') != args.end()) {
692 spooler_definition.number_of_concurrent_uploads =
693 String2Uint64(*args.find('q')->second);
694 }
695
696 upload::SpoolerDefinition const spooler_definition_catalogs(
697 spooler_definition.Dup2DefaultCompression());
698
699 UniquePtr<upload::Spooler> const spooler_catalogs(
700 upload::Spooler::Construct(spooler_definition_catalogs, nullptr));
701
702 if (!spooler_catalogs.IsValid()) {
703 LogCvmfs(kLogCvmfs, kLogStderr, "spooler_catalogs invalid");
704 cancel_lease();
705 return 1;
706 }
707 if (!InitDownloadManager(true, proxy, kCatalogDownloadMultiplier)) {
708 LogCvmfs(kLogCvmfs, kLogStderr, "download manager init failed");
709 cancel_lease();
710 return 1;
711 }
712 if (!InitSignatureManager(public_keys, "")) {
713 LogCvmfs(kLogCvmfs, kLogStderr, "signature manager init failed");
714 cancel_lease();
715 return 1;
716 }
717
718 UniquePtr<manifest::Manifest> manifest;
719
720 manifest = FetchRemoteManifest(stratum0, repo_name, shash::Any());
721
722 if (!manifest.IsValid()) {
723 LogCvmfs(kLogCvmfs, kLogStderr, "manifest invalid");
724 cancel_lease();
725 return 1;
726 }
727
728 if(current_revision > 0 ) {
729 if( current_revision == manifest->revision() ) {
730 if (current_root_hash != manifest->catalog_hash().ToString() ) {
731 LogCvmfs(kLogCvmfs, kLogStderr, "Mismatch between cvmfspublished and gateway hash for revision %lu (%s!=%s)", current_revision, current_root_hash.c_str(), manifest->catalog_hash().ToString().c_str() );
732 cancel_lease();
733 return 1;
734 } else {
735 LogCvmfs(kLogCvmfs, kLogStdout, "Gateway and .cvmfspublished agree on repo version %lu", current_revision );
736 }
737 }
738 if( current_revision > manifest->revision() ) {
739 LogCvmfs(kLogCvmfs, kLogStdout, "Gateway has supplied a newer revision than the current .cvmfspublished %lu > %lu", current_revision, manifest->revision() );
740 manifest->set_revision(current_revision);
741 manifest->set_catalog_hash( shash::MkFromHexPtr(shash::HexPtr(current_root_hash), shash::kSuffixCatalog));
742 } else if (current_revision < manifest->revision() ) {
743 LogCvmfs(kLogCvmfs, kLogStdout, "Gateway has supplied an older revision than the current .cvmfspublished %lu < %lu", current_revision, manifest->revision() );
744 }
745 } else {
746 LogCvmfs(kLogCvmfs, kLogStdout, "Gateway has not supplied a revision. Using .cvmfspublished" );
747 }
748
749
750 // get hash of current root catalog, remove terminal "C", encode it
751 string const old_root_hash = manifest->catalog_hash().ToString(true);
752 string const hash = old_root_hash.substr(0, old_root_hash.length() - 1);
753 shash::Any const base_hash =
754 shash::MkFromHexPtr(shash::HexPtr(hash), shash::kSuffixCatalog);
755 LogCvmfs(kLogCvmfs, kLogStdout, "old_root_hash: %s", old_root_hash.c_str());
756
757 bool const is_balanced = false;
758
759 catalog::WritableCatalogManager catalog_manager(
760 base_hash, stratum0, dir_temp, spooler_catalogs.weak_ref(),
761 download_manager(), false, SyncParameters::kDefaultNestedKcatalogLimit,
762 SyncParameters::kDefaultRootKcatalogLimit, SyncParameters::kDefaultFileMbyteLimit, statistics(),
763 is_balanced, SyncParameters::kDefaultMaxWeight, SyncParameters::kDefaultMinWeight, dir_temp /* dir_cache */);
764
765 catalog_manager.Init();
766
767
768 // now graft the contents of the DB
769 vector<sqlite3*> open_dbs;
770 for (auto&& db_file : sqlite_db_vec) {
771 sqlite3 *db;
772 CHECK_SQLITE_ERROR(sqlite3_open_v2(db_file.c_str(), &db, SQLITE_OPEN_READONLY, NULL), SQLITE_OK);
773 relax_db_locking(db);
774 open_dbs.push_back(db);
775 }
776 process_sqlite(open_dbs, catalog_manager, allow_additions,
777 allow_deletions, lease_path.substr(1), additional_prefix);
778 for (auto&& db : open_dbs) {
779 CHECK_SQLITE_ERROR(sqlite3_close_v2(db), SQLITE_OK);
780 }
781
782 // commit changes
783 LogCvmfs(kLogCvmfs, kLogStdout, "Committing changes...");
784 if (!catalog_manager.Commit(false, false, manifest.weak_ref())) {
785 LogCvmfs(kLogCvmfs, kLogStderr, "something went wrong during sync");
786 cancel_lease();
787 return 1;
788 }
789
790 // finalize the spooler
791 LogCvmfs(kLogCvmfs, kLogStdout, "Waiting for all uploads to finish...");
792 spooler_catalogs->WaitForUpload();
793
794 LogCvmfs(kLogCvmfs, kLogStdout, "Exporting repository manifest");
795
796 // FinalizeSession(true) would normally also trigger the commit on the gateway
797 // (if the upstream is of type "gw"); here the commit is issued explicitly via
798 // make_commit_on_gateway() below, so the FinalizeSession call is left commented out.
799 // Get the hash of the new root catalog
800 const string new_root_hash = manifest->catalog_hash().ToString(true);
801
802 // if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
803 // RepositoryTag())) {
804 // LogCvmfs(kLogCvmfs, kLogStderr, "Failed to commit the transaction");
805 // // lease is only released on success
806 // cancel_lease();
807 // return 1;
808 // }
809
810 LogCvmfs(kLogCvmfs, kLogStdout, "Committing with priority %" PRId64, g_priority);
811
812 bool const ok = make_commit_on_gateway( old_root_hash, new_root_hash, g_priority );
813 if(!ok) {
814 LogCvmfs(kLogCvmfs, kLogStderr, "something went wrong during commit on gateway");
815 cancel_lease();
816 exit(1);
817 }
818
819
820 unlink( g_session_token_file.c_str() );
821
822 g_stop_refresh=true;
823
824
825 if(check_completed_graft_property) {
826 setDatabaseMarkedComplete(sqlite_db_vec[0].c_str());
827 }
828
829
830
831 return 0;
832 }
833
834 size_t writeFunction(void *ptr, size_t size, size_t nmemb, std::string* data) {
835 return size * nmemb;
836 }
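// libcurl write callback that discards the response body; returning size * nmemb tells
// curl the data was fully consumed.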
837
838 void replaceAllSubstrings(std::string& str, const std::string& from, const std::string& to) {
839 if (from.empty()) {
840 return; // Avoid infinite loop if 'from' is an empty string.
841 }
842 size_t startPos = 0;
843 while ((startPos = str.find(from, startPos)) != std::string::npos) {
844 str.replace(startPos, from.length(), to);
845 startPos += to.length(); // Advance startPos to avoid replacing the substring just inserted.
846 }
847 }
848
849
850 void swissknife::IngestSQL::process_sqlite(
851 const std::vector<sqlite3*> &dbs,
852 catalog::WritableCatalogManager &catalog_manager,
853 bool allow_additions, bool allow_deletions, const std::string &lease_path, const std::string &additional_prefix)
854 {
855 std::map<std::string, Directory> all_dirs;
856 std::map<std::string, std::vector<File>> all_files;
857 std::map<std::string, std::vector<Symlink>> all_symlinks;
858
859 for (auto&& db : dbs) {
860 load_dirs(db, lease_path, additional_prefix, all_dirs);
861 }
862
863 // put in a nested scope so we can free up memory of `dir_names`
864 {
865 LogCvmfs(kLogCvmfs, kLogStdout, "Precaching existing directories (starting from %s)", lease_path.c_str());
866 std::unordered_set<std::string> dir_names;
867 std::transform(all_dirs.begin(), all_dirs.end(), std::inserter(dir_names, dir_names.end()),
868 [](const std::pair<std::string, Directory>& pair) {
869 return MakeCatalogPath(pair.first);
870 });
871 catalog_manager.LoadCatalogs(MakeCatalogPath(lease_path), dir_names);
872 }
873
874 for (auto&& db : dbs) {
875 load_files(db, lease_path, additional_prefix, all_files);
876 load_symlinks(db, lease_path, additional_prefix, all_symlinks);
877 }
878
879 // perform all deletions first
880 if (allow_deletions) {
881 LogCvmfs(kLogCvmfs, kLogStdout, "Processing deletions...");
882 for (auto&& db : dbs) {
883 CHECK_SQLITE_ERROR(do_deletions(db, catalog_manager, lease_path, additional_prefix), SQLITE_OK);
884 }
885 }
886
887 if (allow_additions) {
888 LogCvmfs(kLogCvmfs, kLogStdout, "Processing additions...");
889 // first ensure all directories are present and create missing ones
890 do_additions(all_dirs, all_files, all_symlinks, lease_path, catalog_manager);
891 }
892 }
893
894 void add_dir_to_tree(std::string path, std::unordered_map<std::string, std::set<std::string>> &tree, const std::string &lease_path) {
895 tree[path];
896 std::string parent_path = get_parent(path);
897 // recursively create any missing parents in the tree
898 // avoid creating a loop when we insert the root path
899 while (path != parent_path && path != lease_path && !tree[parent_path].count(path)) {
900 tree[parent_path].insert(path);
901 path = parent_path;
902 parent_path = get_parent(path);
903 }
904 }
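// Example: add_dir_to_tree("a/b/c", tree, "a") ensures tree["a/b/c"] exists and links
// "a" -> {"a/b"} and "a/b" -> {"a/b/c"} (illustrative paths).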
905
906 int swissknife::IngestSQL::do_additions(
907 const DirMap &all_dirs, const FileMap &all_files, const SymlinkMap &all_symlinks, const std::string &lease_path, catalog::WritableCatalogManager &catalog_manager) {
908 // STEP 1:
909 // - collect all the dirs/symlinks/files we need to process from the DB
910 // - build a tree of paths for DFS traversal
911 // - note the tree will contain all parent dirs of symlinks/files even if those are not
912 // explicitly added to the dirs table
913 std::unordered_map<std::string, std::set<std::string>> tree;
914 for (auto&& p : all_dirs) {
915 add_dir_to_tree(p.first, tree, lease_path);
916 }
917 for (auto&& p : all_files) {
918 add_dir_to_tree(p.first, tree, lease_path);
919 }
920 for (auto&& p : all_symlinks) {
921 add_dir_to_tree(p.first, tree, lease_path);
922 }
923 int const row_count = static_cast<int>(tree.size());
924 int const print_every = calculate_print_frequency(row_count);
925 int curr_row = 0;
926 LogCvmfs(kLogCvmfs, kLogStdout, "Changeset: %ld dirs, %ld files, %ld symlinks", tree.size(), all_files.size(), all_symlinks.size());
927
928 // STEP 2:
929 // - process all the changes with DFS traversal
930 // - make directories in pre-order
931 // - add files/symlinks and schedule upload in post-order
932 catalog_manager.SetupSingleCatalogUploadCallback();
933 std::stack<string> dfs_stack;
934 for (auto&& p : tree) {
935 // figure out the starting point by checking whose parent is missing from the tree
936 if (p.first == "" || !tree.count(get_parent(p.first))) {
937 CUSTOM_ASSERT(dfs_stack.empty(), "provided DB input forms more than one path tree");
938 dfs_stack.push(p.first);
939 }
940 }
941 std::set<string> visited;
942 while (!dfs_stack.empty()) {
943 string const curr_dir = dfs_stack.top();
944 // add content for the dir in post-order traversal
945 if (visited.count(curr_dir)) {
946 curr_row++;
947 if (all_symlinks.count(curr_dir)) {
948 add_symlinks(catalog_manager, all_symlinks.at(curr_dir));
949 }
950 if (all_files.count(curr_dir)) {
951 add_files(catalog_manager, all_files.at(curr_dir));
952 }
953 // snapshot the dir (if it's a nested catalog mountpoint)
954 catalog::DirectoryEntry dir_entry;
955 bool exists = false;
956 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(curr_dir),
957 catalog::kLookupDefault, &dir_entry);
958 assert(exists); // the dir must exist at this point
959 if (dir_entry.IsNestedCatalogMountpoint() || dir_entry.IsNestedCatalogRoot()) {
960 catalog_manager.AddCatalogToQueue(curr_dir);
961 catalog_manager.ScheduleReadyCatalogs();
962 }
963 dfs_stack.pop();
964 SHOW_PROGRESS("directories", print_every, curr_row, row_count);
965 } else {
966 visited.insert(curr_dir);
967 // push children to the stack
968 auto it = tree.find(curr_dir);
969 if (it != tree.end()) {
970 for (auto&& child : it->second) {
971 dfs_stack.push(child);
972 }
973 tree.erase(it);
974 }
975 if (!all_dirs.count(curr_dir)) continue;
976
977 // create the dir first in pre-order traversal
978 const IngestSQL::Directory& dir = all_dirs.at(curr_dir);
979 catalog::DirectoryEntry dir_entry;
980
981 bool exists = false;
982 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(curr_dir),
983 catalog::kLookupDefault, &dir_entry);
984 CUSTOM_ASSERT(!(exists && !S_ISDIR(dir_entry.mode_)), "Refusing to replace existing file/symlink at %s with a directory", dir.name.c_str());
985
986 dir_entry.name_ = NameString(get_basename(dir.name));
987 dir_entry.mtime_ = dir.mtime / 1000000000;
988 dir_entry.mode_ = dir.mode | S_IFDIR;
989 dir_entry.mode_ &= (S_IFDIR | 0777);
990 dir_entry.uid_ = dir.owner;
991 dir_entry.gid_ = dir.grp;
992 dir_entry.has_xattrs_ = !dir.xattr.IsEmpty();
993
994 bool add_nested_catalog=false;
995
996 if (exists) {
997 catalog_manager.TouchDirectory(dir_entry, dir.xattr, dir.name);
998 if((!dir_entry.IsNestedCatalogMountpoint() && !dir_entry.IsNestedCatalogRoot()) && ( g_add_missing_catalogs || dir.nested ) ) {
999 add_nested_catalog=true;
1000 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Touching existing directory %s and adding nested catalog", dir.name.c_str());
1001 } else {
1002 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Touching existing directory %s", dir.name.c_str());
1003 }
1004 } else {
1005 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding directory [%s]", dir.name.c_str());
1006 catalog_manager.AddDirectory(dir_entry, dir.xattr, get_parent(dir.name));
1007 if(dir.nested) {add_nested_catalog=true;}
1008 }
1009 if (add_nested_catalog) {
1010 // now add a .cvmfscatalog file
1011 // so that manual changes won't remove the nested catalog
1012 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Placing .cvmfscatalog file in [%s]", dir.name.c_str());
1013 catalog::DirectoryEntryBase dir2;
1014 dir2.name_ = NameString(".cvmfscatalog");
1015 dir2.mtime_ = dir.mtime / 1000000000;
1016 dir2.mode_ = (S_IFREG | 0666);
1017 dir2.uid_ = 0;
1018 dir2.gid_ = 0;
1019 dir2.has_xattrs_ = 0;
1020 dir2.checksum_ = shash::MkFromHexPtr(
1021 shash::HexPtr("da39a3ee5e6b4b0d3255bfef95601890afd80709"),
1022 shash::kSuffixNone); // hash of ""
1023 XattrList const xattr2;
1024 catalog_manager.AddFile(dir2, xattr2, dir.name);
1025
1026 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Creating Nested Catalog [%s]", dir.name.c_str());
1027 catalog_manager.CreateNestedCatalog(dir.name);
1028 }
1029 }
1030 }
1031
1032 // sanity check that we have processed all the input
1033 CUSTOM_ASSERT(tree.empty(), "not all directories are processed, malformed input DB?");
1034 catalog_manager.RemoveSingleCatalogUploadCallback();
1035 return 0;
1036 }
1037
1038 int swissknife::IngestSQL::add_symlinks(
1039 catalog::WritableCatalogManager &catalog_manager, const std::vector<Symlink> &symlinks)
1040 {
1041 for (auto&& symlink : symlinks) {
1042 catalog::DirectoryEntry dir;
1043 catalog::DirectoryEntryBase dir2;
1044 XattrList const xattr;
1045 bool exists = false;
1046 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(symlink.name),
1047 catalog::kLookupDefault, &dir);
1048
1049 dir2.name_ = NameString(get_basename(symlink.name));
1050 dir2.mtime_ = symlink.mtime / 1000000000;
1051 dir2.uid_ = symlink.owner;
1052 dir2.gid_ = symlink.grp;
1053 dir2.has_xattrs_ = false;
1054 dir2.symlink_ = LinkString(symlink.target);
1055 dir2.mode_ = S_IFLNK | 0777;
1056
1057 bool noop = false;
1058
1059 if (exists) {
1060 if(symlink.skip_if_file_or_dir) {
1061 if(S_ISDIR(dir.mode_) || S_ISREG(dir.mode_)) {
1062 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "File or directory for symlink [%s] exists, skipping symlink creation", symlink.name.c_str() );
1063 noop = true;
1064 } else if (S_ISLNK(dir.mode_)) {
1065 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing existing symlink [%s]",
1066 symlink.name.c_str());
1067 catalog_manager.RemoveFile(symlink.name);
1068 } else {
1069 CUSTOM_ASSERT(0, "unknown mode for dirent: %d", dir.mode_);
1070 }
1071 } else {
1072 CUSTOM_ASSERT(!S_ISDIR(dir.mode_), "Refusing to replace directory [%s] with a symlink", symlink.name.c_str());
1073 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing existing file/symlink [%s]",
1074 symlink.name.c_str());
1075 catalog_manager.RemoveFile(symlink.name);
1076 }
1077 }
1078 if(!noop) {
1079 string const parent = get_parent(symlink.name);
1080 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding symlink [%s] -> [%s]", symlink.name.c_str(), symlink.target.c_str());
1081 catalog_manager.AddFile(dir2, xattr, parent);
1082 }
1083 }
1084 return 0;
1085 }
1086
1087 static int check_hash(const char *hash) {
1088 if (strlen(hash) != 40) { return 1; }
1089 for (int i = 0; i < 40; i++) {
1090 // accept only lowercase hex: reject < '0', > 'f', or anything between '9' and 'a'
1091 if (hash[i] < 0x30 || hash[i] > 0x66 || (hash[i] > 0x39 && hash[i] < 0x61)) {
1092 return 1;
1093 }
1094 }
1095 return 0;
1096 }
1097
1098 bool check_prefix(const std::string &path , const std::string &prefix) {
1099 if (prefix=="" || prefix=="/") {return true;}
1100 if ( "/"+path == prefix ) { return true; }
1101 if(!HasPrefix( path, prefix, false)) {
1102 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Entry %s is outside lease path: %s", path.c_str(), prefix.c_str());
1103 return false;
1104 }
1105 return true;
1106 }
1107
1108 void swissknife::IngestSQL::load_dirs(sqlite3 *db, const std::string &lease_path, const std::string &additional_prefix, std::map<std::string, Directory> &all_dirs) {
1109 sqlite3_stmt *stmt;
1110 int const schema_revision = get_db_schema_revision(db);
1111 string select_stmt = "SELECT name, mode, mtime, owner, grp, acl, nested FROM dirs";
1112 if (schema_revision <= 3) {
1113 select_stmt = "SELECT name, mode, mtime, owner, grp, acl FROM dirs";
1114 }
1115 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1116 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1117 while (sqlite3_step(stmt) == SQLITE_ROW) {
1118 char *name_cstr = (char *)sqlite3_column_text(stmt, 0);
1119 mode_t const mode = sqlite3_column_int(stmt, 1);
1120 time_t const mtime = sqlite3_column_int64(stmt, 2);
1121 uid_t const owner = sqlite3_column_int(stmt, 3);
1122 gid_t const grp = sqlite3_column_int(stmt, 4);
1123 int const nested = schema_revision <= 3 ? 1 : sqlite3_column_int(stmt, 6);
1124
1125 string const name = additional_prefix + sanitise_name(name_cstr);
1126 CUSTOM_ASSERT(check_prefix(name, lease_path), "%s is not below lease path %s", name.c_str(), lease_path.c_str());
1127
1128 Directory dir(name, mtime, mode, owner, grp, nested);
1129 char *acl = (char *)sqlite3_column_text(stmt, 5);
1130 dir.xattr = marshal_xattrs(acl);
1131 all_dirs.insert(std::make_pair(name, dir));
1132 }
1133 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1134 }
1135
1136 void swissknife::IngestSQL::load_files(sqlite3 *db, const std::string &lease_path, const std::string &additional_prefix, std::map<std::string, std::vector<File>> &all_files) {
1137 sqlite3_stmt *stmt;
1138 int const schema_revision = get_db_schema_revision(db);
1139 string select_stmt = "SELECT name, mode, mtime, owner, grp, size, hashes, internal, compressed FROM files";
1140 if (schema_revision <= 2) {
1141 select_stmt = "SELECT name, mode, mtime, owner, grp, size, hashes, internal FROM files";
1142 }
1143 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1144 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1145 while (sqlite3_step(stmt) == SQLITE_ROW) {
1146 char *name = (char *)sqlite3_column_text(stmt, 0);
1147 mode_t const mode = sqlite3_column_int(stmt, 1);
1148 time_t const mtime = sqlite3_column_int64(stmt, 2);
1149 uid_t const owner = sqlite3_column_int(stmt, 3);
1150 gid_t const grp = sqlite3_column_int(stmt, 4);
1151 int64_t const size = sqlite3_column_int64(stmt, 5);
1152 char *hashes_cstr = (char *)sqlite3_column_text(stmt, 6);
1153 int const internal = sqlite3_column_int(stmt, 7);
1154 int const compressed = schema_revision <= 2 ? 0 : sqlite3_column_int(stmt, 8);
1155
1156 string names = additional_prefix + sanitise_name(name);
1157 CUSTOM_ASSERT(check_prefix(names, lease_path), "%s is not below lease path %s", names.c_str(), lease_path.c_str());
1158 string const parent_dir = get_parent(names);
1159
1160 if (!all_files.count(parent_dir)) {
1161 all_files[parent_dir] = vector<swissknife::IngestSQL::File>();
1162 }
1163 all_files[parent_dir].emplace_back(std::move(names), mtime, size, owner, grp, mode, internal, compressed);
1164
1165 // tokenize hashes
1166 char *ref;
1167 char *tok;
1168 tok = strtok_r(hashes_cstr, ",", &ref);
1169 vector<off_t> offsets;
1170 vector<size_t> sizes;
1171 vector<shash::Any> hashes;
1172 off_t offset = 0;
1173
1174 CUSTOM_ASSERT(size>=0, "file size cannot be negative [%s]", names.c_str());
1175 size_t const kChunkSize = internal ? kInternalChunkSize : kExternalChunkSize;
1176
1177 while (tok) {
1178 offsets.push_back(offset);
1179 // validate the hash format (must be 40 lowercase hex characters)
1180 CUSTOM_ASSERT(check_hash(tok)==0, "provided hash for [%s] is invalid: %s", names.c_str(), tok);
1181 hashes.push_back(
1182 shash::MkFromHexPtr(shash::HexPtr(tok), shash::kSuffixNone));
1183 tok = strtok_r(NULL, ",", &ref);
1184 offset += kChunkSize; // in the future we might want variable chunk
1185 // sizes specified in the DB
1186 }
1187 size_t expected_num_chunks = size/kChunkSize;
1188 if (expected_num_chunks * (size_t)kChunkSize < (size_t) size || size==0 ) { expected_num_chunks++; }
1189 CUSTOM_ASSERT(offsets.size() == expected_num_chunks, "offsets size %ld does not match expected number of chunks %ld", offsets.size(), expected_num_chunks);
1190 for (size_t i = 0; i < offsets.size() - 1; i++) {
1191 sizes.push_back(size_t(offsets[i + 1] - offsets[i]));
1192 }
1193
1194 sizes.push_back(size_t(size - offsets[offsets.size() - 1]));
1195 for (size_t i = 0; i < offsets.size(); i++) {
1196 FileChunk const chunk = FileChunk(hashes[i], offsets[i], sizes[i]);
1197 all_files[parent_dir].back().chunks.PushBack(chunk);
1198 }
1199 }
1200 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1201 }
1202
1203 void swissknife::IngestSQL::load_symlinks(sqlite3 *db, const std::string &lease_path, const std::string &additional_prefix, std::map<std::string, std::vector<Symlink>> &all_symlinks) {
1204 sqlite3_stmt *stmt;
1205 string const select_stmt = "SELECT name, target, mtime, owner, grp, skip_if_file_or_dir FROM links";
1206 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1207 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1208 while (sqlite3_step(stmt) == SQLITE_ROW) {
1209 char *name_cstr = (char *)sqlite3_column_text(stmt, 0);
1210 char *target_cstr = (char *)sqlite3_column_text(stmt, 1);
1211 time_t const mtime = sqlite3_column_int64(stmt, 2);
1212 uid_t const owner = sqlite3_column_int(stmt, 3);
1213 gid_t const grp = sqlite3_column_int(stmt, 4);
1214 int const skip_if_file_or_dir = sqlite3_column_int(stmt, 5);
1215
1216 string names = additional_prefix + sanitise_name(name_cstr);
1217 CUSTOM_ASSERT(check_prefix(names, lease_path), "%s is not below lease path %s", names.c_str(), lease_path.c_str());
1218 string target= target_cstr;
1219 string const parent_dir = get_parent(names);
1220
1221 if (!all_symlinks.count(parent_dir)) {
1222 all_symlinks[parent_dir] = vector<swissknife::IngestSQL::Symlink>();
1223 }
1224 all_symlinks[parent_dir].emplace_back(std::move(names), std::move(target), mtime, owner, grp, skip_if_file_or_dir);
1225 }
1226 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1227 }
1228
1229 int swissknife::IngestSQL::add_files(
1230 catalog::WritableCatalogManager &catalog_manager, const std::vector<File> &files)
1231 {
1232 for (auto&& file : files) {
1233 catalog::DirectoryEntry dir;
1234 XattrList const xattr;
1235 bool exists = false;
1236 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(file.name),
1237 catalog::kLookupDefault, &dir);
1238
1239 dir.name_ = NameString(get_basename(file.name));
1240 dir.mtime_ = file.mtime / 1000000000;
1241 dir.mode_ = file.mode | S_IFREG;
1242 dir.mode_ &= (S_IFREG | 0777);
1243 dir.uid_ = file.owner;
1244 dir.gid_ = file.grp;
1245 dir.size_ = file.size;
1246 dir.has_xattrs_ = false;
1247 dir.is_external_file_ = !file.internal;
1248 dir.set_is_chunked_file(true);
1249 dir.checksum_ = shash::MkFromHexPtr(
1250 shash::HexPtr("0000000000000000000000000000000000000000"),
1251 shash::kSuffixNone);
1252
1253 // compression is permitted only for internal data
1254 CUSTOM_ASSERT(file.internal || file.compressed < 2, "compression is only allowed for internal data [%s]", file.name.c_str());
1255
1256 switch (file.compressed) {
1257 case 1: // Uncompressed
1258 dir.compression_algorithm_ = zlib::kNoCompression;
1259 break;
1260 case 2: // Compressed with Zlib
1261 dir.compression_algorithm_ = zlib::kZlibDefault;
1262 break;
1263 // future cases: different compression schemes
1264 default: // default behaviour: compressed if internal, content-addressed. Uncompressed if external
1265 dir.compression_algorithm_ = file.internal ? zlib::kZlibDefault : zlib::kNoCompression;
1266 }
1267
1268 if (exists) {
1269 CUSTOM_ASSERT(!S_ISDIR(dir.mode()) && !S_ISLNK(dir.mode()), "Refusing to replace existing dir/symlink at %s with a file", file.name.c_str());
1270 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing existing file [%s]", file.name.c_str());
1271 catalog_manager.RemoveFile(file.name);
1272 }
1273 string const parent = get_parent(file.name);
1274 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding chunked file [%s]", file.name.c_str());
1275 catalog_manager.AddChunkedFile(dir, xattr, parent, file.chunks);
1276 }
1277
1278 return 0;
1279 }
1280
1281 int swissknife::IngestSQL::do_deletions(
1282 sqlite3 *db, catalog::WritableCatalogManager &catalog_manager, const std::string &lease_path, const std::string &additional_prefix) {
1283 sqlite3_stmt *stmt;
1284 int const row_count = get_row_count(db, "deletions");
1285 int const print_every = calculate_print_frequency(row_count);
1286 int curr_row = 0;
1287 int ret = sqlite3_prepare_v2(db, "SELECT name, directory, file, link FROM deletions ORDER BY length(name) DESC", -1, &stmt, NULL);
1288 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1289 while (sqlite3_step(stmt) == SQLITE_ROW) {
1290 curr_row++;
1291
1292 char *name = (char *)sqlite3_column_text(stmt, 0);
1293 int64_t const isdir = sqlite3_column_int64(stmt, 1);
1294 int64_t const isfile = sqlite3_column_int64(stmt, 2);
1295 int64_t const islink = sqlite3_column_int64(stmt, 3);
1296
1297 string const names = additional_prefix + sanitise_name(name);
1298 CUSTOM_ASSERT(check_prefix( names, lease_path), "%s is not below lease path %s", names.c_str(), lease_path.c_str());
1299
1300 catalog::DirectoryEntry dirent;
1301 bool exists = false;
1302 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(names),
1303 catalog::kLookupDefault, &dirent);
1304 if(exists) {
1305 if( (isdir && S_ISDIR(dirent.mode()) )
1306 || (islink && S_ISLNK(dirent.mode()) )
1307 || (isfile && S_ISREG(dirent.mode()) ) ) {
1308 if (S_ISDIR(dirent.mode())) {
1309 PathString names_path(names);
1310 recursively_delete_directory(names_path, catalog_manager);
1311 } else {
1312 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing link/file [%s]", names.c_str());
1313 catalog_manager.RemoveFile(names);
1314 }
1315 } else {
1316 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Mismatch in deletion type, not deleting: [%s] (dir %ld/%d , link %ld/%d, file %ld/%d)", names.c_str(), isdir, S_ISDIR(dirent.mode()), islink, S_ISLNK(dirent.mode()), isfile, S_ISREG(dirent.mode()) );
1317 }
1318 } else {
1319 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Not removing non-existent [%s]",
1320 names.c_str());
1321 }
1322
1323 SHOW_PROGRESS("deletions", print_every, curr_row, row_count);
1324 }
1325 ret = sqlite3_finalize(stmt);
1326 return ret;
1327 }
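// Note: the SELECT orders deletions by descending path length, so deeper entries are
// removed before their parent directories.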
1328
1329 const char*schema[] = {
1330 "PRAGMA journal_mode=WAL;",
1331
1332 "CREATE TABLE IF NOT EXISTS dirs ( \
1333 name TEXT PRIMARY KEY, \
1334 mode INTEGER NOT NULL DEFAULT 493,\
1335 mtime INTEGER NOT NULL DEFAULT (unixepoch()),\
1336 owner INTEGER NOT NULL DEFAULT 0, \
1337 grp INTEGER NOT NULL DEFAULT 0, \
1338 acl TEXT NOT NULL DEFAULT '', \
1339 nested INTEGER DEFAULT 1);",
1340
1341 "CREATE TABLE IF NOT EXISTS files ( \
1342 name TEXT PRIMARY KEY, \
1343 mode INTEGER NOT NULL DEFAULT 420, \
1344 mtime INTEGER NOT NULL DEFAULT (unixepoch()),\
1345 owner INTEGER NOT NULL DEFAULT 0,\
1346 grp INTEGER NOT NULL DEFAULT 0,\
1347 size INTEGER NOT NULL DEFAULT 0,\
1348 hashes TEXT NOT NULL DEFAULT '',\
1349 internal INTEGER NOT NULL DEFAULT 0,\
1350 compressed INTEGER NOT NULL DEFAULT 0\
1351 );",
1352
1353 "CREATE TABLE IF NOT EXISTS links (\
1354 name TEXT PRIMARY KEY,\
1355 target TEXT NOT NULL DEFAULT '',\
1356 mtime INTEGER NOT NULL DEFAULT (unixepoch()),\
1357 owner INTEGER NOT NULL DEFAULT 0,\
1358 grp INTEGER NOT NULL DEFAULT 0,\
1359 skip_if_file_or_dir INTEGER NOT NULL DEFAULT 0\
1360 );",
1361
1362 "CREATE TABLE IF NOT EXISTS deletions (\
1363 name TEXT PRIMARY KEY,\
1364 directory INTEGER NOT NULL DEFAULT 0,\
1365 file INTEGER NOT NULL DEFAULT 0,\
1366 link INTEGER NOT NULL DEFAULT 0\
1367 );",
1368
1369 "CREATE TABLE IF NOT EXISTS properties (\
1370 key TEXT PRIMARY KEY,\
1371 value TEXT NOT NULL\
1372 );",
1373
1374 "INSERT INTO properties VALUES ('schema_revision', '4') ON CONFLICT DO NOTHING;",
1375 NULL
1376 };
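// Illustrative example of populating an ingestion DB (placeholder values):
//   INSERT INTO dirs (name, nested) VALUES ('sw/x86_64', 1);
//   INSERT INTO links (name, target) VALUES ('sw/latest', 'x86_64');
//   INSERT INTO deletions (name, file) VALUES ('sw/old.bin', 1);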
1377
1378 static void create_empty_database( string& filename ) {
1379 sqlite3 *db_out;
1380 LogCvmfs(kLogCvmfs, kLogStdout, "Creating empty database file %s", filename.c_str());
1381 int ret = sqlite3_open_v2(filename.c_str(), &db_out, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL);
1382 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1383 relax_db_locking(db_out);
1384
1385 const char **ptr = schema;
1386 while (*ptr!=NULL) {
1387 ret = sqlite3_exec(db_out, *ptr, NULL, NULL, NULL);
1388 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1389 ptr++;
1390 }
1391 sqlite3_close(db_out);
1392 }
1393
1394 static void recursively_delete_directory(PathString& path, catalog::WritableCatalogManager &catalog_manager) {
1395 catalog::DirectoryEntryList const listing;
1396
1397 // Add all names
1398 catalog::StatEntryList listing_from_catalog;
1399 bool const retval = catalog_manager.ListingStat(PathString( "/" + path.ToString()), &listing_from_catalog);
1400
1401 CUSTOM_ASSERT(retval, "failed to call ListingStat for %s", path.c_str());
1402
1403 if(!catalog_manager.IsTransitionPoint(path.ToString())) {
1404
1405 for (unsigned i = 0; i < listing_from_catalog.size(); ++i) {
1406 PathString entry_path;
1407 entry_path.Assign(path);
1408 entry_path.Append("/", 1);
1409 entry_path.Append(listing_from_catalog.AtPtr(i)->name.GetChars(),
1410 listing_from_catalog.AtPtr(i)->name.GetLength());
1411
1412 if( S_ISDIR( listing_from_catalog.AtPtr(i)->info.st_mode) ) {
1413 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Recursing into %s/", entry_path.ToString().c_str());
1414 recursively_delete_directory(entry_path, catalog_manager);
1415
1416
1417 } else {
1418 LogCvmfs(kLogCvmfs, kLogVerboseMsg, " Recursively removing %s", entry_path.ToString().c_str());
1419 catalog_manager.RemoveFile(entry_path.ToString());
1420 }
1421
1422 }
1423 } else {
1424 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing nested catalog %s", path.ToString().c_str());
1425 catalog_manager.RemoveNestedCatalog(path.ToString(), false);
1426 }
1427 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing directory %s", path.ToString().c_str());
1428 catalog_manager.RemoveDirectory(path.ToString());
1429
1430 }
1431
1432
1433 static void relax_db_locking(sqlite3 *db) {
1434 int ret=0;
1435 ret = sqlite3_exec(db, "PRAGMA temp_store=2", NULL, NULL, NULL);
1436 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1437 ret = sqlite3_exec(db, "PRAGMA synchronous=OFF", NULL, NULL, NULL);
1438 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1439 }
1440
1441
1442 extern "C" void* lease_refresh_thread(void *payload) {
1443 while( !g_stop_refresh ) {
1444 sleep(2);
1445 refresh_lease();
1446 }
1447 return NULL;
1448 }
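// The refresh thread polls every 2 seconds, but refresh_lease() only sends a PATCH once
// kLeaseRefreshInterval (90 s) has passed since the last successful refresh.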
1449
1450 static bool isDatabaseMarkedComplete(const char *dbfile) {
1451 int ret;
1452 sqlite3 *db;
1453 sqlite3_stmt *stmt;
1454 bool retval=false;
1455
1456 ret = sqlite3_open(dbfile, &db);
1457 if (ret!=SQLITE_OK) { return false; }
1458
1459 const char *req = "SELECT value FROM properties WHERE key='completed_graft'";
1460 ret = sqlite3_prepare_v2(db, req, -1, &stmt, NULL);
1461 if (ret!=SQLITE_OK) {
1462 sqlite3_close(db); return false;
1463 }
1464 if(sqlite3_step(stmt) == SQLITE_ROW) {
1465 int const id = sqlite3_column_int(stmt,0);
1466 if(id>0) {
1467 retval=true;
1468 }
1469 }
1470 sqlite3_finalize(stmt); sqlite3_close(db);
1471 return retval;
1472 }
1473
1474 static void setDatabaseMarkedComplete(const char *dbfile) {
1475 int ret;
1476 sqlite3 *db;
1477 char *err;
1478
1479 ret = sqlite3_open(dbfile, &db);
1480 if (ret!=SQLITE_OK) { return; }
1481
1482 const char *req = "INSERT INTO properties (key, value) VALUES ('completed_graft',1) ON CONFLICT(key) DO UPDATE SET value=1 WHERE key='completed_graft'";
1483
1484 ret = sqlite3_exec(db, req, 0, 0, &err);
1485 if (ret!=SQLITE_OK) {
1486 sqlite3_free(err);
1487 }
1488 sqlite3_close(db);
1489 }
1490
1491