GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_ingestsql.cc
Date: 2025-07-13 02:35:07
          Exec   Total   Coverage
Lines:       0     841       0.0%
Branches:    0     558       0.0%

Line Branch Exec Source
1 #include "swissknife_ingestsql.h"
2
3 #include <fcntl.h>
4 #include <grp.h>
5 #include <pwd.h>
6 #include <stdio.h>
7 #include <sys/resource.h>
8 #include <sys/time.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11
12 #include <csignal>
13 #include <cstdlib>
14 #include <fstream>
15 #include <sstream>
16 #include <stack>
17 #include <unordered_map>
18 #include <unordered_set>
19
20 #include "acl.h"
21 #include "catalog_downloader.h"
22 #include "catalog_mgr_rw.h"
23 #include "curl/curl.h"
24 #include "gateway_util.h"
25 #include "shortstring.h"
26 #include "swissknife_lease_curl.h"
27 #include "swissknife_lease_json.h"
28 #include "swissknife_sync.h"
29 #include "upload.h"
30 #include "util/logging.h"
31
32 #define CHECK_SQLITE_ERROR(ret, expected) \
33 do { \
34 if ((ret) != expected) { \
35 LogCvmfs(kLogCvmfs, kLogStderr, "SQLite error: %d", (ret)); \
36 assert(0); \
37 } \
38 } while (0)
39
40 #define CUSTOM_ASSERT(check, msg, ...) \
41 do { \
42 if (!(check)) { \
43 LogCvmfs(kLogCvmfs, kLogStderr, msg, ##__VA_ARGS__); \
44 assert(0); \
45 } \
46 } while (0)
47
48 #define SHOW_PROGRESS(item, freq, curr, total) \
49 do { \
50 if ((curr) % freq == 0 || (curr) == total) { \
51 LogCvmfs(kLogCvmfs, kLogStdout, "Processed %d/%d %s", (curr), total, \
52 item); \
53 } \
54 } while (0)
55
56
57 static const unsigned kExternalChunkSize = 24 * 1024 * 1024;
58 static const unsigned kInternalChunkSize = 6 * 1024 * 1024;
59 static const unsigned kDefaultLeaseBusyRetryInterval = 10;
60 static const unsigned kLeaseRefreshInterval = 90; // seconds
61
62
63 static bool g_lease_acquired = false;
64 static string g_gateway_url;
65 static string g_gateway_key_id;
66 static string g_gateway_secret;
67 static string g_session_token;
68 static string g_session_token_file;
69 static string g_s3_file;
70 static time_t g_last_lease_refresh = 0;
71 static bool g_stop_refresh = false;
72 static int64_t g_priority = 0;
73 static bool g_add_missing_catalogs = false;
74 static string get_lease_from_paths(vector<string> paths);
75 static vector<string> get_all_dirs_from_sqlite(vector<string> &sqlite_db_vec,
76 bool include_additions,
77 bool include_deletions);
78 static string get_parent(const string &path);
79 static string get_basename(const string &path);
80
81 static XattrList marshal_xattrs(const char *acl);
82 static string sanitise_name(const char *name_cstr, bool allow_leading_slash);
83 static void on_signal(int sig);
84 static string acquire_lease(const string &key_id, const string &secret,
85 const string &lease_path,
86 const string &repo_service_url,
87 bool force_cancel_lease, uint64_t *current_revision,
88 string &current_root_hash,
89 unsigned int refresh_interval);
90 static void cancel_lease();
91 static void refresh_lease();
92 static vector<string> get_file_list(string &path);
93 static int check_hash(const char *hash);
94 static void recursively_delete_directory(
95 PathString &path, catalog::WritableCatalogManager &catalog_manager);
96 static void create_empty_database(string &filename);
97 static void relax_db_locking(sqlite3 *db);
98 static bool check_prefix(const std::string &path, const std::string &prefix);
99
100 static bool isDatabaseMarkedComplete(const char *dbfile);
101 static void setDatabaseMarkedComplete(const char *dbfile);
102
103 extern "C" void *lease_refresh_thread(void *payload);
104
105 static string sanitise_name(const char *name_cstr,
106 bool allow_leading_slash = false) {
107 int reason = 0;
108 const char *c = name_cstr;
109 while (*c == '/') {
110 c++;
111 } // strip any leading slashes
112 string const name = string(c);
113 bool ok = true;
114
115 if (!allow_leading_slash && HasPrefix(name, "/", true)) {
116 reason = 1;
117 ok = false;
118 }
119 if (HasSuffix(name, "/", true)) {
120 if (!(allow_leading_slash
121 && name.size() == 1)) { // account for the case where name=="/"
122 reason = 2;
123 ok = false;
124 }
125 }
126 if (name.find("//") != string::npos) {
127 reason = 3;
128 ok = false;
129 }
130 if (HasPrefix(name, "./", true) || HasPrefix(name, "../", true)) {
131 reason = 4;
132 ok = false;
133 }
134 if (HasSuffix(name, "/.", true) || HasSuffix(name, "/..", true)) {
135 reason = 5;
136 ok = false;
137 }
138 if (name.find("/./") != string::npos || name.find("/../") != string::npos) {
139 reason = 6;
140 ok = false;
141 }
142 if (name == "") {
143 reason = 7;
144 ok = false;
145 }
146 CUSTOM_ASSERT(ok, "Name [%s] is invalid (reason %d)", name.c_str(), reason);
147 return string(name);
148 }
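// Illustration of sanitise_name() behaviour, as implied by the checks above:
//   sanitise_name("/foo/bar")   -> "foo/bar"  (leading slashes are stripped)
//   sanitise_name("foo/bar/")   -> fails CUSTOM_ASSERT, reason 2 (trailing slash)
//   sanitise_name("foo//bar")   -> fails CUSTOM_ASSERT, reason 3 (double slash)
//   sanitise_name("./foo")      -> fails CUSTOM_ASSERT, reason 4
//   sanitise_name("a/../b")     -> fails CUSTOM_ASSERT, reason 6
//   sanitise_name("")           -> fails CUSTOM_ASSERT, reason 7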
149
150 static string get_parent(const string &path) {
151 size_t const found = path.find_last_of('/');
152 if (found == string::npos) {
153 return string("");
154 }
155 return path.substr(0, found);
156 }
157
158 static string get_basename(const string &path) {
159 const size_t found = path.find_last_of('/');
160 if (found == string::npos) {
161 return path;
162 }
163 return path.substr(found + 1);
164 }
165
166 // this is copied from MakeRelativePath
167 static string MakeCatalogPath(const std::string &relative_path) {
168 return (relative_path == "") ? "" : "/" + relative_path;
169 }
170
171 static string acquire_lease(const string &key_id, const string &secret,
172 const string &lease_path,
173 const string &repo_service_url,
174 bool force_cancel_lease, uint64_t *current_revision,
175 string &current_root_hash,
176 unsigned int refresh_interval) {
177 const CURLcode ret = curl_global_init(CURL_GLOBAL_ALL);
178 CUSTOM_ASSERT(ret == CURLE_OK, "failed to init curl");
179
180 string gateway_metadata_str;
181 char *gateway_metadata = getenv("CVMFS_GATEWAY_METADATA");
182 if (gateway_metadata != NULL)
183 gateway_metadata_str = gateway_metadata;
184
185 while (true) {
186 CurlBuffer buffer;
187 if (MakeAcquireRequest(key_id, secret, lease_path, repo_service_url,
188 &buffer, gateway_metadata_str)) {
189 string session_token;
190
191 const LeaseReply rep = ParseAcquireReplyWithRevision(
192 buffer, &session_token, current_revision, current_root_hash);
193 switch (rep) {
194 case kLeaseReplySuccess:
195 g_lease_acquired = true;
196 g_last_lease_refresh = time(NULL);
197 return session_token;
198 break;
199 case kLeaseReplyBusy:
200 if (force_cancel_lease) {
201 LogCvmfs(kLogCvmfs, kLogStderr,
202 "Lease busy, forcing cancellation (TODO");
203 }
204 LogCvmfs(kLogCvmfs, kLogStderr, "Lease busy, retrying in %d sec",
205 refresh_interval);
206 sleep(refresh_interval);
207 break;
208 default:
209 LogCvmfs(kLogCvmfs, kLogStderr,
210 "Error acquiring lease: %s. Retrying in %d sec",
211 buffer.data.c_str(), refresh_interval);
212 sleep(refresh_interval);
213 }
214 } else {
215 LogCvmfs(kLogCvmfs, kLogStderr,
216 "Error making lease acquisition request. Retrying in %d sec",
217 refresh_interval);
218 sleep(refresh_interval);
219 }
220 }
221 assert(false);
222 return "";
223 }
224
225 static uint64_t make_commit_on_gateway(const std::string &old_root_hash,
226 const std::string &new_root_hash,
227 int64_t priority) {
228 CurlBuffer buffer;
229 char priorityStr[100];
230 (void)sprintf(priorityStr, "%" PRId64,
231 priority); // skipping return value check; no way such a
232 // large buffer will overflow
233 buffer.data = "";
234
235 const std::string payload = "{\n\"old_root_hash\": \"" + old_root_hash
236 + "\",\n\"new_root_hash\": \"" + new_root_hash
237 + "\",\n\"priority\": " + priorityStr + "}";
238
239 return MakeEndRequest("POST", g_gateway_key_id, g_gateway_secret,
240 g_session_token, g_gateway_url, payload, &buffer, true);
241 }
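// For illustration, the commit request body assembled above is a small JSON
// document of the form (placeholder values):
//   {
//   "old_root_hash": "<hex catalog hash>",
//   "new_root_hash": "<hex catalog hash>",
//   "priority": <signed 64-bit integer, -time(NULL) unless -P is given>}
// and is sent as a POST to the gateway via MakeEndRequest().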
242
243 static void refresh_lease() {
244 CurlBuffer buffer;
245 buffer.data = "";
246 if ((time(NULL) - g_last_lease_refresh) < kLeaseRefreshInterval) {
247 return;
248 }
249
250 if (MakeEndRequest("PATCH", g_gateway_key_id, g_gateway_secret,
251 g_session_token, g_gateway_url, "", &buffer, false)) {
252 const int ret = ParseDropReply(buffer);
253 if (kLeaseReplySuccess == ret) {
254 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Lease refreshed");
255 g_last_lease_refresh = time(NULL);
256 } else {
257 LogCvmfs(kLogCvmfs, kLogStderr, "Lease refresh failed: %d", ret);
258 }
259 } else {
260 LogCvmfs(kLogCvmfs, kLogStderr, "Lease refresh request failed");
261 if (buffer.data == "Method Not Allowed\n") {
262 g_last_lease_refresh = time(NULL);
263 LogCvmfs(kLogCvmfs, kLogStderr,
264 "This gateway does not support lease refresh");
265 }
266 }
267 }
268
269
270 static void cancel_lease() {
271 CurlBuffer buffer;
272 if (MakeEndRequest("DELETE", g_gateway_key_id, g_gateway_secret,
273 g_session_token, g_gateway_url, "", &buffer, false)) {
274 const int ret = ParseDropReply(buffer);
275 if (kLeaseReplySuccess == ret) {
276 LogCvmfs(kLogCvmfs, kLogStdout, "Lease cancelled");
277 } else {
278 LogCvmfs(kLogCvmfs, kLogStderr, "Lease cancellation failed: %d", ret);
279 }
280 } else {
281 LogCvmfs(kLogCvmfs, kLogStderr, "Lease cancellation request failed");
282 }
283 g_stop_refresh = true;
284 }
285
286 static void on_signal(int sig) {
287 (void)signal(sig, SIG_DFL);
288 if (g_lease_acquired) {
289 LogCvmfs(kLogCvmfs, kLogStdout, "Cancelling lease");
290 cancel_lease();
291 unlink(g_session_token_file.c_str());
292 }
293 if (sig == SIGINT || sig == SIGTERM)
294 exit(1);
295 }
296
297 static vector<string> get_all_dirs_from_sqlite(vector<string> &sqlite_db_vec,
298 bool include_additions,
299 bool include_deletions) {
300 int ret;
301 vector<string> paths;
302
303 for (vector<string>::iterator it = sqlite_db_vec.begin();
304 it != sqlite_db_vec.end();
305 it++) {
306 sqlite3 *db;
307 ret = sqlite3_open_v2((*it).c_str(), &db, SQLITE_OPEN_READONLY, NULL);
308 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
309 relax_db_locking(db);
310
311 vector<string> tables;
312 if (include_additions) {
313 tables.push_back("dirs");
314 tables.push_back("links");
315 tables.push_back("files");
316 }
317 if (include_deletions) {
318 tables.push_back("deletions");
319 }
320
321 // get all the paths from the DB
322 for (vector<string>::iterator it = tables.begin(); it != tables.end();
323 it++) {
324 sqlite3_stmt *stmt;
325 const string query = "SELECT name FROM " + *it;
326 ret = sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, NULL);
327 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
328 while (sqlite3_step(stmt) == SQLITE_ROW) {
329 const char *name = reinterpret_cast<const char *>(
330 sqlite3_column_text(stmt, 0));
331 const string names = sanitise_name(name);
332 if (*it == "dirs") {
333 paths.push_back(names);
334 } else {
335 paths.push_back(get_parent(names));
336 }
337 }
338 ret = sqlite3_finalize(stmt);
339 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
340 }
341 ret = sqlite3_close_v2(db);
342 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
343 }
344 return paths;
345 }
346
347 static int get_db_schema_revision(sqlite3 *db,
348 const std::string &db_name = "") {
349 sqlite3_stmt *stmt;
350 std::ostringstream stmt_str;
351 stmt_str << "SELECT value FROM " << db_name
352 << "properties WHERE key = 'schema_revision'";
353 int ret = sqlite3_prepare_v2(db, stmt_str.str().c_str(), -1, &stmt, NULL);
354 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
355
356 ret = sqlite3_step(stmt);
357 // if the table exists, it must have a schema_revision row
358 CHECK_SQLITE_ERROR(ret, SQLITE_ROW);
359 const std::string schema_revision_str(
360 reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0)));
361 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
362 return std::stoi(schema_revision_str);
363 }
364
365 static int get_row_count(sqlite3 *db, const std::string &table_name) {
366 sqlite3_stmt *stmt;
367 std::ostringstream stmt_str;
368 stmt_str << "SELECT COUNT(*) FROM " << table_name;
369 int ret = sqlite3_prepare_v2(db, stmt_str.str().c_str(), -1, &stmt, NULL);
370 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
371
372 ret = sqlite3_step(stmt);
373 CHECK_SQLITE_ERROR(ret, SQLITE_ROW);
374 const std::string count_str(
375 reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0)));
376 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
377 return std::stoi(count_str);
378 }
379
380 static int calculate_print_frequency(int total) {
381 int base = 1000;
382 while (base * 50 < total)
383 base *= 10;
384 return base;
385 }
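// For example, the progress step is the smallest power-of-ten multiple of 1000
// that keeps the output to at most about 50 progress lines:
//   total =    30,000  ->  base =     1,000
//   total =   120,000  ->  base =    10,000
//   total = 6,000,000  ->  base = 1,000,000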
386
387 // compute a common parent path among the given paths, for use as the lease path
388 static string get_lease_from_paths(vector<string> paths) {
389 CUSTOM_ASSERT(!paths.empty(), "no paths are provided");
390
391 // we'd have to ensure path is relative
392 // (probably best to check this elsewhere as it is not just a requirement for
393 // this function)
394 auto lease = PathString(paths.at(0));
395 for (auto it = paths.begin() + 1; it != paths.end(); ++it) {
396 auto path = PathString(*it);
397 // shrink the lease path until it is a parent of "path"
398 while (!IsSubPath(lease, path)) {
399 auto i = lease.GetLength() - 1;
400 for (; i >= 0; --i) {
401 if (lease.GetChars()[i] == '/' || i == 0) {
402 lease.Truncate(i);
403 break;
404 }
405 }
406 }
407 if (lease.IsEmpty())
408 break; // early stop if lease is already at the root
409 }
410
411 auto prefix = "/" + lease.ToString();
412
413 LogCvmfs(kLogCvmfs, kLogStdout, "Longest prefix is %s", prefix.c_str());
414 return prefix;
415 }
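// Worked example (hypothetical paths): for the relative paths
//   {"sw/x86_64/app/v1", "sw/x86_64/app/v2", "sw/x86_64/lib"}
// the candidate lease starts as "sw/x86_64/app/v1" and is truncated at '/'
// boundaries until it is a parent of every other path, ending at "sw/x86_64";
// the returned lease path is therefore "/sw/x86_64".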
416
417 static XattrList marshal_xattrs(const char *acl_string) {
418 XattrList aclobj;
419
420 if (acl_string == NULL || acl_string[0] == '\0') {
421 return aclobj;
422 }
423
424 bool equiv_mode;
425 size_t binary_size;
426 char *binary_acl;
427 const int ret = acl_from_text_to_xattr_value(string(acl_string), binary_acl,
428 binary_size, equiv_mode);
429 if (ret) {
430 LogCvmfs(kLogCvmfs, kLogStderr,
431 "failure of acl_from_text_to_xattr_value(%s)", acl_string);
432 assert(
433 0); // TODO(vavolkl): incorporate error handling other than asserting
434 return aclobj;
435 }
436 if (!equiv_mode) {
437 CUSTOM_ASSERT(
438 aclobj.Set("system.posix_acl_access", string(binary_acl, binary_size)),
439 "failed to set system.posix_acl_access (ACL size %ld)", binary_size);
440 free(binary_acl);
441 }
442
443 return aclobj;
444 }
445
446 std::unordered_map<string, string> load_config(const string &config_file) {
447 std::unordered_map<string, string> config_map;
448 ifstream input(config_file);
449 if (!input) {
450 LogCvmfs(kLogCvmfs, kLogStderr, "could not open config file %s",
451 config_file.c_str());
452 return config_map;
453 }
454 vector<string> lines;
455 for (string line; getline(input, line);) {
456 lines.push_back(line);
457 }
458
459 for (auto it = lines.begin(); it != lines.end(); it++) {
460 const string l = *it;
461 const size_t p = l.find('=', 0);
462 if (p != string::npos) {
463 const string key = l.substr(0, p);
464 string val = l.substr(p + 1);
465 // trim any double quotes
466 if (val.front() == '"') {
467 val = val.substr(1, val.length() - 2);
468 }
469 config_map[key] = val;
470 }
471 }
472
473 return config_map;
474 }
475
476 string retrieve_config(std::unordered_map<string, string> &config_map,
477 const string &key) {
478 auto kv = config_map.find(key);
479 CUSTOM_ASSERT(kv != config_map.end(), "Parameter %s not found in config",
480 key.c_str());
481 return kv->second;
482 }
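// Sketch of a minimal /etc/cvmfs/gateway-client/<repo>/config file as parsed
// by load_config(); values are hypothetical, and double quotes around a value
// are trimmed:
//   CVMFS_GATEWAY="http://gateway.example.org:4929/api/v1"
//   CVMFS_STRATUM0="http://stratum0.example.org/cvmfs/example.repo"
//   CVMFS_HTTP_PROXY=DIRECT
// retrieve_config() then returns the value for a key and fails CUSTOM_ASSERT
// if the key is absent.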
483
484 static vector<string> get_file_list(string &path) {
485 vector<string> paths;
486 const char *cpath = path.c_str();
487 struct stat st;
488 const int ret = stat(cpath, &st);
489 CUSTOM_ASSERT(ret == 0, "failed to stat file %s", cpath);
490
491 if (S_ISDIR(st.st_mode)) {
492 DIR *d;
493 struct dirent *dir;
494 d = opendir(cpath);
495 if (d) {
496 while ((dir = readdir(d)) != NULL) {
497 const char *t = strrchr(dir->d_name, '.');
498 if (t && !strcmp(t, ".db")) {
499 paths.push_back(path + "/" + dir->d_name);
500 }
501 }
502 closedir(d);
503 }
504 } else {
505 paths.push_back(path);
506 }
507 return paths;
508 }
509
510 extern bool g_log_with_time;
511
512 int swissknife::IngestSQL::Main(const swissknife::ArgumentList &args) {
513 // the catalog code uses assert() liberally.
514 // install ABRT signal handler to catch an abort and cancel lease
515 if (signal(SIGABRT, &on_signal) == SIG_ERR
516 || signal(SIGINT, &on_signal) == SIG_ERR
517 || signal(SIGTERM, &on_signal) == SIG_ERR) {
518 LogCvmfs(kLogCvmfs, kLogStdout, "Setting signal handlers failed");
519 exit(1);
520 }
521
522 const bool enable_corefiles = (args.find('c') != args.end());
523 if (!enable_corefiles) {
524 struct rlimit rlim;
525 rlim.rlim_cur = rlim.rlim_max = 0;
526 setrlimit(RLIMIT_CORE, &rlim);
527 }
528
529
530 if (args.find('n') != args.end()) {
531 create_empty_database(*args.find('n')->second);
532 exit(0);
533 }
534
535 // TODO(@vvolkl): add 'B' option to wait_for_update
536 // TODO(@vvolkl): add 'T' option for ttl
537
538
539 if (args.find('P') != args.end()) {
540 const char *arg = (*args.find('P')->second).c_str();
541 char *at_null_terminator_if_number;
542 g_priority = strtoll(arg, &at_null_terminator_if_number, 10);
543 if (*at_null_terminator_if_number != '\0') {
544 LogCvmfs(kLogCvmfs, kLogStderr,
545 "Priority parameter value '%s' parsing failed", arg);
546 return 1;
547 }
548 } else {
549 g_priority = -time(NULL);
550 }
551
552
553 unsigned int lease_busy_retry_interval = kDefaultLeaseBusyRetryInterval;
554 if (args.find('r') != args.end()) {
555 lease_busy_retry_interval = atoi((*args.find('r')->second).c_str());
556 }
557
558 string dir_temp = "";
559 const char *env_tmpdir;
560 if (args.find('t') != args.end()) {
561 dir_temp = MakeCanonicalPath(*args.find('t')->second);
562 } else if ((env_tmpdir = getenv("TMPDIR"))) {
563 dir_temp = MakeCanonicalPath(env_tmpdir);
564 } else {
565 LogCvmfs(kLogCvmfs, kLogStderr, "-t or TMPDIR required");
566 return 1;
567 }
568
569 string kConfigDir("/etc/cvmfs/gateway-client/");
570 if (args.find('C') != args.end()) {
571 kConfigDir = MakeCanonicalPath(*args.find('C')->second);
572 kConfigDir += "/";
573 LogCvmfs(kLogCvmfs, kLogStdout, "Overriding configuration dir prefix to %s",
574 kConfigDir.c_str());
575 }
576
577 // mandatory arguments
578 string const repo_name = *args.find('N')->second;
579 string sqlite_db_path = *args.find('D')->second;
580
581 vector<string> sqlite_db_vec = get_file_list(sqlite_db_path);
582
583 // optional arguments
584 bool const allow_deletions = (args.find('d') != args.end());
585 bool const force_cancel_lease = (args.find('x') != args.end());
586 bool const allow_additions = !allow_deletions
587 || (args.find('a') != args.end());
588 g_add_missing_catalogs = (args.find('z') != args.end());
589 bool const check_completed_graft_property = (args.find('Z') != args.end());
590 if (args.find('v') != args.end()) {
591 SetLogVerbosity(kLogVerbose);
592 }
593
594 if (check_completed_graft_property) {
595 if (sqlite_db_vec.size() != 1) {
596 LogCvmfs(kLogCvmfs, kLogStderr, "-Z requires a single DB file");
597 exit(1);
598 }
599 if (isDatabaseMarkedComplete(sqlite_db_vec[0].c_str())) {
600 LogCvmfs(kLogCvmfs, kLogStderr,
601 "DB file is already marked as completed_graft");
602 exit(0);
603 } else {
604 LogCvmfs(kLogCvmfs, kLogStderr,
605 "DB file is not marked as completed_graft");
606 }
607 }
608
609 string const config_file = kConfigDir + repo_name + "/config";
610 string stratum0;
611 string proxy;
612
613 string additional_prefix = "";
614 bool has_additional_prefix = false;
615 if (args.find('p') != args.end()) {
616 additional_prefix = *args.find('p')->second;
617 additional_prefix = sanitise_name(additional_prefix.c_str(), true);
618 if (additional_prefix.back() != '/') {
619 additional_prefix += "/";
620 }
621 has_additional_prefix = true;
622 LogCvmfs(kLogCvmfs, kLogStdout,
623 "Adding additional prefix %s to lease and all paths",
624 additional_prefix.c_str());
625 // now we are confident that any additional prefix has no leading / and does
626 // have a trailing /
627 }
628 auto config_map = load_config(config_file);
629
630 if (args.find('g') != args.end()) {
631 g_gateway_url = *args.find('g')->second;
632 } else {
633 g_gateway_url = retrieve_config(config_map, "CVMFS_GATEWAY");
634 }
635 if (args.find('w') != args.end()) {
636 stratum0 = *args.find('w')->second;
637 } else {
638 stratum0 = retrieve_config(config_map, "CVMFS_STRATUM0");
639 }
640
641 if (args.find('@') != args.end()) {
642 proxy = *args.find('@')->second;
643 } else {
644 proxy = retrieve_config(config_map, "CVMFS_HTTP_PROXY");
645 }
646
647 string lease_path = "";
648 // bool lease_autodetected = false;
649 if (args.find('l') != args.end()) {
650 lease_path = *args.find('l')->second;
651 } else {
652 // lease path wasn't specified, so try to autodetect it
653 vector<string> const paths = get_all_dirs_from_sqlite(
654 sqlite_db_vec, allow_additions, allow_deletions);
655 if (paths.size() == 0) {
656 LogCvmfs(kLogCvmfs, kLogStdout, "Database is empty, nothing to do");
657 return 0; // treat it as a success
658 }
659 lease_path = get_lease_from_paths(paths);
660 // lease_autodetected = true;
661 }
662
663 if (has_additional_prefix) {
664 if (lease_path == "/") {
665 lease_path = "/" + additional_prefix;
666 } else {
667 if (lease_path.substr(0, 1) == "/") {
668 lease_path = "/" + additional_prefix
669 + lease_path.substr(1, lease_path.size() - 1);
670 } else {
671 lease_path = "/" + additional_prefix
672 + lease_path; // prefix is certain to have a trailing /
673 }
674 }
675 }
676 if (lease_path.substr(0, 1) != "/") {
677 lease_path = "/" + lease_path;
678 }
679 LogCvmfs(kLogCvmfs, kLogStdout, "Lease path is %s", lease_path.c_str());
680
681
682 string public_keys = kConfigDir + repo_name + "/pubkey";
683 string key_file = kConfigDir + repo_name + "/gatewaykey";
684 string s3_file = kConfigDir + repo_name + "/s3.conf";
685
686 if (args.find('k') != args.end()) {
687 public_keys = *args.find('k')->second;
688 }
689 if (args.find('s') != args.end()) {
690 key_file = *args.find('s')->second;
691 }
692 if (args.find('3') != args.end()) {
693 s3_file = *args.find('3')->second;
694 }
695
696 CUSTOM_ASSERT(access(public_keys.c_str(), R_OK) == 0, "%s is not readable",
697 public_keys.c_str());
698 CUSTOM_ASSERT(access(key_file.c_str(), R_OK) == 0, "%s is not readable",
699 key_file.c_str());
700
701 // string spooler_definition_string = string("gw,,") + g_gateway_url;
702 // create a spooler that will upload to S3
703 string const spooler_definition_string = string("S3,") + dir_temp + ","
704 + repo_name + "@" + s3_file;
705
706 // load gateway keys
707 if (!gateway::ReadKeys(key_file, &g_gateway_key_id, &g_gateway_secret)) {
708 LogCvmfs(kLogCvmfs, kLogStderr, "gateway::ReadKeys failed");
709 return 1;
710 }
711
712 uint64_t current_revision = 0;
713 std::string current_root_hash = "";
714
715 // acquire lease and save token to a file in the tmpdir
716 LogCvmfs(kLogCvmfs, kLogStdout, "Acquiring gateway lease on %s",
717 lease_path.c_str());
718 g_session_token = acquire_lease(g_gateway_key_id, g_gateway_secret,
719 repo_name + lease_path, g_gateway_url,
720 force_cancel_lease, &current_revision,
721 current_root_hash, lease_busy_retry_interval);
722
723
724 char *_tmpfile = strdup((dir_temp + "/gateway_session_token_XXXXXX").c_str());
725 int const temp_fd = mkstemp(_tmpfile);
726 g_session_token_file = string(_tmpfile);
727 free(_tmpfile);
728
729 FILE *fout = fdopen(temp_fd, "wb");
730 CUSTOM_ASSERT(fout != NULL,
731 "failed to open session token file %s for writing",
732 g_session_token_file.c_str());
733 fputs(g_session_token.c_str(), fout);
734 fclose(fout);
735
736 // now start the lease refresh thread
737 pthread_t lease_thread;
738 if (0 != pthread_create(&lease_thread, NULL, lease_refresh_thread, NULL)) {
739 LogCvmfs(kLogCvmfs, kLogStderr, "Unable to start lease refresh thread");
740 cancel_lease();
741 return 1;
742 }
743
744 // now initialise the various bits we need
745
746 upload::SpoolerDefinition spooler_definition(
747 spooler_definition_string, shash::kSha1, zlib::kZlibDefault, false, true,
748 SyncParameters::kDefaultMinFileChunkSize,
749 SyncParameters::kDefaultAvgFileChunkSize,
750 SyncParameters::kDefaultMaxFileChunkSize, g_session_token_file, key_file);
751
752 if (args.find('q') != args.end()) {
753 spooler_definition.number_of_concurrent_uploads = String2Uint64(
754 *args.find('q')->second);
755 }
756
757 upload::SpoolerDefinition const spooler_definition_catalogs(
758 spooler_definition.Dup2DefaultCompression());
759
760 UniquePtr<upload::Spooler> const spooler_catalogs(
761 upload::Spooler::Construct(spooler_definition_catalogs, nullptr));
762
763 if (!spooler_catalogs.IsValid()) {
764 LogCvmfs(kLogCvmfs, kLogStderr, "spooler_catalogs invalid");
765 cancel_lease();
766 return 1;
767 }
768 if (!InitDownloadManager(true, proxy, kCatalogDownloadMultiplier)) {
769 LogCvmfs(kLogCvmfs, kLogStderr, "download manager init failed");
770 cancel_lease();
771 return 1;
772 }
773 if (!InitSignatureManager(public_keys, "")) {
774 LogCvmfs(kLogCvmfs, kLogStderr, "signature manager init failed");
775 cancel_lease();
776 return 1;
777 }
778
779 UniquePtr<manifest::Manifest> manifest;
780
781 manifest = FetchRemoteManifest(stratum0, repo_name, shash::Any());
782
783 if (!manifest.IsValid()) {
784 LogCvmfs(kLogCvmfs, kLogStderr, "manifest invalid");
785 cancel_lease();
786 return 1;
787 }
788
789 if (current_revision > 0) {
790 if (current_revision == manifest->revision()) {
791 if (current_root_hash != manifest->catalog_hash().ToString()) {
792 LogCvmfs(kLogCvmfs, kLogStderr,
793 "Mismatch between cvmfspublished and gateway hash for "
794 "revision %lu (%s!=%s)",
795 current_revision, current_root_hash.c_str(),
796 manifest->catalog_hash().ToString().c_str());
797 cancel_lease();
798 return 1;
799 } else {
800 LogCvmfs(kLogCvmfs, kLogStdout,
801 "Gateway and .cvmfspublished agree on repo version %lu",
802 current_revision);
803 }
804 }
805 if (current_revision > manifest->revision()) {
806 LogCvmfs(kLogCvmfs, kLogStdout,
807 "Gateway has supplied a newer revision than the current "
808 ".cvmfspublished %lu > %lu",
809 current_revision, manifest->revision());
810 manifest->set_revision(current_revision);
811 manifest->set_catalog_hash(shash::MkFromHexPtr(
812 shash::HexPtr(current_root_hash), shash::kSuffixCatalog));
813 } else if (current_revision < manifest->revision()) {
814 LogCvmfs(kLogCvmfs, kLogStdout,
815 "Gateway has supplied an older revision than the current "
816 ".cvmfspublished %lu < %lu",
817 current_revision, manifest->revision());
818 }
819 } else {
820 LogCvmfs(kLogCvmfs, kLogStdout,
821 "Gateway has not supplied a revision. Using .cvmfspublished");
822 }
823
824
825 // get hash of current root catalog, remove terminal "C", encode it
826 string const old_root_hash = manifest->catalog_hash().ToString(true);
827 string const hash = old_root_hash.substr(0, old_root_hash.length() - 1);
828 shash::Any const base_hash = shash::MkFromHexPtr(shash::HexPtr(hash),
829 shash::kSuffixCatalog);
830 LogCvmfs(kLogCvmfs, kLogStdout, "old_root_hash: %s", old_root_hash.c_str());
831
832 bool const is_balanced = false;
833
834 catalog::WritableCatalogManager catalog_manager(
835 base_hash, stratum0, dir_temp, spooler_catalogs.weak_ref(),
836 download_manager(), false, SyncParameters::kDefaultNestedKcatalogLimit,
837 SyncParameters::kDefaultRootKcatalogLimit,
838 SyncParameters::kDefaultFileMbyteLimit, statistics(), is_balanced,
839 SyncParameters::kDefaultMaxWeight, SyncParameters::kDefaultMinWeight,
840 dir_temp /* dir_cache */);
841
842 catalog_manager.Init();
843
844
845 // now graft the contents of the DB
846 vector<sqlite3 *> open_dbs;
847 for (auto &&db_file : sqlite_db_vec) {
848 sqlite3 *db;
849 CHECK_SQLITE_ERROR(
850 sqlite3_open_v2(db_file.c_str(), &db, SQLITE_OPEN_READONLY, NULL),
851 SQLITE_OK);
852 relax_db_locking(db);
853 open_dbs.push_back(db);
854 }
855 process_sqlite(open_dbs, catalog_manager, allow_additions, allow_deletions,
856 lease_path.substr(1), additional_prefix);
857 for (auto &&db : open_dbs) {
858 CHECK_SQLITE_ERROR(sqlite3_close_v2(db), SQLITE_OK);
859 }
860
861 // commit changes
862 LogCvmfs(kLogCvmfs, kLogStdout, "Committing changes...");
863 if (!catalog_manager.Commit(false, false, manifest.weak_ref())) {
864 LogCvmfs(kLogCvmfs, kLogStderr, "something went wrong during sync");
865 cancel_lease();
866 return 1;
867 }
868
869 // finalize the spooler
870 LogCvmfs(kLogCvmfs, kLogStdout, "Waiting for all uploads to finish...");
871 spooler_catalogs->WaitForUpload();
872
873 LogCvmfs(kLogCvmfs, kLogStdout, "Exporting repository manifest");
874
875 // FinalizeSession(true) would also trigger the commit on the gateway machine (if
876 // the upstream is of type "gw"); instead, make_commit_on_gateway() is used below.
877
878 // Get the hash of the new root catalog
879 const string new_root_hash = manifest->catalog_hash().ToString(true);
880
881 // if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
882 // RepositoryTag())) {
883 // LogCvmfs(kLogCvmfs, kLogStderr, "Failed to commit the transaction");
884 // // lease is only released on success
885 // cancel_lease();
886 // return 1;
887 // }
888
889 LogCvmfs(kLogCvmfs, kLogStdout, "Committing with priority %" PRId64,
890 g_priority);
891
892 bool const ok = make_commit_on_gateway(old_root_hash, new_root_hash,
893 g_priority);
894 if (!ok) {
895 LogCvmfs(kLogCvmfs, kLogStderr,
896 "something went wrong during commit on gateway");
897 cancel_lease();
898 exit(1);
899 }
900
901
902 unlink(g_session_token_file.c_str());
903
904 g_stop_refresh = true;
905
906
907 if (check_completed_graft_property) {
908 setDatabaseMarkedComplete(sqlite_db_vec[0].c_str());
909 }
910
911
912 return 0;
913 }
914
915 size_t writeFunction(void *ptr, size_t size, size_t nmemb, std::string *data) {
916 return size * nmemb;
917 }
918
919 void replaceAllSubstrings(std::string &str, const std::string &from,
920 const std::string &to) {
921 if (from.empty()) {
922 return; // Avoid infinite loop if 'from' is an empty string.
923 }
924 size_t startPos = 0;
925 while ((startPos = str.find(from, startPos)) != std::string::npos) {
926 str.replace(startPos, from.length(), to);
927 startPos += to.length(); // Advance startPos to avoid replacing the
928 // substring just inserted.
929 }
930 }
931
932
933 void swissknife::IngestSQL::process_sqlite(
934 const std::vector<sqlite3 *> &dbs,
935 catalog::WritableCatalogManager &catalog_manager, bool allow_additions,
936 bool allow_deletions, const std::string &lease_path,
937 const std::string &additional_prefix) {
938 std::map<std::string, Directory> all_dirs;
939 std::map<std::string, std::vector<File> > all_files;
940 std::map<std::string, std::vector<Symlink> > all_symlinks;
941
942 for (auto &&db : dbs) {
943 load_dirs(db, lease_path, additional_prefix, all_dirs);
944 }
945
946 // put in a nested scope so we can free up memory of `dir_names`
947 {
948 LogCvmfs(kLogCvmfs, kLogStdout,
949 "Precaching existing directories (starting from %s)",
950 lease_path.c_str());
951 std::unordered_set<std::string> dir_names;
952 std::transform(all_dirs.begin(), all_dirs.end(),
953 std::inserter(dir_names, dir_names.end()),
954 [](const std::pair<std::string, Directory> &pair) {
955 return MakeCatalogPath(pair.first);
956 });
957 catalog_manager.LoadCatalogs(MakeCatalogPath(lease_path), dir_names);
958 }
959
960 for (auto &&db : dbs) {
961 load_files(db, lease_path, additional_prefix, all_files);
962 load_symlinks(db, lease_path, additional_prefix, all_symlinks);
963 }
964
965 // perform all deletions first
966 if (allow_deletions) {
967 LogCvmfs(kLogCvmfs, kLogStdout, "Processing deletions...");
968 for (auto &&db : dbs) {
969 CHECK_SQLITE_ERROR(
970 do_deletions(db, catalog_manager, lease_path, additional_prefix),
971 SQLITE_OK);
972 }
973 }
974
975 if (allow_additions) {
976 LogCvmfs(kLogCvmfs, kLogStdout, "Processing additions...");
977 // first ensure all directories are present and create missing ones
978 do_additions(all_dirs, all_files, all_symlinks, lease_path,
979 catalog_manager);
980 }
981 }
982
983 void add_dir_to_tree(
984 std::string path,
985 std::unordered_map<std::string, std::set<std::string> > &tree,
986 const std::string &lease_path) {
987 tree[path];
988 std::string parent_path = get_parent(path);
989 // recursively create any missing parents in the tree
990 // avoid creating a loop when we insert the root path
991 while (path != parent_path && path != lease_path
992 && !tree[parent_path].count(path)) {
993 tree[parent_path].insert(path);
994 path = parent_path;
995 parent_path = get_parent(path);
996 }
997 }
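// Worked example (hypothetical lease path "sw"): add_dir_to_tree("sw/a/b",
// tree, "sw") creates every missing parent edge, leaving
//   tree["sw/a/b"] = {}
//   tree["sw/a"]   = {"sw/a/b"}
//   tree["sw"]     = {"sw/a"}
// so the DFS in do_additions() can start at the single node whose parent is
// not itself a key of the tree.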
998
999 int swissknife::IngestSQL::do_additions(
1000 const DirMap &all_dirs, const FileMap &all_files,
1001 const SymlinkMap &all_symlinks, const std::string &lease_path,
1002 catalog::WritableCatalogManager &catalog_manager) {
1003 // STEP 1:
1004 // - collect all the dirs/symlinks/files we need to process from the DB
1005 // - build a tree of paths for DFS traversal
1006 // - note the tree will contain all parent dirs of symlinks/files even if
1007 // those are not
1008 // explicitly added to the dirs table
1009 std::unordered_map<std::string, std::set<std::string> > tree;
1010 for (auto &&p : all_dirs) {
1011 add_dir_to_tree(p.first, tree, lease_path);
1012 }
1013 for (auto &&p : all_files) {
1014 add_dir_to_tree(p.first, tree, lease_path);
1015 }
1016 for (auto &&p : all_symlinks) {
1017 add_dir_to_tree(p.first, tree, lease_path);
1018 }
1019 int const row_count = static_cast<int>(tree.size());
1020 int const print_every = calculate_print_frequency(row_count);
1021 int curr_row = 0;
1022 LogCvmfs(kLogCvmfs, kLogStdout,
1023 "Changeset: %ld dirs, %ld files, %ld symlinks", tree.size(),
1024 all_files.size(), all_symlinks.size());
1025
1026 // STEP 2:
1027 // - process all the changes with DFS traversal
1028 // - make directories in pre-order
1029 // - add files/symlinks and schedule upload in post-order
1030 catalog_manager.SetupSingleCatalogUploadCallback();
1031 std::stack<string> dfs_stack;
1032 for (auto &&p : tree) {
1033 // figure out the starting point by checking whose parent is missing from
1034 // the tree
1035 if (p.first == "" || !tree.count(get_parent(p.first))) {
1036 CUSTOM_ASSERT(dfs_stack.empty(),
1037 "provided DB input forms more than one path trees");
1038 dfs_stack.push(p.first);
1039 }
1040 }
1041 std::set<string> visited;
1042 while (!dfs_stack.empty()) {
1043 string const curr_dir = dfs_stack.top();
1044 // add content for the dir in post-order traversal
1045 if (visited.count(curr_dir)) {
1046 curr_row++;
1047 if (all_symlinks.count(curr_dir)) {
1048 add_symlinks(catalog_manager, all_symlinks.at(curr_dir));
1049 }
1050 if (all_files.count(curr_dir)) {
1051 add_files(catalog_manager, all_files.at(curr_dir));
1052 }
1053 // snapshot the dir (if it's a nested catalog mountpoint)
1054 catalog::DirectoryEntry dir_entry;
1055 bool exists = false;
1056 exists = catalog_manager.LookupDirEntry(
1057 MakeCatalogPath(curr_dir), catalog::kLookupDefault, &dir_entry);
1058 assert(exists); // the dir must exist at this point
1059 if (dir_entry.IsNestedCatalogMountpoint()
1060 || dir_entry.IsNestedCatalogRoot()) {
1061 catalog_manager.AddCatalogToQueue(curr_dir);
1062 catalog_manager.ScheduleReadyCatalogs();
1063 }
1064 dfs_stack.pop();
1065 SHOW_PROGRESS("directories", print_every, curr_row, row_count);
1066 } else {
1067 visited.insert(curr_dir);
1068 // push children to the stack
1069 auto it = tree.find(curr_dir);
1070 if (it != tree.end()) {
1071 for (auto &&child : it->second) {
1072 dfs_stack.push(child);
1073 }
1074 tree.erase(it);
1075 }
1076 if (!all_dirs.count(curr_dir))
1077 continue;
1078
1079 // create the dir first in pre-order traversal
1080 const IngestSQL::Directory &dir = all_dirs.at(curr_dir);
1081 catalog::DirectoryEntry dir_entry;
1082
1083 bool exists = false;
1084 exists = catalog_manager.LookupDirEntry(
1085 MakeCatalogPath(curr_dir), catalog::kLookupDefault, &dir_entry);
1086 CUSTOM_ASSERT(
1087 !(exists && !S_ISDIR(dir_entry.mode_)),
1088 "Refusing to replace existing file/symlink at %s with a directory",
1089 dir.name.c_str());
1090
1091 dir_entry.name_ = NameString(get_basename(dir.name));
1092 dir_entry.mtime_ = dir.mtime / 1000000000;
1093 dir_entry.mode_ = dir.mode | S_IFDIR;
1094 dir_entry.mode_ &= (S_IFDIR | 0777);
1095 dir_entry.uid_ = dir.owner;
1096 dir_entry.gid_ = dir.grp;
1097 dir_entry.has_xattrs_ = !dir.xattr.IsEmpty();
1098
1099 bool add_nested_catalog = false;
1100
1101 if (exists) {
1102 catalog_manager.TouchDirectory(dir_entry, dir.xattr, dir.name);
1103 if ((!dir_entry.IsNestedCatalogMountpoint()
1104 && !dir_entry.IsNestedCatalogRoot())
1105 && (g_add_missing_catalogs || dir.nested)) {
1106 add_nested_catalog = true;
1107 LogCvmfs(kLogCvmfs, kLogVerboseMsg,
1108 "Touching existing directory %s and adding nested catalog",
1109 dir.name.c_str());
1110 } else {
1111 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Touching existing directory %s",
1112 dir.name.c_str());
1113 }
1114 } else {
1115 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding directory [%s]",
1116 dir.name.c_str());
1117 catalog_manager.AddDirectory(dir_entry, dir.xattr,
1118 get_parent(dir.name));
1119 if (dir.nested) {
1120 add_nested_catalog = true;
1121 }
1122 }
1123 if (add_nested_catalog) {
1124 // now add a .cvmfscatalog file
1125 // so that manual changes won't remove the nested catalog
1126 LogCvmfs(kLogCvmfs, kLogVerboseMsg,
1127 "Placing .cvmfscatalog file in [%s]", dir.name.c_str());
1128 catalog::DirectoryEntryBase dir2;
1129 dir2.name_ = NameString(".cvmfscatalog");
1130 dir2.mtime_ = dir.mtime / 1000000000;
1131 dir2.mode_ = (S_IFREG | 0666);
1132 dir2.uid_ = 0;
1133 dir2.gid_ = 0;
1134 dir2.has_xattrs_ = 0;
1135 dir2.checksum_ = shash::MkFromHexPtr(
1136 shash::HexPtr("da39a3ee5e6b4b0d3255bfef95601890afd80709"),
1137 shash::kSuffixNone); // hash of ""
1138 XattrList const xattr2;
1139 catalog_manager.AddFile(dir2, xattr2, dir.name);
1140
1141 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Creating Nested Catalog [%s]",
1142 dir.name.c_str());
1143 catalog_manager.CreateNestedCatalog(dir.name);
1144 }
1145 }
1146 }
1147
1148 // sanity check that we have processed all the input
1149 CUSTOM_ASSERT(tree.empty(),
1150 "not all directories are processed, malformed input DB?");
1151 catalog_manager.RemoveSingleCatalogUploadCallback();
1152 return 0;
1153 }
1154
1155 int swissknife::IngestSQL::add_symlinks(
1156 catalog::WritableCatalogManager &catalog_manager,
1157 const std::vector<Symlink> &symlinks) {
1158 for (auto &&symlink : symlinks) {
1159 catalog::DirectoryEntry dir;
1160 catalog::DirectoryEntryBase dir2;
1161 XattrList const xattr;
1162 bool exists = false;
1163 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(symlink.name),
1164 catalog::kLookupDefault, &dir);
1165
1166 dir2.name_ = NameString(get_basename(symlink.name));
1167 dir2.mtime_ = symlink.mtime / 1000000000;
1168 dir2.uid_ = symlink.owner;
1169 dir2.gid_ = symlink.grp;
1170 dir2.has_xattrs_ = false;
1171 dir2.symlink_ = LinkString(symlink.target);
1172 dir2.mode_ = S_IFLNK | 0777;
1173
1174 bool noop = false;
1175
1176 if (exists) {
1177 if (symlink.skip_if_file_or_dir) {
1178 if (S_ISDIR(dir.mode_) || S_ISREG(dir.mode_)) {
1179 LogCvmfs(kLogCvmfs, kLogVerboseMsg,
1180 "File or directory for symlink [%s] exists, skipping "
1181 "symlink creation",
1182 symlink.name.c_str());
1183 noop = true;
1184 } else if (S_ISLNK(dir.mode_)) {
1185 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing existing symlink [%s]",
1186 symlink.name.c_str());
1187 catalog_manager.RemoveFile(symlink.name);
1188 } else {
1189 CUSTOM_ASSERT(0, "unknown mode for dirent: %d", dir.mode_);
1190 }
1191 } else {
1192 CUSTOM_ASSERT(!S_ISDIR(dir.mode_),
1193 "Not removing directory [%s] to create symlink",
1194 symlink.name.c_str());
1195 LogCvmfs(kLogCvmfs, kLogVerboseMsg,
1196 "Removing existing file/symlink [%s]", symlink.name.c_str());
1197 catalog_manager.RemoveFile(symlink.name);
1198 }
1199 }
1200 if (!noop) {
1201 string const parent = get_parent(symlink.name);
1202 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding symlink [%s] -> [%s]",
1203 symlink.name.c_str(), symlink.target.c_str());
1204 catalog_manager.AddFile(dir2, xattr, parent);
1205 }
1206 }
1207 return 0;
1208 }
1209
1210 static int check_hash(const char *hash) {
1211 if (strlen(hash) != 40) {
1212 return 1;
1213 }
1214 for (int i = 0; i < 40; i++) {
1215 // < '0' || > 'f' || ( > '9' && < 'a' )
1216 if (hash[i] < 0x30 || hash[i] > 0x66
1217 || (hash[i] > 0x39 && hash[i] < 0x61)) {
1218 return 1;
1219 }
1220 }
1221 return 0;
1222 }
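// For example, check_hash() accepts exactly 40 lowercase hexadecimal characters:
//   check_hash("da39a3ee5e6b4b0d3255bfef95601890afd80709") == 0  // SHA-1 of ""
//   check_hash("DA39A3EE5E6B4B0D3255BFEF95601890AFD80709") != 0  // uppercase rejected
//   check_hash("abc")                                      != 0  // wrong length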
1223
1224 bool check_prefix(const std::string &path, const std::string &prefix) {
1225 if (prefix == "" || prefix == "/") {
1226 return true;
1227 }
1228 if ("/" + path == prefix) {
1229 return true;
1230 }
1231 if (!HasPrefix(path, prefix, false)) {
1232 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Entry %s is outside lease path: %s",
1233 path.c_str(), prefix.c_str());
1234 return false;
1235 }
1236 return true;
1237 }
1238
1239 void swissknife::IngestSQL::load_dirs(
1240 sqlite3 *db, const std::string &lease_path,
1241 const std::string &additional_prefix,
1242 std::map<std::string, Directory> &all_dirs) {
1243 sqlite3_stmt *stmt;
1244 int const schema_revision = get_db_schema_revision(db);
1245 string select_stmt = "SELECT name, mode, mtime, owner, grp, acl, nested FROM "
1246 "dirs";
1247 if (schema_revision <= 3) {
1248 select_stmt = "SELECT name, mode, mtime, owner, grp, acl FROM dirs";
1249 }
1250 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1251 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1252 while (sqlite3_step(stmt) == SQLITE_ROW) {
1253 char *name_cstr = (char *)sqlite3_column_text(stmt, 0);
1254 mode_t const mode = sqlite3_column_int(stmt, 1);
1255 time_t const mtime = sqlite3_column_int64(stmt, 2);
1256 uid_t const owner = sqlite3_column_int(stmt, 3);
1257 gid_t const grp = sqlite3_column_int(stmt, 4);
1258 int const nested = schema_revision <= 3 ? 1 : sqlite3_column_int(stmt, 6);
1259
1260 string const name = additional_prefix + sanitise_name(name_cstr);
1261 CUSTOM_ASSERT(check_prefix(name, lease_path),
1262 "%s is not below lease path %s", name.c_str(),
1263 lease_path.c_str());
1264
1265 Directory dir(name, mtime, mode, owner, grp, nested);
1266 char *acl = (char *)sqlite3_column_text(stmt, 5);
1267 dir.xattr = marshal_xattrs(acl);
1268 all_dirs.insert(std::make_pair(name, dir));
1269 }
1270 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1271 }
1272
1273 void swissknife::IngestSQL::load_files(
1274 sqlite3 *db, const std::string &lease_path,
1275 const std::string &additional_prefix,
1276 std::map<std::string, std::vector<File> > &all_files) {
1277 sqlite3_stmt *stmt;
1278 int const schema_revision = get_db_schema_revision(db);
1279 string select_stmt = "SELECT name, mode, mtime, owner, grp, size, hashes, "
1280 "internal, compressed FROM files";
1281 if (schema_revision <= 2) {
1282 select_stmt = "SELECT name, mode, mtime, owner, grp, size, hashes, "
1283 "internal FROM files";
1284 }
1285 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1286 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1287 while (sqlite3_step(stmt) == SQLITE_ROW) {
1288 char *name = (char *)sqlite3_column_text(stmt, 0);
1289 mode_t const mode = sqlite3_column_int(stmt, 1);
1290 time_t const mtime = sqlite3_column_int64(stmt, 2);
1291 uid_t const owner = sqlite3_column_int(stmt, 3);
1292 gid_t const grp = sqlite3_column_int(stmt, 4);
1293 size_t const size = sqlite3_column_int64(stmt, 5);
1294 char *hashes_cstr = (char *)sqlite3_column_text(stmt, 6);
1295 int const internal = sqlite3_column_int(stmt, 7);
1296 int const compressed = schema_revision <= 2 ? 0
1297 : sqlite3_column_int(stmt, 8);
1298
1299 string names = additional_prefix + sanitise_name(name);
1300 CUSTOM_ASSERT(check_prefix(names, lease_path),
1301 "%s is not below lease path %s", names.c_str(),
1302 lease_path.c_str());
1303 string const parent_dir = get_parent(names);
1304
1305 if (!all_files.count(parent_dir)) {
1306 all_files[parent_dir] = vector<swissknife::IngestSQL::File>();
1307 }
1308 all_files[parent_dir].emplace_back(std::move(names), mtime, size, owner,
1309 grp, mode, internal, compressed);
1310
1311 // tokenize hashes
1312 char *ref;
1313 char *tok;
1314 tok = strtok_r(hashes_cstr, ",", &ref);
1315 vector<off_t> offsets;
1316 vector<size_t> sizes;
1317 vector<shash::Any> hashes;
1318 off_t offset = 0;
1319
1320 CUSTOM_ASSERT(size >= 0, "file size cannot be negative [%s]",
1321 names.c_str());
1322 size_t const kChunkSize = internal ? kInternalChunkSize
1323 : kExternalChunkSize;
1324
1325 while (tok) {
1326 offsets.push_back(offset);
1327 // TODO: check the hash format
1328 CUSTOM_ASSERT(check_hash(tok) == 0,
1329 "provided hash for [%s] is invalid: %s", names.c_str(),
1330 tok);
1331 hashes.push_back(
1332 shash::MkFromHexPtr(shash::HexPtr(tok), shash::kSuffixNone));
1333 tok = strtok_r(NULL, ",", &ref);
1334 offset += kChunkSize; // in the future we might want variable chunk
1335 // sizes specified in the DB
1336 }
1337 size_t expected_num_chunks = size / kChunkSize;
1338 if (expected_num_chunks * (size_t)kChunkSize < (size_t)size || size == 0) {
1339 expected_num_chunks++;
1340 }
1341 CUSTOM_ASSERT(
1342 offsets.size() == expected_num_chunks,
1343 "offsets size %ld does not match expected number of chunks %ld",
1344 offsets.size(), expected_num_chunks);
1345 for (size_t i = 0; i < offsets.size() - 1; i++) {
1346 sizes.push_back(size_t(offsets[i + 1] - offsets[i]));
1347 }
1348
1349 sizes.push_back(size_t(size - offsets[offsets.size() - 1]));
1350 for (size_t i = 0; i < offsets.size(); i++) {
1351 FileChunk const chunk = FileChunk(hashes[i], offsets[i], sizes[i]);
1352 all_files[parent_dir].back().chunks.PushBack(chunk);
1353 }
1354 }
1355 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1356 }
1357
1358 void swissknife::IngestSQL::load_symlinks(
1359 sqlite3 *db, const std::string &lease_path,
1360 const std::string &additional_prefix,
1361 std::map<std::string, std::vector<Symlink> > &all_symlinks) {
1362 sqlite3_stmt *stmt;
1363 string const select_stmt = "SELECT name, target, mtime, owner, grp, "
1364 "skip_if_file_or_dir FROM links";
1365 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1366 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1367 while (sqlite3_step(stmt) == SQLITE_ROW) {
1368 char *name_cstr = (char *)sqlite3_column_text(stmt, 0);
1369 char *target_cstr = (char *)sqlite3_column_text(stmt, 1);
1370 time_t const mtime = sqlite3_column_int64(stmt, 2);
1371 uid_t const owner = sqlite3_column_int(stmt, 3);
1372 gid_t const grp = sqlite3_column_int(stmt, 4);
1373 int const skip_if_file_or_dir = sqlite3_column_int(stmt, 5);
1374
1375 string names = additional_prefix + sanitise_name(name_cstr);
1376 CUSTOM_ASSERT(check_prefix(names, lease_path),
1377 "%s is not below lease path %s", names.c_str(),
1378 lease_path.c_str());
1379 string target = target_cstr;
1380 string const parent_dir = get_parent(names);
1381
1382 if (!all_symlinks.count(parent_dir)) {
1383 all_symlinks[parent_dir] = vector<swissknife::IngestSQL::Symlink>();
1384 }
1385 all_symlinks[parent_dir].emplace_back(std::move(names), std::move(target),
1386 mtime, owner, grp,
1387 skip_if_file_or_dir);
1388 }
1389 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1390 }
1391
1392 int swissknife::IngestSQL::add_files(
1393 catalog::WritableCatalogManager &catalog_manager,
1394 const std::vector<File> &files) {
1395 for (auto &&file : files) {
1396 catalog::DirectoryEntry dir;
1397 XattrList const xattr;
1398 bool exists = false;
1399 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(file.name),
1400 catalog::kLookupDefault, &dir);
1401
1402 dir.name_ = NameString(get_basename(file.name));
1403 dir.mtime_ = file.mtime / 1000000000;
1404 dir.mode_ = file.mode | S_IFREG;
1405 dir.mode_ &= (S_IFREG | 0777);
1406 dir.uid_ = file.owner;
1407 dir.gid_ = file.grp;
1408 dir.size_ = file.size;
1409 dir.has_xattrs_ = false;
1410 dir.is_external_file_ = !file.internal;
1411 dir.set_is_chunked_file(true);
1412 dir.checksum_ = shash::MkFromHexPtr(
1413 shash::HexPtr("0000000000000000000000000000000000000000"),
1414 shash::kSuffixNone);
1415
1416 // compression is permitted only for internal data
1417 CUSTOM_ASSERT(file.internal || (!file.internal && file.compressed < 2),
1418 "compression is only allowed for internal data [%s]",
1419 file.name.c_str());
1420
1421 switch (file.compressed) {
1422 case 1: // Uncompressed
1423 dir.compression_algorithm_ = zlib::kNoCompression;
1424 break;
1425 case 2: // Compressed with Zlib
1426 dir.compression_algorithm_ = zlib::kZlibDefault;
1427 break;
1428 // future cases: different compression schemes
1429 default: // default behaviour: compressed if internal, content-addressed.
1430 // Uncompressed if external
1431 dir.compression_algorithm_ = file.internal ? zlib::kZlibDefault
1432 : zlib::kNoCompression;
1433 }
1434
1435 if (exists) {
1436 CUSTOM_ASSERT(
1437 !S_ISDIR(dir.mode()) && !S_ISLNK(dir.mode()),
1438 "Refusing to replace existing dir/symlink at %s with a file",
1439 file.name.c_str());
1440 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing existing file [%s]",
1441 file.name.c_str());
1442 catalog_manager.RemoveFile(file.name);
1443 }
1444 string const parent = get_parent(file.name);
1445 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding chunked file [%s]",
1446 file.name.c_str());
1447 catalog_manager.AddChunkedFile(dir, xattr, parent, file.chunks);
1448 }
1449
1450 return 0;
1451 }
1452
1453 int swissknife::IngestSQL::do_deletions(
1454 sqlite3 *db, catalog::WritableCatalogManager &catalog_manager,
1455 const std::string &lease_path, const std::string &additional_prefix) {
1456 sqlite3_stmt *stmt;
1457 int const row_count = get_row_count(db, "deletions");
1458 int const print_every = calculate_print_frequency(row_count);
1459 int curr_row = 0;
1460 int ret = sqlite3_prepare_v2(db,
1461 "SELECT name, directory, file, link FROM "
1462 "deletions ORDER BY length(name) DESC",
1463 -1, &stmt, NULL);
1464 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1465 while (sqlite3_step(stmt) == SQLITE_ROW) {
1466 curr_row++;
1467
1468 char *name = (char *)sqlite3_column_text(stmt, 0);
1469 int64_t const isdir = sqlite3_column_int64(stmt, 1);
1470 int64_t const isfile = sqlite3_column_int64(stmt, 2);
1471 int64_t const islink = sqlite3_column_int64(stmt, 3);
1472
1473 string const names = additional_prefix + sanitise_name(name);
1474 CUSTOM_ASSERT(check_prefix(names, lease_path),
1475 "%s is not below lease path %s", names.c_str(),
1476 lease_path.c_str());
1477
1478 catalog::DirectoryEntry dirent;
1479 bool exists = false;
1480 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(names),
1481 catalog::kLookupDefault, &dirent);
1482 if (exists) {
1483 if ((isdir && S_ISDIR(dirent.mode()))
1484 || (islink && S_ISLNK(dirent.mode()))
1485 || (isfile && S_ISREG(dirent.mode()))) {
1486 if (S_ISDIR(dirent.mode())) {
1487 PathString names_path(names);
1488 recursively_delete_directory(names_path, catalog_manager);
1489 } else {
1490 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing link/file [%s]",
1491 names.c_str());
1492 catalog_manager.RemoveFile(names);
1493 }
1494 } else {
1495 LogCvmfs(kLogCvmfs, kLogVerboseMsg,
1496 "Mismatch in deletion type, not deleting: [%s] (dir %ld/%d , "
1497 "link %ld/%d, file %ld/%d)",
1498 names.c_str(), isdir, S_ISDIR(dirent.mode()), islink,
1499 S_ISLNK(dirent.mode()), isfile, S_ISREG(dirent.mode()));
1500 }
1501 } else {
1502 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Not Removing non-existent [%s]",
1503 names.c_str());
1504 }
1505
1506 SHOW_PROGRESS("deletions", print_every, curr_row, row_count);
1507 }
1508 ret = sqlite3_finalize(stmt);
1509 return ret;
1510 }
1511
1512 const char *schema[] = {"PRAGMA journal_mode=WAL;",
1513
1514 "CREATE TABLE IF NOT EXISTS dirs ( \
1515 name TEXT PRIMARY KEY, \
1516 mode INTEGER NOT NULL DEFAULT 493,\
1517 mtime INTEGER NOT NULL DEFAULT (unixepoch()),\
1518 owner INTEGER NOT NULL DEFAULT 0, \
1519 grp INTEGER NOT NULL DEFAULT 0, \
1520 acl TEXT NOT NULL DEFAULT '', \
1521 nested INTEGER DEFAULT 1);",
1522
1523 "CREATE TABLE IF NOT EXISTS files ( \
1524 name TEXT PRIMARY KEY, \
1525 mode INTEGER NOT NULL DEFAULT 420, \
1526 mtime INTEGER NOT NULL DEFAULT (unixepoch()),\
1527 owner INTEGER NOT NULL DEFAULT 0,\
1528 grp INTEGER NOT NULL DEFAULT 0,\
1529 size INTEGER NOT NULL DEFAULT 0,\
1530 hashes TEXT NOT NULL DEFAULT '',\
1531 internal INTEGER NOT NULL DEFAULT 0,\
1532 compressed INTEGER NOT NULL DEFAULT 0\
1533 );",
1534
1535 "CREATE TABLE IF NOT EXISTS links (\
1536 name TEXT PRIMARY KEY,\
1537 target TEXT NOT NULL DEFAULT '',\
1538 mtime INTEGER NOT NULL DEFAULT (unixepoch()),\
1539 owner INTEGER NOT NULL DEFAULT 0,\
1540 grp INTEGER NOT NULL DEFAULT 0,\
1541 skip_if_file_or_dir INTEGER NOT NULL DEFAULT 0\
1542 );",
1543
1544 "CREATE TABLE IF NOT EXISTS deletions (\
1545 name TEXT PRIMARY KEY,\
1546 directory INTEGER NOT NULL DEFAULT 0,\
1547 file INTEGER NOT NULL DEFAULT 0,\
1548 link INTEGER NOT NULL DEFAULT 0\
1549 );",
1550
1551 "CREATE TABLE IF NOT EXISTS properties (\
1552 key TEXT PRIMARY KEY,\
1553 value TEXT NOT NULL\
1554 );",
1555
1556 "INSERT INTO properties VALUES ('schema_revision', "
1557 "'4') ON CONFLICT DO NOTHING;",
1558 NULL};
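// Sketch (hypothetical rows): a database created via the -n option is populated
// by an external producer before being passed back with -D, e.g.
//   INSERT INTO dirs  (name, nested) VALUES ('sw/app', 1);
//   INSERT INTO files (name, size, hashes, internal)
//     VALUES ('sw/app/data.bin', 100, '<40-char hex chunk hash>', 0);
//   INSERT INTO deletions (name, directory) VALUES ('sw/obsolete', 1);
// Unspecified columns take the schema defaults above; the hashes column holds
// one comma-separated hash per chunk (24 MiB chunks for external files, 6 MiB
// for internal ones), so a 100-byte external file carries exactly one hash.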
1559
1560 static void create_empty_database(string &filename) {
1561 sqlite3 *db_out;
1562 LogCvmfs(kLogCvmfs, kLogStdout, "Creating empty database file %s",
1563 filename.c_str());
1564 int ret = sqlite3_open_v2(filename.c_str(), &db_out,
1565 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL);
1566 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1567 relax_db_locking(db_out);
1568
1569 const char **ptr = schema;
1570 while (*ptr != NULL) {
1571 ret = sqlite3_exec(db_out, *ptr, NULL, NULL, NULL);
1572 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1573 ptr++;
1574 }
1575 sqlite3_close(db_out);
1576 }
1577
1578 static void recursively_delete_directory(
1579 PathString &path, catalog::WritableCatalogManager &catalog_manager) {
1580 catalog::DirectoryEntryList const listing;
1581
1582 // Add all names
1583 catalog::StatEntryList listing_from_catalog;
1584 bool const retval = catalog_manager.ListingStat(
1585 PathString("/" + path.ToString()), &listing_from_catalog);
1586
1587 CUSTOM_ASSERT(retval, "failed to call ListingStat for %s", path.c_str());
1588
1589 if (!catalog_manager.IsTransitionPoint(path.ToString())) {
1590 for (unsigned i = 0; i < listing_from_catalog.size(); ++i) {
1591 PathString entry_path;
1592 entry_path.Assign(path);
1593 entry_path.Append("/", 1);
1594 entry_path.Append(listing_from_catalog.AtPtr(i)->name.GetChars(),
1595 listing_from_catalog.AtPtr(i)->name.GetLength());
1596
1597 if (S_ISDIR(listing_from_catalog.AtPtr(i)->info.st_mode)) {
1598 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Recursing into %s/",
1599 entry_path.ToString().c_str());
1600 recursively_delete_directory(entry_path, catalog_manager);
1601
1602
1603 } else {
1604 LogCvmfs(kLogCvmfs, kLogVerboseMsg, " Recursively removing %s",
1605 entry_path.ToString().c_str());
1606 catalog_manager.RemoveFile(entry_path.ToString());
1607 }
1608 }
1609 } else {
1610 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing nested catalog %s",
1611 path.ToString().c_str());
1612 catalog_manager.RemoveNestedCatalog(path.ToString(), false);
1613 }
1614 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing directory %s",
1615 path.ToString().c_str());
1616 catalog_manager.RemoveDirectory(path.ToString());
1617 }
1618
1619
1620 static void relax_db_locking(sqlite3 *db) {
1621 int ret = 0;
1622 ret = sqlite3_exec(db, "PRAGMA temp_store=2", NULL, NULL, NULL);
1623 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1624 ret = sqlite3_exec(db, "PRAGMA synchronous=OFF", NULL, NULL, NULL);
1625 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1626 }
1627
1628
1629 extern "C" void *lease_refresh_thread(void *payload) {
1630 while (!g_stop_refresh) {
1631 sleep(2);
1632 refresh_lease();
1633 }
1634 return NULL;
1635 }
1636
1637 static bool isDatabaseMarkedComplete(const char *dbfile) {
1638 int ret;
1639 sqlite3 *db;
1640 sqlite3_stmt *stmt;
1641 bool retval = false;
1642
1643 ret = sqlite3_open(dbfile, &db);
1644 if (ret != SQLITE_OK) {
1645 return false;
1646 }
1647
1648 const char *req = "SELECT value FROM properties WHERE key='completed_graft'";
1649 ret = sqlite3_prepare_v2(db, req, -1, &stmt, NULL);
1650 if (ret != SQLITE_OK) {
1651 return false;
1652 }
1653 if (sqlite3_step(stmt) == SQLITE_ROW) {
1654 int const id = sqlite3_column_int(stmt, 0);
1655 if (id > 0) {
1656 retval = true;
1657 }
1658 }
1659 sqlite3_close(db);
1660 return retval;
1661 }
1662
1663 static void setDatabaseMarkedComplete(const char *dbfile) {
1664 int ret;
1665 sqlite3 *db;
1666 char *err;
1667
1668 ret = sqlite3_open(dbfile, &db);
1669 if (ret != SQLITE_OK) {
1670 return;
1671 }
1672
1673 const char *req = "INSERT INTO properties (key, value) VALUES "
1674 "('completed_graft',1) ON CONFLICT(key) DO UPDATE SET "
1675 "value=1 WHERE key='completed_graft'";
1676
1677 ret = sqlite3_exec(db, req, 0, 0, &err);
1678 if (ret != SQLITE_OK) {
1679 return;
1680 }
1681 sqlite3_close(db);
1682 }
1683