GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_ingestsql.cc
Date: 2025-09-28 02:35:26
Exec Total Coverage
Lines: 0 903 0.0%
Branches: 0 594 0.0%

Line Branch Exec Source
1 #include "swissknife_ingestsql.h"
2
3 #include <fcntl.h>
4 #include <grp.h>
5 #include <pwd.h>
6 #include <stdio.h>
7 #include <sys/resource.h>
8 #include <sys/time.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11
12 #include <csignal>
13 #include <cstdlib>
14 #include <fstream>
15 #include <sstream>
16 #include <stack>
17 #include <unordered_map>
18 #include <unordered_set>
19
20 #include "acl.h"
21 #include "catalog_downloader.h"
22 #include "catalog_mgr_rw.h"
23 #include "curl/curl.h"
24 #include "gateway_util.h"
25 #include "shortstring.h"
26 #include "swissknife_lease_curl.h"
27 #include "swissknife_lease_json.h"
28 #include "swissknife_sync.h"
29 #include "upload.h"
30 #include "util/logging.h"
31
// Abort (via assert) when a SQLite call does not return the expected code.
// Fix: `expected` is now parenthesized so expressions expand safely inside
// the comparison (previously only `ret` was parenthesized).
#define CHECK_SQLITE_ERROR(ret, expected) \
  do { \
    if ((ret) != (expected)) { \
      LogCvmfs(kLogCvmfs, kLogStderr, "SQLite error: %d", (ret)); \
      assert(0); \
    } \
  } while (0)
39
// Assert that `check` holds; on failure log the printf-style message `msg`
// (plus optional arguments) to stderr and abort via assert(0).
// Note: `##__VA_ARGS__` is a GNU/Clang extension that swallows the trailing
// comma when no variadic arguments are supplied.
#define CUSTOM_ASSERT(check, msg, ...) \
  do { \
    if (!(check)) { \
      LogCvmfs(kLogCvmfs, kLogStderr, msg, ##__VA_ARGS__); \
      assert(0); \
    } \
  } while (0)
47
// Log a "Processed X/Y item" progress line every `freq` items and once more
// at the final item. Fix: `freq` and `total` are now parenthesized so
// expression arguments expand safely (previously only `curr` was).
#define SHOW_PROGRESS(item, freq, curr, total) \
  do { \
    if ((curr) % (freq) == 0 || (curr) == (total)) { \
      LogCvmfs(kLogCvmfs, kLogStdout, "Processed %d/%d %s", (curr), (total), \
               item); \
    } \
  } while (0)
55
56
// Chunk sizes used when grafting file content (external vs. in-repo data).
static const unsigned kExternalChunkSize = 24 * 1024 * 1024;
static const unsigned kInternalChunkSize = 6 * 1024 * 1024;
// Seconds between retries while the gateway reports the lease as busy.
static const unsigned kDefaultLeaseBusyRetryInterval = 10;
// Minimum lease age before refresh_lease() actually sends a request.
static const unsigned kLeaseRefreshInterval = 90;  // seconds


// Process-wide state, shared with the signal handler and the refresh thread.
static bool g_lease_acquired = false;
static string g_gateway_url;
static string g_gateway_key_id;
static string g_gateway_secret;
static string g_session_token;
static string g_session_token_file;  // temp file holding the session token
static string g_s3_file;
static time_t g_last_lease_refresh = 0;
static bool g_stop_refresh = false;  // set to stop lease_refresh_thread
static string g_wait_for_update;     // path polled after commit (-B option)
static int64_t g_priority = 0;       // commit priority sent to the gateway
static bool g_add_missing_catalogs = false;

// Forward declarations.
static string get_lease_from_paths(vector<string> paths);
static vector<string> get_all_dirs_from_sqlite(vector<string> &sqlite_db_vec,
                                               bool include_additions,
                                               bool include_deletions);
static string get_parent(const string &path);
static string get_basename(const string &path);

static XattrList marshal_xattrs(const char *acl);
static string sanitise_name(const char *name_cstr, bool allow_leading_slash);
static void on_signal(int sig);
static string acquire_lease(const string &key_id, const string &secret,
                            const string &lease_path,
                            const string &repo_service_url,
                            bool force_cancel_lease, uint64_t *current_revision,
                            string &current_root_hash,
                            unsigned int refresh_interval);
static void cancel_lease();
static void refresh_lease();
static vector<string> get_file_list(string &path);
static int check_hash(const char *hash);
static void recursively_delete_directory(
    PathString &path, catalog::WritableCatalogManager &catalog_manager);
static void create_empty_database(string &filename);
static void relax_db_locking(sqlite3 *db);
static bool check_prefix(const std::string &path, const std::string &prefix);

static bool isDatabaseMarkedComplete(const char *dbfile);
static void setDatabaseMarkedComplete(const char *dbfile);

static void invalidate_manifest(std::string proxy_list, std::string url);
static void wait_for_update(std::string path, long revision);

// Background thread body that periodically calls refresh_lease().
extern "C" void *lease_refresh_thread(void *payload);

// Final repository revision reported by the gateway commit (defined elsewhere).
extern long g_final_revision;
111
112 static string sanitise_name(const char *name_cstr,
113 bool allow_leading_slash = false) {
114 int reason = 0;
115 const char *c = name_cstr;
116 while (*c == '/') {
117 c++;
118 } // strip any leading slashes
119 string const name = string(c);
120 bool ok = true;
121
122 if (!allow_leading_slash && HasPrefix(name, "/", true)) {
123 reason = 1;
124 ok = false;
125 }
126 if (HasSuffix(name, "/", true)) {
127 if (!(allow_leading_slash
128 && name.size() == 1)) { // account for the case where name=="/"
129 reason = 2;
130 ok = false;
131 }
132 }
133 if (name.find("//") != string::npos) {
134 reason = 3;
135 ok = false;
136 }
137 if (HasPrefix(name, "./", true) || HasPrefix(name, "../", true)) {
138 reason = 4;
139 ok = false;
140 }
141 if (HasSuffix(name, "/.", true) || HasSuffix(name, "/..", true)) {
142 reason = 5;
143 ok = false;
144 }
145 if (name.find("/./") != string::npos || name.find("/../") != string::npos) {
146 reason = 6;
147 ok = false;
148 }
149 if (name == "") {
150 reason = 7;
151 ok = false;
152 }
153 CUSTOM_ASSERT(ok, "Name [%s] is invalid (reason %d)", name.c_str(), reason);
154 return string(name);
155 }
156
// Return everything before the last '/'; empty string when there is no slash.
static string get_parent(const string &path) {
  const size_t slash_pos = path.rfind('/');
  return (slash_pos == string::npos) ? string("") : path.substr(0, slash_pos);
}
164
// Return everything after the last '/'; the whole path when there is no slash.
static string get_basename(const string &path) {
  const size_t slash_pos = path.rfind('/');
  if (slash_pos == string::npos) {
    return path;
  }
  return path.substr(slash_pos + 1);
}
172
// this is copied from MakeRelativePath
// The catalog root is represented as ""; every other relative path gets a
// leading '/'.
static string MakeCatalogPath(const std::string &relative_path) {
  if (relative_path.empty()) {
    return "";
  }
  return "/" + relative_path;
}
177
// Acquire a gateway lease on `lease_path`, retrying forever until success.
// On success returns the session token and stores the gateway-reported
// repository revision and root hash into the output parameters. Busy leases
// and request errors are retried every `refresh_interval` seconds.
static string acquire_lease(const string &key_id, const string &secret,
                            const string &lease_path,
                            const string &repo_service_url,
                            bool force_cancel_lease, uint64_t *current_revision,
                            string &current_root_hash,
                            unsigned int refresh_interval) {
  const CURLcode ret = curl_global_init(CURL_GLOBAL_ALL);
  CUSTOM_ASSERT(ret == CURLE_OK, "failed to init curl");

  // Optional free-form metadata forwarded to the gateway with the request.
  string gateway_metadata_str;
  char *gateway_metadata = getenv("CVMFS_GATEWAY_METADATA");
  if (gateway_metadata != NULL)
    gateway_metadata_str = gateway_metadata;

  while (true) {
    CurlBuffer buffer;
    if (MakeAcquireRequest(key_id, secret, lease_path, repo_service_url,
                           &buffer, gateway_metadata_str)) {
      string session_token;

      const LeaseReply rep = ParseAcquireReplyWithRevision(
          buffer, &session_token, current_revision, current_root_hash);
      switch (rep) {
        case kLeaseReplySuccess:
          g_lease_acquired = true;
          g_last_lease_refresh = time(NULL);
          return session_token;
          break;
        case kLeaseReplyBusy:
          // NOTE(review): forced cancellation is not implemented; only a
          // message is printed before the normal retry path below.
          if (force_cancel_lease) {
            LogCvmfs(kLogCvmfs, kLogStderr,
                     "Lease busy, forcing cancellation (TODO");
          }
          LogCvmfs(kLogCvmfs, kLogStderr, "Lease busy, retrying in %d sec",
                   refresh_interval);
          sleep(refresh_interval);
          break;
        default:
          LogCvmfs(kLogCvmfs, kLogStderr,
                   "Error acquiring lease: %s. Retrying in %d sec",
                   buffer.data.c_str(), refresh_interval);
          sleep(refresh_interval);
      }
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Error making lease acquisition request. Retrying in %d sec",
               refresh_interval);
      sleep(refresh_interval);
    }
  }
  // Unreachable: the loop above only exits by returning a session token.
  assert(false);
  return "";
}
231
232 static uint64_t make_commit_on_gateway(const std::string &old_root_hash,
233 const std::string &new_root_hash,
234 int64_t priority) {
235 CurlBuffer buffer;
236 char priorityStr[100];
237 (void)sprintf(priorityStr, "%" PRId64,
238 priority); // skipping return value check; no way such large
239 // buffer will overflow
240 buffer.data = "";
241
242 const std::string payload = "{\n\"old_root_hash\": \"" + old_root_hash
243 + "\",\n\"new_root_hash\": \"" + new_root_hash
244 + "\",\n\"priority\": " + priorityStr + "}";
245
246 return MakeEndRequest("POST", g_gateway_key_id, g_gateway_secret,
247 g_session_token, g_gateway_url, payload, &buffer, true /*expect_final_revision*/);
248 }
249
250 static void refresh_lease() {
251 CurlBuffer buffer;
252 buffer.data = "";
253 if ((time(NULL) - g_last_lease_refresh) < kLeaseRefreshInterval) {
254 return;
255 }
256
257 if (MakeEndRequest("PATCH", g_gateway_key_id, g_gateway_secret,
258 g_session_token, g_gateway_url, "", &buffer, false /*expect_final_revision*/)) {
259 const int ret = ParseDropReply(buffer);
260 if (kLeaseReplySuccess == ret) {
261 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Lease refreshed");
262 g_last_lease_refresh = time(NULL);
263 } else {
264 LogCvmfs(kLogCvmfs, kLogStderr, "Lease refresh failed: %d", ret);
265 }
266 } else {
267 LogCvmfs(kLogCvmfs, kLogStderr, "Lease refresh request failed");
268 if (buffer.data == "Method Not Allowed\n") {
269 g_last_lease_refresh = time(NULL);
270 LogCvmfs(kLogCvmfs, kLogStderr,
271 "This gateway does not support lease refresh");
272 }
273 }
274 }
275
276
277 static void cancel_lease() {
278 CurlBuffer buffer;
279 if (MakeEndRequest("DELETE", g_gateway_key_id, g_gateway_secret,
280 g_session_token, g_gateway_url, "", &buffer, false /*expect_final_revision*/)) {
281 const int ret = ParseDropReply(buffer);
282 if (kLeaseReplySuccess == ret) {
283 LogCvmfs(kLogCvmfs, kLogStdout, "Lease cancelled");
284 } else {
285 LogCvmfs(kLogCvmfs, kLogStderr, "Lease cancellation failed: %d", ret);
286 }
287 } else {
288 LogCvmfs(kLogCvmfs, kLogStderr, "Lease cancellation request failed");
289 }
290 g_stop_refresh = true;
291 }
292
293 static void on_signal(int sig) {
294 (void)signal(sig, SIG_DFL);
295 if (g_lease_acquired) {
296 LogCvmfs(kLogCvmfs, kLogStdout, "Cancelling lease");
297 cancel_lease();
298 unlink(g_session_token_file.c_str());
299 }
300 if (sig == SIGINT || sig == SIGTERM)
301 exit(1);
302 }
303
304 static vector<string> get_all_dirs_from_sqlite(vector<string> &sqlite_db_vec,
305 bool include_additions,
306 bool include_deletions) {
307 int ret;
308 vector<string> paths;
309
310 for (vector<string>::iterator it = sqlite_db_vec.begin();
311 it != sqlite_db_vec.end();
312 it++) {
313 sqlite3 *db;
314 ret = sqlite3_open_v2((*it).c_str(), &db, SQLITE_OPEN_READONLY, NULL);
315 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
316 relax_db_locking(db);
317
318 vector<string> tables;
319 if (include_additions) {
320 tables.push_back("dirs");
321 tables.push_back("links");
322 tables.push_back("files");
323 }
324 if (include_deletions) {
325 tables.push_back("deletions");
326 }
327
328 // get all the paths from the DB
329 for (vector<string>::iterator it = tables.begin(); it != tables.end();
330 it++) {
331 sqlite3_stmt *stmt;
332 const string query = "SELECT name FROM " + *it;
333 ret = sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, NULL);
334 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
335 while (sqlite3_step(stmt) == SQLITE_ROW) {
336 const char *name = reinterpret_cast<const char *>(
337 sqlite3_column_text(stmt, 0));
338 const string names = sanitise_name(name);
339 if (*it == "dirs") {
340 paths.push_back(names);
341 } else {
342 paths.push_back(get_parent(names));
343 }
344 }
345 ret = sqlite3_finalize(stmt);
346 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
347 }
348 ret = sqlite3_close_v2(db);
349 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
350 }
351 return paths;
352 }
353
354 static int get_db_schema_revision(sqlite3 *db,
355 const std::string &db_name = "") {
356 sqlite3_stmt *stmt;
357 std::ostringstream stmt_str;
358 stmt_str << "SELECT value FROM " << db_name
359 << "properties WHERE key = 'schema_revision'";
360 int ret = sqlite3_prepare_v2(db, stmt_str.str().c_str(), -1, &stmt, NULL);
361 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
362
363 ret = sqlite3_step(stmt);
364 // if table exists, we require that it must have a schema_revision row
365 CHECK_SQLITE_ERROR(ret, SQLITE_ROW);
366 const std::string schema_revision_str(
367 reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0)));
368 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
369 return std::stoi(schema_revision_str);
370 }
371
372 static int get_row_count(sqlite3 *db, const std::string &table_name) {
373 sqlite3_stmt *stmt;
374 std::ostringstream stmt_str;
375 stmt_str << "SELECT COUNT(*) FROM " << table_name;
376 int ret = sqlite3_prepare_v2(db, stmt_str.str().c_str(), -1, &stmt, NULL);
377 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
378
379 ret = sqlite3_step(stmt);
380 CHECK_SQLITE_ERROR(ret, SQLITE_ROW);
381 const std::string count_str(
382 reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0)));
383 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
384 return std::stoi(count_str);
385 }
386
// Pick a progress-print interval so that at most ~50 progress lines are
// emitted: the smallest power-of-ten multiple of 1000 whose value * 50
// covers `total`.
static int calculate_print_frequency(int total) {
  int interval = 1000;
  for (; interval * 50 < total; interval *= 10) {
  }
  return interval;
}
393
// compute a common path among for paths for use as the lease path
// Starts from the first path and repeatedly truncates it at '/' boundaries
// until it is a parent of every other path; returns the result with a
// leading '/'.
static string get_lease_from_paths(vector<string> paths) {
  CUSTOM_ASSERT(!paths.empty(), "no paths are provided");

  // we'd have to ensure path is relative
  // (probably best to check this elsewhere as it is not just a requirement for
  // this function)
  auto lease = PathString(paths.at(0));
  for (auto it = paths.begin() + 1; it != paths.end(); ++it) {
    auto path = PathString(*it);
    // shrink the lease path until it is a parent of "path"
    while (!IsSubPath(lease, path)) {
      // cut the candidate back to the previous '/' (or to empty when i == 0)
      auto i = lease.GetLength() - 1;
      for (; i >= 0; --i) {
        if (lease.GetChars()[i] == '/' || i == 0) {
          lease.Truncate(i);
          break;
        }
      }
    }
    if (lease.IsEmpty())
      break;  // early stop if lease is already at the root
  }

  // Leases are absolute: prepend '/' to the computed common prefix.
  auto prefix = "/" + lease.ToString();

  LogCvmfs(kLogCvmfs, kLogStdout, "Longest prefix is %s", prefix.c_str());
  return prefix;
}
423
424 static XattrList marshal_xattrs(const char *acl_string) {
425 XattrList aclobj;
426
427 if (acl_string == NULL || acl_string[0] == '\0') {
428 return aclobj;
429 }
430
431 bool equiv_mode;
432 size_t binary_size;
433 char *binary_acl;
434 const int ret = acl_from_text_to_xattr_value(string(acl_string), binary_acl,
435 binary_size, equiv_mode);
436 if (ret) {
437 LogCvmfs(kLogCvmfs, kLogStderr,
438 "failure of acl_from_text_to_xattr_value(%s)", acl_string);
439 assert(
440 0); // TODO(vavolkl): incorporate error handling other than asserting
441 return aclobj;
442 }
443 if (!equiv_mode) {
444 CUSTOM_ASSERT(
445 aclobj.Set("system.posix_acl_access", string(binary_acl, binary_size)),
446 "failed to set system.posix_acl_access (ACL size %ld)", binary_size);
447 free(binary_acl);
448 }
449
450 return aclobj;
451 }
452
453 std::unordered_map<string, string> load_config(const string &config_file) {
454 std::unordered_map<string, string> config_map;
455 ifstream input(config_file);
456 if (!input) {
457 LogCvmfs(kLogCvmfs, kLogStderr, "could not open config file %s",
458 config_file.c_str());
459 return config_map;
460 }
461 vector<string> lines;
462 for (string line; getline(input, line);) {
463 lines.push_back(line);
464 }
465
466 for (auto it = lines.begin(); it != lines.end(); it++) {
467 const string l = *it;
468 const size_t p = l.find('=', 0);
469 if (p != string::npos) {
470 const string key = l.substr(0, p);
471 string val = l.substr(p + 1);
472 // trim any double quotes
473 if (val.front() == '"') {
474 val = val.substr(1, val.length() - 2);
475 }
476 config_map[key] = val;
477 }
478 }
479
480 return config_map;
481 }
482
483 string retrieve_config(std::unordered_map<string, string> &config_map,
484 const string &key) {
485 auto kv = config_map.find(key);
486 CUSTOM_ASSERT(kv != config_map.end(), "Parameter %s not found in config",
487 key.c_str());
488 return kv->second;
489 }
490
491 static vector<string> get_file_list(string &path) {
492 vector<string> paths;
493 const char *cpath = path.c_str();
494 struct stat st;
495 const int ret = stat(cpath, &st);
496 CUSTOM_ASSERT(ret == 0, "failed to stat file %s", cpath);
497
498 if (S_ISDIR(st.st_mode)) {
499 DIR *d;
500 struct dirent *dir;
501 d = opendir(cpath);
502 if (d) {
503 while ((dir = readdir(d)) != NULL) {
504 const char *t = strrchr(dir->d_name, '.');
505 if (t && !strcmp(t, ".db")) {
506 paths.push_back(path + "/" + dir->d_name);
507 }
508 }
509 closedir(d);
510 }
511 } else {
512 paths.push_back(path);
513 }
514 return paths;
515 }
516
517 extern bool g_log_with_time;
518
// Entry point of the SQL-ingestion swissknife command: parses arguments,
// acquires a gateway lease, grafts the contents of one or more SQLite
// ingestion databases into the repository catalogs, and commits the result
// through the gateway.
int swissknife::IngestSQL::Main(const swissknife::ArgumentList &args) {
  // the catalog code uses assert() liberally.
  // install ABRT signal handler to catch an abort and cancel lease
  if (signal(SIGABRT, &on_signal) == SIG_ERR
      || signal(SIGINT, &on_signal) == SIG_ERR
      || signal(SIGTERM, &on_signal) == SIG_ERR) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Setting signal handlers failed");
    exit(1);
  }

  // -c keeps core files enabled; otherwise disable them for this process.
  const bool enable_corefiles = (args.find('c') != args.end());
  if (!enable_corefiles) {
    struct rlimit rlim;
    rlim.rlim_cur = rlim.rlim_max = 0;
    setrlimit(RLIMIT_CORE, &rlim);
  }


  // -n: create an empty skeleton database and exit immediately.
  if (args.find('n') != args.end()) {
    create_empty_database(*args.find('n')->second);
    exit(0);
  }

  // -B: after committing, wait until this path reflects the new revision.
  if (args.find('B') != args.end()) {
    g_wait_for_update = *args.find('B')->second;
  }

  // -P: explicit commit priority; defaults to the negated current time.
  if (args.find('P') != args.end()) {
    const char *arg = (*args.find('P')->second).c_str();
    char *at_null_terminator_if_number;
    g_priority = strtoll(arg, &at_null_terminator_if_number, 10);
    if (*at_null_terminator_if_number != '\0') {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Priority parameter value '%s' parsing failed", arg);
      return 1;
    }
  } else {
    g_priority = -time(NULL);
  }


  // -r: seconds between retries while the lease is busy.
  unsigned int lease_busy_retry_interval = kDefaultLeaseBusyRetryInterval;
  if (args.find('r') != args.end()) {
    lease_busy_retry_interval = atoi((*args.find('r')->second).c_str());
  }

  // Scratch directory: -t flag or $TMPDIR, one of which is required.
  string dir_temp = "";
  const char *env_tmpdir;
  if (args.find('t') != args.end()) {
    dir_temp = MakeCanonicalPath(*args.find('t')->second);
  } else if ((env_tmpdir = getenv("TMPDIR"))) {
    dir_temp = MakeCanonicalPath(env_tmpdir);
  } else {
    LogCvmfs(kLogCvmfs, kLogStderr, "-t or TMPDIR required");
    return 1;
  }

  // -C: override the gateway-client configuration directory.
  string kConfigDir("/etc/cvmfs/gateway-client/");
  if (args.find('C') != args.end()) {
    kConfigDir = MakeCanonicalPath(*args.find('C')->second);
    kConfigDir += "/";
    LogCvmfs(kLogCvmfs, kLogStdout, "Overriding configuration dir prefix to %s",
             kConfigDir.c_str());
  }

  // mandatory arguments
  string const repo_name = *args.find('N')->second;
  string sqlite_db_path = *args.find('D')->second;

  // -D may name a single .db file or a directory containing several.
  vector<string> sqlite_db_vec = get_file_list(sqlite_db_path);

  // optional arguments
  bool const allow_deletions = (args.find('d') != args.end());
  bool const force_cancel_lease = (args.find('x') != args.end());
  bool const allow_additions = !allow_deletions
                               || (args.find('a') != args.end());
  g_add_missing_catalogs = (args.find('z') != args.end());
  bool const check_completed_graft_property = (args.find('Z') != args.end());
  if (args.find('v') != args.end()) {
    SetLogVerbosity(kLogVerbose);
  }

  // -Z: skip all work if the (single) DB was already ingested successfully.
  if (check_completed_graft_property) {
    if (sqlite_db_vec.size() != 1) {
      LogCvmfs(kLogCvmfs, kLogStderr, "-Z requires a single DB file");
      exit(1);
    }
    if (isDatabaseMarkedComplete(sqlite_db_vec[0].c_str())) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "DB file is already marked as completed_graft");
      exit(0);
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "DB file is not marked as completed_graft");
    }
  }

  string const config_file = kConfigDir + repo_name + "/config";
  string stratum0;
  string proxy;

  // -p: extra path prefix applied to the lease and to all ingested paths.
  string additional_prefix = "";
  bool has_additional_prefix = false;
  if (args.find('p') != args.end()) {
    additional_prefix = *args.find('p')->second;
    additional_prefix = sanitise_name(additional_prefix.c_str(), true);
    if (additional_prefix.back() != '/') {
      additional_prefix += "/";
    }
    has_additional_prefix = true;
    LogCvmfs(kLogCvmfs, kLogStdout,
             "Adding additional prefix %s to lease and all paths",
             additional_prefix.c_str());
    // now we are confident that any additional prefix has no leading / and does
    // have a tailing /
  }
  auto config_map = load_config(config_file);

  // Command-line overrides take precedence over the per-repo config file.
  if (args.find('g') != args.end()) {
    g_gateway_url = *args.find('g')->second;
  } else {
    g_gateway_url = retrieve_config(config_map, "CVMFS_GATEWAY");
  }
  if (args.find('w') != args.end()) {
    stratum0 = *args.find('w')->second;
  } else {
    stratum0 = retrieve_config(config_map, "CVMFS_STRATUM0");
  }

  if (args.find('@') != args.end()) {
    proxy = *args.find('@')->second;
  } else {
    proxy = retrieve_config(config_map, "CVMFS_HTTP_PROXY");
  }

  string lease_path = "";
  // bool lease_autodetected = false;
  if (args.find('l') != args.end()) {
    lease_path = *args.find('l')->second;
  } else {
    // lease path wasn't specified, so try to autodetect it
    vector<string> const paths = get_all_dirs_from_sqlite(
        sqlite_db_vec, allow_additions, allow_deletions);
    if (paths.size() == 0) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Database is empty, nothing to do");
      return 0;  // treat it as a success
    }
    lease_path = get_lease_from_paths(paths);
    // lease_autodetected = true;
  }

  // Splice the additional prefix (no leading slash, trailing slash) into the
  // lease path, then normalise the result to carry a leading '/'.
  if (has_additional_prefix) {
    if (lease_path == "/") {
      lease_path = "/" + additional_prefix;
    } else {
      if (lease_path.substr(0, 1) == "/") {
        lease_path = "/" + additional_prefix
                     + lease_path.substr(1, lease_path.size() - 1);
      } else {
        lease_path = "/" + additional_prefix
                     + lease_path;  // prefix is certain to have a trailing /
      }
    }
  }
  if (lease_path.substr(0, 1) != "/") {
    lease_path = "/" + lease_path;
  }
  LogCvmfs(kLogCvmfs, kLogStdout, "Lease path is %s", lease_path.c_str());


  // Per-repo key material and S3 config, each overridable on the command line.
  string public_keys = kConfigDir + repo_name + "/pubkey";
  string key_file = kConfigDir + repo_name + "/gatewaykey";
  string s3_file = kConfigDir + repo_name + "/s3.conf";

  if (args.find('k') != args.end()) {
    public_keys = *args.find('k')->second;
  }
  if (args.find('s') != args.end()) {
    key_file = *args.find('s')->second;
  }
  if (args.find('3') != args.end()) {
    s3_file = *args.find('3')->second;
  }

  CUSTOM_ASSERT(access(public_keys.c_str(), R_OK) == 0, "%s is not readable",
                public_keys.c_str());
  CUSTOM_ASSERT(access(key_file.c_str(), R_OK) == 0, "%s is not readable",
                key_file.c_str());

  // string spooler_definition_string = string("gw,,") + g_gateway_url;
  // create a spooler that will upload to S3
  string const spooler_definition_string = string("S3,") + dir_temp + ","
                                           + repo_name + "@" + s3_file;

  // load gateway lease
  if (!gateway::ReadKeys(key_file, &g_gateway_key_id, &g_gateway_secret)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "gateway::ReadKeys failed");
    return 1;
  }

  uint64_t current_revision = 0;
  std::string current_root_hash = "";

  // acquire lease and save token to a file in the tmpdir
  LogCvmfs(kLogCvmfs, kLogStdout, "Acquiring gateway lease on %s",
           lease_path.c_str());
  g_session_token = acquire_lease(g_gateway_key_id, g_gateway_secret,
                                  repo_name + lease_path, g_gateway_url,
                                  force_cancel_lease, &current_revision,
                                  current_root_hash, lease_busy_retry_interval);


  // Persist the session token for the spooler via a mkstemp temp file.
  char *_tmpfile = strdup((dir_temp + "/gateway_session_token_XXXXXX").c_str());
  int const temp_fd = mkstemp(_tmpfile);
  g_session_token_file = string(_tmpfile);
  free(_tmpfile);

  FILE *fout = fdopen(temp_fd, "wb");
  CUSTOM_ASSERT(fout != NULL,
                "failed to open session token file %s for writing",
                g_session_token_file.c_str());
  fputs(g_session_token.c_str(), fout);
  fclose(fout);

  // now start the lease refresh thread
  pthread_t lease_thread;
  if (0 != pthread_create(&lease_thread, NULL, lease_refresh_thread, NULL)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "Unable to start lease refresh thread");
    cancel_lease();
    return 1;
  }

  // now initialise the various bits we need

  upload::SpoolerDefinition spooler_definition(
      spooler_definition_string, shash::kSha1, zlib::kZlibDefault, false, true,
      SyncParameters::kDefaultMinFileChunkSize,
      SyncParameters::kDefaultAvgFileChunkSize,
      SyncParameters::kDefaultMaxFileChunkSize, g_session_token_file, key_file);

  // -q: override the number of concurrent uploads.
  if (args.find('q') != args.end()) {
    spooler_definition.number_of_concurrent_uploads = String2Uint64(
        *args.find('q')->second);
  }

  upload::SpoolerDefinition const spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());

  UniquePtr<upload::Spooler> const spooler_catalogs(
      upload::Spooler::Construct(spooler_definition_catalogs, nullptr));

  if (!spooler_catalogs.IsValid()) {
    LogCvmfs(kLogCvmfs, kLogStderr, "spooler_catalogs invalid");
    cancel_lease();
    return 1;
  }
  if (!InitDownloadManager(true, proxy, kCatalogDownloadMultiplier)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "download manager init failed");
    cancel_lease();
    return 1;
  }
  if (!InitSignatureManager(public_keys, "")) {
    LogCvmfs(kLogCvmfs, kLogStderr, "signature manager init failed");
    cancel_lease();
    return 1;
  }

  UniquePtr<manifest::Manifest> manifest;

  manifest = FetchRemoteManifest(stratum0, repo_name, shash::Any());

  if (!manifest.IsValid()) {
    LogCvmfs(kLogCvmfs, kLogStderr, "manifest invalid");
    cancel_lease();
    return 1;
  }

  // Reconcile the gateway-reported revision with .cvmfspublished; a newer
  // gateway revision overrides the manifest's revision and root hash.
  if (current_revision > 0) {
    if (current_revision == manifest->revision()) {
      if (current_root_hash != manifest->catalog_hash().ToString()) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "Mismatch between cvmfspublished and gateway hash for "
                 "revision %lu (%s!=%s)",
                 current_revision, current_root_hash.c_str(),
                 manifest->catalog_hash().ToString().c_str());
        cancel_lease();
        return 1;
      } else {
        LogCvmfs(kLogCvmfs, kLogStdout,
                 "Gateway and .cvmfspublished agree on repo version %lu",
                 current_revision);
      }
    }
    if (current_revision > manifest->revision()) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "Gateway has supplied a newer revision than the current "
               ".cvmfspublished %lu > %lu",
               current_revision, manifest->revision());
      manifest->set_revision(current_revision);
      manifest->set_catalog_hash(shash::MkFromHexPtr(
          shash::HexPtr(current_root_hash), shash::kSuffixCatalog));
    } else if (current_revision < manifest->revision()) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "Gateway has supplied an older revision than the current "
               ".cvmfspublished %lu < %lu",
               current_revision, manifest->revision());
    }
  } else {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "Gateway has not supplied a revision. Using .cvmfspublished");
  }


  // get hash of current root catalog, remove terminal "C", encode it
  string const old_root_hash = manifest->catalog_hash().ToString(true);
  string const hash = old_root_hash.substr(0, old_root_hash.length() - 1);
  shash::Any const base_hash = shash::MkFromHexPtr(shash::HexPtr(hash),
                                                   shash::kSuffixCatalog);
  LogCvmfs(kLogCvmfs, kLogStdout, "old_root_hash: %s", old_root_hash.c_str());

  bool const is_balanced = false;

  catalog::WritableCatalogManager catalog_manager(
      base_hash, stratum0, dir_temp, spooler_catalogs.weak_ref(),
      download_manager(), false, SyncParameters::kDefaultNestedKcatalogLimit,
      SyncParameters::kDefaultRootKcatalogLimit,
      SyncParameters::kDefaultFileMbyteLimit, statistics(), is_balanced,
      SyncParameters::kDefaultMaxWeight, SyncParameters::kDefaultMinWeight,
      dir_temp /* dir_cache */);

  catalog_manager.Init();


  // now graft the contents of the DB
  vector<sqlite3 *> open_dbs;
  for (auto &&db_file : sqlite_db_vec) {
    sqlite3 *db;
    CHECK_SQLITE_ERROR(
        sqlite3_open_v2(db_file.c_str(), &db, SQLITE_OPEN_READONLY, NULL),
        SQLITE_OK);
    relax_db_locking(db);
    open_dbs.push_back(db);
  }
  process_sqlite(open_dbs, catalog_manager, allow_additions, allow_deletions,
                 lease_path.substr(1), additional_prefix);
  for (auto &&db : open_dbs) {
    CHECK_SQLITE_ERROR(sqlite3_close_v2(db), SQLITE_OK);
  }

  // commit changes
  LogCvmfs(kLogCvmfs, kLogStdout, "Committing changes...");
  if (!catalog_manager.Commit(false, false, manifest.weak_ref())) {
    LogCvmfs(kLogCvmfs, kLogStderr, "something went wrong during sync");
    cancel_lease();
    return 1;
  }

  // finalize the spooler
  LogCvmfs(kLogCvmfs, kLogStdout, "Waiting for all uploads to finish...");
  spooler_catalogs->WaitForUpload();

  LogCvmfs(kLogCvmfs, kLogStdout, "Exporting repository manifest");

  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the path of the new root catalog
  const string new_root_hash = manifest->catalog_hash().ToString(true);

  // if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
  //                                        RepositoryTag())) {
  //   LogCvmfs(kLogCvmfs, kLogStderr, "Failed to commit the transaction");
  //   // lease is only released on success
  //   cancel_lease();
  //   return 1;
  // }

  LogCvmfs(kLogCvmfs, kLogStdout, "Committing with priority %" PRId64,
           g_priority);

  bool const ok = make_commit_on_gateway(old_root_hash, new_root_hash,
                                         g_priority);
  if (!ok) {
    LogCvmfs(kLogCvmfs, kLogStderr,
             "something went wrong during commit on gateway");
    cancel_lease();
    exit(1);
  }


  // Local cleanup after a successful commit: drop the token file and let the
  // refresh thread exit.
  unlink(g_session_token_file.c_str());

  g_stop_refresh = true;

  // -B: invalidate the cached manifest and wait for the new revision to
  // appear at the given path.
  if (g_wait_for_update != "") {
    invalidate_manifest(proxy, stratum0 + "/.cvmfspublished");
    wait_for_update(g_wait_for_update, g_final_revision);
  }

  if (check_completed_graft_property) {
    setDatabaseMarkedComplete(sqlite_db_vec[0].c_str());
  }


  return 0;
}
925
// libcurl-style write callback that discards the body while reporting every
// byte as consumed.
size_t writeFunction(void *ptr, size_t size, size_t nmemb, std::string *data) {
  const size_t consumed = size * nmemb;
  return consumed;
}
929
// Replace every occurrence of `from` in `str` with `to`, scanning left to
// right and skipping over freshly inserted text so replacements never
// cascade into each other.
void replaceAllSubstrings(std::string &str, const std::string &from,
                          const std::string &to) {
  if (from.empty()) {
    return;  // an empty needle would match everywhere and never terminate
  }
  for (size_t pos = str.find(from); pos != std::string::npos;
       pos = str.find(from, pos)) {
    str.replace(pos, from.length(), to);
    pos += to.length();
  }
}
942
943
// Graft the contents of the (already opened) ingestion databases into the
// repository via `catalog_manager`. Deletions are applied before additions.
// `lease_path` is the lease without its leading '/'.
// NOTE(review): `additional_prefix` is presumably prepended to each DB path
// inside the load_*/do_* helpers — confirm there.
void swissknife::IngestSQL::process_sqlite(
    const std::vector<sqlite3 *> &dbs,
    catalog::WritableCatalogManager &catalog_manager, bool allow_additions,
    bool allow_deletions, const std::string &lease_path,
    const std::string &additional_prefix) {
  // Aggregated content of all databases, keyed by directory path.
  std::map<std::string, Directory> all_dirs;
  std::map<std::string, std::vector<File> > all_files;
  std::map<std::string, std::vector<Symlink> > all_symlinks;

  for (auto &&db : dbs) {
    load_dirs(db, lease_path, additional_prefix, all_dirs);
  }

  // put in a nested scope so we can free up memory of `dir_names`
  {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "Precaching existing directories (starting from %s)",
             lease_path.c_str());
    std::unordered_set<std::string> dir_names;
    std::transform(all_dirs.begin(), all_dirs.end(),
                   std::inserter(dir_names, dir_names.end()),
                   [](const std::pair<std::string, Directory> &pair) {
                     return MakeCatalogPath(pair.first);
                   });
    catalog_manager.LoadCatalogs(MakeCatalogPath(lease_path), dir_names);
  }

  for (auto &&db : dbs) {
    load_files(db, lease_path, additional_prefix, all_files);
    load_symlinks(db, lease_path, additional_prefix, all_symlinks);
  }

  // perform all deletions first
  if (allow_deletions) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Processing deletions...");
    for (auto &&db : dbs) {
      CHECK_SQLITE_ERROR(
          do_deletions(db, catalog_manager, lease_path, additional_prefix),
          SQLITE_OK);
    }
  }

  if (allow_additions) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Processing additions...");
    // first ensure all directories are present and create missing ones
    do_additions(all_dirs, all_files, all_symlinks, lease_path,
                 catalog_manager);
  }
}
993
994 void add_dir_to_tree(
995 std::string path,
996 std::unordered_map<std::string, std::set<std::string> > &tree,
997 const std::string &lease_path) {
998 tree[path];
999 std::string parent_path = get_parent(path);
1000 // recursively create any missing parents in the tree
1001 // avoid creating a loop when we insert the root path
1002 while (path != parent_path && path != lease_path
1003 && !tree[parent_path].count(path)) {
1004 tree[parent_path].insert(path);
1005 path = parent_path;
1006 parent_path = get_parent(path);
1007 }
1008 }
1009
// Apply all additions (directories, files, symlinks) to the catalogs.
// Returns 0 on success; malformed input aborts via CUSTOM_ASSERT.
int swissknife::IngestSQL::do_additions(
    const DirMap &all_dirs, const FileMap &all_files,
    const SymlinkMap &all_symlinks, const std::string &lease_path,
    catalog::WritableCatalogManager &catalog_manager) {
  // STEP 1:
  // - collect all the dirs/symlinks/files we need to process from the DB
  // - build a tree of paths for DFS traversal
  // - note the tree will contain all parent dirs of symlinks/files even if
  //   those are not explicitly added to the dirs table
  std::unordered_map<std::string, std::set<std::string> > tree;
  for (auto &&p : all_dirs) {
    add_dir_to_tree(p.first, tree, lease_path);
  }
  for (auto &&p : all_files) {
    add_dir_to_tree(p.first, tree, lease_path);
  }
  for (auto &&p : all_symlinks) {
    add_dir_to_tree(p.first, tree, lease_path);
  }
  int const row_count = static_cast<int>(tree.size());
  int const print_every = calculate_print_frequency(row_count);
  int curr_row = 0;
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Changeset: %ld dirs, %ld files, %ld symlinks", tree.size(),
           all_files.size(), all_symlinks.size());

  // STEP 2:
  // - process all the changes with DFS traversal
  // - make directories in pre-order
  // - add files/symlinks and schedule upload in post-order
  catalog_manager.SetupSingleCatalogUploadCallback();
  std::stack<string> dfs_stack;
  for (auto &&p : tree) {
    // figure out the starting point by checking whose parent is missing from
    // the tree; there must be exactly one such root
    if (p.first == "" || !tree.count(get_parent(p.first))) {
      CUSTOM_ASSERT(dfs_stack.empty(),
                    "provided DB input forms more than one path trees");
      dfs_stack.push(p.first);
    }
  }
  std::set<string> visited;
  while (!dfs_stack.empty()) {
    string const curr_dir = dfs_stack.top();
    // add content for the dir in post-order traversal
    // (second visit: all children have already been processed)
    if (visited.count(curr_dir)) {
      curr_row++;
      if (all_symlinks.count(curr_dir)) {
        add_symlinks(catalog_manager, all_symlinks.at(curr_dir));
      }
      if (all_files.count(curr_dir)) {
        add_files(catalog_manager, all_files.at(curr_dir));
      }
      // snapshot the dir (if it's a nested catalog mountpoint)
      catalog::DirectoryEntry dir_entry;
      bool exists = false;
      exists = catalog_manager.LookupDirEntry(
          MakeCatalogPath(curr_dir), catalog::kLookupDefault, &dir_entry);
      assert(exists);  // the dir must exist at this point
      if (dir_entry.IsNestedCatalogMountpoint()
          || dir_entry.IsNestedCatalogRoot()) {
        catalog_manager.AddCatalogToQueue(curr_dir);
        catalog_manager.ScheduleReadyCatalogs();
      }
      dfs_stack.pop();
      SHOW_PROGRESS("directories", print_every, curr_row, row_count);
    } else {
      // first visit: mark, push children, and create the directory itself
      visited.insert(curr_dir);
      // push children to the stack
      auto it = tree.find(curr_dir);
      if (it != tree.end()) {
        for (auto &&child : it->second) {
          dfs_stack.push(child);
        }
        tree.erase(it);  // erased entries double as a "fully expanded" marker
      }
      // parents that only exist as ancestors of files/symlinks carry no
      // directory metadata of their own and need no catalog update
      if (!all_dirs.count(curr_dir))
        continue;

      // create the dir first in pre-order traversal
      const IngestSQL::Directory &dir = all_dirs.at(curr_dir);
      catalog::DirectoryEntry dir_entry;

      bool exists = false;
      exists = catalog_manager.LookupDirEntry(
          MakeCatalogPath(curr_dir), catalog::kLookupDefault, &dir_entry);
      CUSTOM_ASSERT(
          !(exists && !S_ISDIR(dir_entry.mode_)),
          "Refusing to replace existing file/symlink at %s with a directory",
          dir.name.c_str());

      dir_entry.name_ = NameString(get_basename(dir.name));
      dir_entry.mtime_ = dir.mtime / 1000000000;  // nanoseconds -> seconds
      dir_entry.mode_ = dir.mode | S_IFDIR;
      dir_entry.mode_ &= (S_IFDIR | 0777);  // keep only type + permission bits
      dir_entry.uid_ = dir.owner;
      dir_entry.gid_ = dir.grp;
      dir_entry.has_xattrs_ = !dir.xattr.IsEmpty();

      bool add_nested_catalog = false;

      if (exists) {
        catalog_manager.TouchDirectory(dir_entry, dir.xattr, dir.name);
        // promote to a nested catalog if requested (or globally forced) and
        // the directory is not one already
        if ((!dir_entry.IsNestedCatalogMountpoint()
             && !dir_entry.IsNestedCatalogRoot())
            && (g_add_missing_catalogs || dir.nested)) {
          add_nested_catalog = true;
          LogCvmfs(kLogCvmfs, kLogVerboseMsg,
                   "Touching existing directory %s and adding nested catalog",
                   dir.name.c_str());
        } else {
          LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Touching existing directory %s",
                   dir.name.c_str());
        }
      } else {
        LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding directory [%s]",
                 dir.name.c_str());
        catalog_manager.AddDirectory(dir_entry, dir.xattr,
                                     get_parent(dir.name));
        if (dir.nested) {
          add_nested_catalog = true;
        }
      }
      if (add_nested_catalog) {
        // now add a .cvmfscatalog file
        // so that manual changes won't remove the nested catalog
        LogCvmfs(kLogCvmfs, kLogVerboseMsg,
                 "Placing .cvmfscatalog file in [%s]", dir.name.c_str());
        catalog::DirectoryEntryBase dir2;
        dir2.name_ = NameString(".cvmfscatalog");
        dir2.mtime_ = dir.mtime / 1000000000;
        dir2.mode_ = (S_IFREG | 0666);
        dir2.uid_ = 0;
        dir2.gid_ = 0;
        dir2.has_xattrs_ = 0;
        dir2.checksum_ = shash::MkFromHexPtr(
            shash::HexPtr("da39a3ee5e6b4b0d3255bfef95601890afd80709"),
            shash::kSuffixNone);  // hash of ""
        XattrList const xattr2;
        catalog_manager.AddFile(dir2, xattr2, dir.name);

        LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Creating Nested Catalog [%s]",
                 dir.name.c_str());
        catalog_manager.CreateNestedCatalog(dir.name);
      }
    }
  }

  // sanity check that we have processed all the input
  CUSTOM_ASSERT(tree.empty(),
                "not all directories are processed, malformed input DB?");
  catalog_manager.RemoveSingleCatalogUploadCallback();
  return 0;
}
1165
1166 int swissknife::IngestSQL::add_symlinks(
1167 catalog::WritableCatalogManager &catalog_manager,
1168 const std::vector<Symlink> &symlinks) {
1169 for (auto &&symlink : symlinks) {
1170 catalog::DirectoryEntry dir;
1171 catalog::DirectoryEntryBase dir2;
1172 XattrList const xattr;
1173 bool exists = false;
1174 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(symlink.name),
1175 catalog::kLookupDefault, &dir);
1176
1177 dir2.name_ = NameString(get_basename(symlink.name));
1178 dir2.mtime_ = symlink.mtime / 1000000000;
1179 dir2.uid_ = symlink.owner;
1180 dir2.gid_ = symlink.grp;
1181 dir2.has_xattrs_ = false;
1182 dir2.symlink_ = LinkString(symlink.target);
1183 dir2.mode_ = S_IFLNK | 0777;
1184
1185 int noop = false;
1186
1187 if (exists) {
1188 if (symlink.skip_if_file_or_dir) {
1189 if (S_ISDIR(dir.mode_) || S_ISREG(dir.mode_)) {
1190 LogCvmfs(kLogCvmfs, kLogVerboseMsg,
1191 "File or directory for symlink [%s] exists, skipping "
1192 "symlink creation",
1193 symlink.name.c_str());
1194 noop = true;
1195 } else if (S_ISLNK(dir.mode_)) {
1196 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing existing symlink [%s]",
1197 symlink.name.c_str());
1198 catalog_manager.RemoveFile(symlink.name);
1199 } else {
1200 CUSTOM_ASSERT(0, "unknown mode for dirent: %d", dir.mode_);
1201 }
1202 } else {
1203 CUSTOM_ASSERT(!S_ISDIR(dir.mode_),
1204 "Not removing directory [%s] to create symlink",
1205 symlink.name.c_str());
1206 LogCvmfs(kLogCvmfs, kLogVerboseMsg,
1207 "Removing existing file/symlink [%s]", symlink.name.c_str());
1208 catalog_manager.RemoveFile(symlink.name);
1209 }
1210 }
1211 if (!noop) {
1212 string const parent = get_parent(symlink.name);
1213 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding symlink [%s] -> [%s]",
1214 symlink.name.c_str(), symlink.target.c_str());
1215 catalog_manager.AddFile(dir2, xattr, parent);
1216 }
1217 }
1218 return 0;
1219 }
1220
// Validate that `hash` is exactly 40 lowercase hexadecimal characters
// (a SHA-1 rendered as [0-9a-f]{40}).  Returns 0 if valid, 1 otherwise.
// Note: uppercase hex digits are deliberately rejected.
static int check_hash(const char *hash) {
  if (strlen(hash) != 40) {
    return 1;
  }
  for (int i = 0; i < 40; i++) {
    const char c = hash[i];
    const bool is_digit = (c >= '0' && c <= '9');
    const bool is_lower_hex = (c >= 'a' && c <= 'f');
    if (!is_digit && !is_lower_hex) {
      return 1;
    }
  }
  return 0;
}
1234
1235 bool check_prefix(const std::string &path, const std::string &prefix) {
1236 if (prefix == "" || prefix == "/") {
1237 return true;
1238 }
1239 if ("/" + path == prefix) {
1240 return true;
1241 }
1242 if (!HasPrefix(path, prefix, false)) {
1243 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Entry %s is outside lease path: %s",
1244 path.c_str(), prefix.c_str());
1245 return false;
1246 }
1247 return true;
1248 }
1249
1250 void swissknife::IngestSQL::load_dirs(
1251 sqlite3 *db, const std::string &lease_path,
1252 const std::string &additional_prefix,
1253 std::map<std::string, Directory> &all_dirs) {
1254 sqlite3_stmt *stmt;
1255 int const schema_revision = get_db_schema_revision(db);
1256 string select_stmt = "SELECT name, mode, mtime, owner, grp, acl, nested FROM "
1257 "dirs";
1258 if (schema_revision <= 3) {
1259 select_stmt = "SELECT name, mode, mtime, owner, grp, acl FROM dirs";
1260 }
1261 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1262 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1263 while (sqlite3_step(stmt) == SQLITE_ROW) {
1264 char *name_cstr = (char *)sqlite3_column_text(stmt, 0);
1265 mode_t const mode = sqlite3_column_int(stmt, 1);
1266 time_t const mtime = sqlite3_column_int64(stmt, 2);
1267 uid_t const owner = sqlite3_column_int(stmt, 3);
1268 gid_t const grp = sqlite3_column_int(stmt, 4);
1269 int const nested = schema_revision <= 3 ? 1 : sqlite3_column_int(stmt, 6);
1270
1271 string const name = additional_prefix + sanitise_name(name_cstr);
1272 CUSTOM_ASSERT(check_prefix(name, lease_path),
1273 "%s is not below lease path %s", name.c_str(),
1274 lease_path.c_str());
1275
1276 Directory dir(name, mtime, mode, owner, grp, nested);
1277 char *acl = (char *)sqlite3_column_text(stmt, 5);
1278 dir.xattr = marshal_xattrs(acl);
1279 all_dirs.insert(std::make_pair(name, dir));
1280 }
1281 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1282 }
1283
1284 void swissknife::IngestSQL::load_files(
1285 sqlite3 *db, const std::string &lease_path,
1286 const std::string &additional_prefix,
1287 std::map<std::string, std::vector<File> > &all_files) {
1288 sqlite3_stmt *stmt;
1289 int const schema_revision = get_db_schema_revision(db);
1290 string select_stmt = "SELECT name, mode, mtime, owner, grp, size, hashes, "
1291 "internal, compressed FROM files";
1292 if (schema_revision <= 2) {
1293 select_stmt = "SELECT name, mode, mtime, owner, grp, size, hashes, "
1294 "internal FROM files";
1295 }
1296 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1297 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1298 while (sqlite3_step(stmt) == SQLITE_ROW) {
1299 char *name = (char *)sqlite3_column_text(stmt, 0);
1300 mode_t const mode = sqlite3_column_int(stmt, 1);
1301 time_t const mtime = sqlite3_column_int64(stmt, 2);
1302 uid_t const owner = sqlite3_column_int(stmt, 3);
1303 gid_t const grp = sqlite3_column_int(stmt, 4);
1304 size_t const size = sqlite3_column_int64(stmt, 5);
1305 char *hashes_cstr = (char *)sqlite3_column_text(stmt, 6);
1306 int const internal = sqlite3_column_int(stmt, 7);
1307 int const compressed = schema_revision <= 2 ? 0
1308 : sqlite3_column_int(stmt, 8);
1309
1310 string names = additional_prefix + sanitise_name(name);
1311 CUSTOM_ASSERT(check_prefix(names, lease_path),
1312 "%s is not below lease path %s", names.c_str(),
1313 lease_path.c_str());
1314 string const parent_dir = get_parent(names);
1315
1316 if (!all_files.count(parent_dir)) {
1317 all_files[parent_dir] = vector<swissknife::IngestSQL::File>();
1318 }
1319 all_files[parent_dir].emplace_back(std::move(names), mtime, size, owner,
1320 grp, mode, internal, compressed);
1321
1322 // tokenize hashes
1323 char *ref;
1324 char *tok;
1325 tok = strtok_r(hashes_cstr, ",", &ref);
1326 vector<off_t> offsets;
1327 vector<size_t> sizes;
1328 vector<shash::Any> hashes;
1329 off_t offset = 0;
1330
1331 CUSTOM_ASSERT(size >= 0, "file size cannot be negative [%s]",
1332 names.c_str());
1333 size_t const kChunkSize = internal ? kInternalChunkSize
1334 : kExternalChunkSize;
1335
1336 while (tok) {
1337 offsets.push_back(offset);
1338 // TODO: check the hash format
1339 CUSTOM_ASSERT(check_hash(tok) == 0,
1340 "provided hash for [%s] is invalid: %s", names.c_str(),
1341 tok);
1342 hashes.push_back(
1343 shash::MkFromHexPtr(shash::HexPtr(tok), shash::kSuffixNone));
1344 tok = strtok_r(NULL, ",", &ref);
1345 offset += kChunkSize; // in the future we might want variable chunk
1346 // sizes specified in the DB
1347 }
1348 size_t expected_num_chunks = size / kChunkSize;
1349 if (expected_num_chunks * (size_t)kChunkSize < (size_t)size || size == 0) {
1350 expected_num_chunks++;
1351 }
1352 CUSTOM_ASSERT(
1353 offsets.size() == expected_num_chunks,
1354 "offsets size %ld does not match expected number of chunks %ld",
1355 offsets.size(), expected_num_chunks);
1356 for (size_t i = 0; i < offsets.size() - 1; i++) {
1357 sizes.push_back(size_t(offsets[i + 1] - offsets[i]));
1358 }
1359
1360 sizes.push_back(size_t(size - offsets[offsets.size() - 1]));
1361 for (size_t i = 0; i < offsets.size(); i++) {
1362 FileChunk const chunk = FileChunk(hashes[i], offsets[i], sizes[i]);
1363 all_files[parent_dir].back().chunks.PushBack(chunk);
1364 }
1365 }
1366 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1367 }
1368
1369 void swissknife::IngestSQL::load_symlinks(
1370 sqlite3 *db, const std::string &lease_path,
1371 const std::string &additional_prefix,
1372 std::map<std::string, std::vector<Symlink> > &all_symlinks) {
1373 sqlite3_stmt *stmt;
1374 string const select_stmt = "SELECT name, target, mtime, owner, grp, "
1375 "skip_if_file_or_dir FROM links";
1376 int const ret = sqlite3_prepare_v2(db, select_stmt.c_str(), -1, &stmt, NULL);
1377 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1378 while (sqlite3_step(stmt) == SQLITE_ROW) {
1379 char *name_cstr = (char *)sqlite3_column_text(stmt, 0);
1380 char *target_cstr = (char *)sqlite3_column_text(stmt, 1);
1381 time_t const mtime = sqlite3_column_int64(stmt, 2);
1382 uid_t const owner = sqlite3_column_int(stmt, 3);
1383 gid_t const grp = sqlite3_column_int(stmt, 4);
1384 int const skip_if_file_or_dir = sqlite3_column_int(stmt, 5);
1385
1386 string names = additional_prefix + sanitise_name(name_cstr);
1387 CUSTOM_ASSERT(check_prefix(names, lease_path),
1388 "%s is not below lease path %s", names.c_str(),
1389 lease_path.c_str());
1390 string target = target_cstr;
1391 string const parent_dir = get_parent(names);
1392
1393 if (!all_symlinks.count(parent_dir)) {
1394 all_symlinks[parent_dir] = vector<swissknife::IngestSQL::Symlink>();
1395 }
1396 all_symlinks[parent_dir].emplace_back(std::move(names), std::move(target),
1397 mtime, owner, grp,
1398 skip_if_file_or_dir);
1399 }
1400 CHECK_SQLITE_ERROR(sqlite3_finalize(stmt), SQLITE_OK);
1401 }
1402
1403 int swissknife::IngestSQL::add_files(
1404 catalog::WritableCatalogManager &catalog_manager,
1405 const std::vector<File> &files) {
1406 for (auto &&file : files) {
1407 catalog::DirectoryEntry dir;
1408 XattrList const xattr;
1409 bool exists = false;
1410 exists = catalog_manager.LookupDirEntry(MakeCatalogPath(file.name),
1411 catalog::kLookupDefault, &dir);
1412
1413 dir.name_ = NameString(get_basename(file.name));
1414 dir.mtime_ = file.mtime / 1000000000;
1415 dir.mode_ = file.mode | S_IFREG;
1416 dir.mode_ &= (S_IFREG | 0777);
1417 dir.uid_ = file.owner;
1418 dir.gid_ = file.grp;
1419 dir.size_ = file.size;
1420 dir.has_xattrs_ = false;
1421 dir.is_external_file_ = !file.internal;
1422 dir.set_is_chunked_file(true);
1423 dir.checksum_ = shash::MkFromHexPtr(
1424 shash::HexPtr("0000000000000000000000000000000000000000"),
1425 shash::kSuffixNone);
1426
1427 // compression is permitted only for internal data
1428 CUSTOM_ASSERT(file.internal || (!file.internal && file.compressed < 2),
1429 "compression is only allowed for internal data [%s]",
1430 file.name.c_str());
1431
1432 switch (file.compressed) {
1433 case 1: // Uncompressed
1434 dir.compression_algorithm_ = zlib::kNoCompression;
1435 break;
1436 case 2: // Compressed with Zlib
1437 dir.compression_algorithm_ = zlib::kZlibDefault;
1438 break;
1439 // future cases: different compression schemes
1440 default: // default behaviour: compressed if internal, content-addressed.
1441 // Uncompressed if external
1442 dir.compression_algorithm_ = file.internal ? zlib::kZlibDefault
1443 : zlib::kNoCompression;
1444 }
1445
1446 if (exists) {
1447 CUSTOM_ASSERT(
1448 !S_ISDIR(dir.mode()) && !S_ISLNK(dir.mode()),
1449 "Refusing to replace existing dir/symlink at %s with a file",
1450 file.name.c_str());
1451 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing existing file [%s]",
1452 file.name.c_str());
1453 catalog_manager.RemoveFile(file.name);
1454 }
1455 string const parent = get_parent(file.name);
1456 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Adding chunked file [%s]",
1457 file.name.c_str());
1458 catalog_manager.AddChunkedFile(dir, xattr, parent, file.chunks);
1459 }
1460
1461 return 0;
1462 }
1463
// Process the `deletions` table of one ingestion database: remove files,
// links and (recursively) directories from the catalogs.  Rows are handled
// longest-path-first so children are removed before their parents.  A row's
// directory/file/link flags must match the actual type of the existing
// entry; a mismatch or a non-existent path is skipped, not an error.
// Returns the sqlite3 status of finalizing the statement (SQLITE_OK on
// success).
int swissknife::IngestSQL::do_deletions(
    sqlite3 *db, catalog::WritableCatalogManager &catalog_manager,
    const std::string &lease_path, const std::string &additional_prefix) {
  sqlite3_stmt *stmt;
  int const row_count = get_row_count(db, "deletions");
  int const print_every = calculate_print_frequency(row_count);
  int curr_row = 0;
  // longest names first => children are visited before their parent dirs
  int ret = sqlite3_prepare_v2(db,
                               "SELECT name, directory, file, link FROM "
                               "deletions ORDER BY length(name) DESC",
                               -1, &stmt, NULL);
  CHECK_SQLITE_ERROR(ret, SQLITE_OK);
  while (sqlite3_step(stmt) == SQLITE_ROW) {
    curr_row++;

    char *name = (char *)sqlite3_column_text(stmt, 0);
    int64_t const isdir = sqlite3_column_int64(stmt, 1);
    int64_t const isfile = sqlite3_column_int64(stmt, 2);
    int64_t const islink = sqlite3_column_int64(stmt, 3);

    // apply the optional prefix and ensure we stay inside the lease
    string const names = additional_prefix + sanitise_name(name);
    CUSTOM_ASSERT(check_prefix(names, lease_path),
                  "%s is not below lease path %s", names.c_str(),
                  lease_path.c_str());

    catalog::DirectoryEntry dirent;
    bool exists = false;
    exists = catalog_manager.LookupDirEntry(MakeCatalogPath(names),
                                            catalog::kLookupDefault, &dirent);
    if (exists) {
      // the requested deletion type must match the entry's actual type
      if ((isdir && S_ISDIR(dirent.mode()))
          || (islink && S_ISLNK(dirent.mode()))
          || (isfile && S_ISREG(dirent.mode()))) {
        if (S_ISDIR(dirent.mode())) {
          PathString names_path(names);
          recursively_delete_directory(names_path, catalog_manager);
        } else {
          LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing link/file [%s]",
                   names.c_str());
          catalog_manager.RemoveFile(names);
        }
      } else {
        LogCvmfs(kLogCvmfs, kLogVerboseMsg,
                 "Mismatch in deletion type, not deleting: [%s] (dir %ld/%d , "
                 "link %ld/%d, file %ld/%d)",
                 names.c_str(), isdir, S_ISDIR(dirent.mode()), islink,
                 S_ISLNK(dirent.mode()), isfile, S_ISREG(dirent.mode()));
      }
    } else {
      // deleting something that is already gone is not an error
      LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Not Removing non-existent [%s]",
               names.c_str());
    }

    SHOW_PROGRESS("deletions", print_every, curr_row, row_count);
  }
  ret = sqlite3_finalize(stmt);
  return ret;
}
1522
// DDL for a fresh ingestion database (schema_revision 4).  The statements
// are executed in order by create_empty_database(); the list is
// NULL-terminated.  Mode defaults are decimal: 493 = 0755, 420 = 0644.
const char *schema[] = {"PRAGMA journal_mode=WAL;",

                        "CREATE TABLE IF NOT EXISTS dirs ( \
        name TEXT PRIMARY KEY, \
        mode INTEGER NOT NULL DEFAULT 493,\
        mtime INTEGER NOT NULL DEFAULT 0,\
        owner INTEGER NOT NULL DEFAULT 0, \
        grp INTEGER NOT NULL DEFAULT 0, \
        acl TEXT NOT NULL DEFAULT '', \
        nested INTEGER DEFAULT 1);",

                        "CREATE TABLE IF NOT EXISTS files ( \
        name TEXT PRIMARY KEY, \
        mode INTEGER NOT NULL DEFAULT 420, \
        mtime INTEGER NOT NULL DEFAULT 0,\
        owner INTEGER NOT NULL DEFAULT 0,\
        grp INTEGER NOT NULL DEFAULT 0,\
        size INTEGER NOT NULL DEFAULT 0,\
        hashes TEXT NOT NULL DEFAULT '',\
        internal INTEGER NOT NULL DEFAULT 0,\
        compressed INTEGER NOT NULL DEFAULT 0\
        );",

                        "CREATE TABLE IF NOT EXISTS links (\
        name TEXT PRIMARY KEY,\
        target TEXT NOT NULL DEFAULT '',\
        mtime INTEGER NOT NULL DEFAULT 0,\
        owner INTEGER NOT NULL DEFAULT 0,\
        grp INTEGER NOT NULL DEFAULT 0,\
        skip_if_file_or_dir INTEGER NOT NULL DEFAULT 0\
        );",

                        "CREATE TABLE IF NOT EXISTS deletions (\
        name TEXT PRIMARY KEY,\
        directory INTEGER NOT NULL DEFAULT 0,\
        file INTEGER NOT NULL DEFAULT 0,\
        link INTEGER NOT NULL DEFAULT 0\
        );",

                        "CREATE TABLE IF NOT EXISTS properties (\
        key TEXT PRIMARY KEY,\
        value TEXT NOT NULL\
        );",

                        "INSERT INTO properties VALUES ('schema_revision', "
                        "'4') ON CONFLICT DO NOTHING;",
                        NULL};
1570
1571 static void create_empty_database(string &filename) {
1572 sqlite3 *db_out;
1573 LogCvmfs(kLogCvmfs, kLogStdout, "Creating empty database file %s",
1574 filename.c_str());
1575 int ret = sqlite3_open_v2(filename.c_str(), &db_out,
1576 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL);
1577 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1578 relax_db_locking(db_out);
1579
1580 const char **ptr = schema;
1581 while (*ptr != NULL) {
1582 ret = sqlite3_exec(db_out, *ptr, NULL, NULL, NULL);
1583 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1584 ptr++;
1585 }
1586 sqlite3_close(db_out);
1587 }
1588
1589 static void recursively_delete_directory(
1590 PathString &path, catalog::WritableCatalogManager &catalog_manager) {
1591 catalog::DirectoryEntryList const listing;
1592
1593 // Add all names
1594 catalog::StatEntryList listing_from_catalog;
1595 bool const retval = catalog_manager.ListingStat(
1596 PathString("/" + path.ToString()), &listing_from_catalog);
1597
1598 CUSTOM_ASSERT(retval, "failed to call ListingStat for %s", path.c_str());
1599
1600 if (!catalog_manager.IsTransitionPoint(path.ToString())) {
1601 for (unsigned i = 0; i < listing_from_catalog.size(); ++i) {
1602 PathString entry_path;
1603 entry_path.Assign(path);
1604 entry_path.Append("/", 1);
1605 entry_path.Append(listing_from_catalog.AtPtr(i)->name.GetChars(),
1606 listing_from_catalog.AtPtr(i)->name.GetLength());
1607
1608 if (S_ISDIR(listing_from_catalog.AtPtr(i)->info.st_mode)) {
1609 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Recursing into %s/",
1610 entry_path.ToString().c_str());
1611 recursively_delete_directory(entry_path, catalog_manager);
1612
1613
1614 } else {
1615 LogCvmfs(kLogCvmfs, kLogVerboseMsg, " Recursively removing %s",
1616 entry_path.ToString().c_str());
1617 catalog_manager.RemoveFile(entry_path.ToString());
1618 }
1619 }
1620 } else {
1621 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing nested catalog %s",
1622 path.ToString().c_str());
1623 catalog_manager.RemoveNestedCatalog(path.ToString(), false);
1624 }
1625 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "Removing directory %s",
1626 path.ToString().c_str());
1627 catalog_manager.RemoveDirectory(path.ToString());
1628 }
1629
1630
1631 static void relax_db_locking(sqlite3 *db) {
1632 int ret = 0;
1633 ret = sqlite3_exec(db, "PRAGMA temp_store=2", NULL, NULL, NULL);
1634 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1635 ret = sqlite3_exec(db, "PRAGMA synchronous=OFF", NULL, NULL, NULL);
1636 CHECK_SQLITE_ERROR(ret, SQLITE_OK);
1637 }
1638
1639
1640 extern "C" void *lease_refresh_thread(void *payload) {
1641 while (!g_stop_refresh) {
1642 sleep(2);
1643 refresh_lease();
1644 }
1645 return NULL;
1646 }
1647
1648 static bool isDatabaseMarkedComplete(const char *dbfile) {
1649 int ret;
1650 sqlite3 *db;
1651 sqlite3_stmt *stmt;
1652 bool retval = false;
1653
1654 ret = sqlite3_open(dbfile, &db);
1655 if (ret != SQLITE_OK) {
1656 return false;
1657 }
1658
1659 const char *req = "SELECT value FROM properties WHERE key='completed_graft'";
1660 ret = sqlite3_prepare_v2(db, req, -1, &stmt, NULL);
1661 if (ret != SQLITE_OK) {
1662 return false;
1663 }
1664 if (sqlite3_step(stmt) == SQLITE_ROW) {
1665 int const id = sqlite3_column_int(stmt, 0);
1666 if (id > 0) {
1667 retval = true;
1668 }
1669 }
1670 sqlite3_close(db);
1671 return retval;
1672 }
1673
1674 static void setDatabaseMarkedComplete(const char *dbfile) {
1675 int ret;
1676 sqlite3 *db;
1677 char *err;
1678
1679 ret = sqlite3_open(dbfile, &db);
1680 if (ret != SQLITE_OK) {
1681 return;
1682 }
1683
1684 const char *req = "INSERT INTO properties (key, value) VALUES "
1685 "('completed_graft',1) ON CONFLICT(key) DO UPDATE SET "
1686 "value=1 WHERE key='completed_graft'";
1687
1688 ret = sqlite3_exec(db, req, 0, 0, &err);
1689 if (ret != SQLITE_OK) {
1690 return;
1691 }
1692 sqlite3_close(db);
1693 }
1694
// Best-effort invalidation of the cached manifest at `url` on every proxy in
// `proxy_list`.  The list may use '|' or ';' as separators and may be
// surrounded by double quotes.  The first *successfully contacted* proxy is
// sent a no-cache GET (forcing a refetch from the origin); every subsequent
// proxy receives an HTTP PURGE.  Failures are logged and otherwise ignored.
static void invalidate_manifest(std::string proxy_list, std::string url) {
  // split the proxy string -- remove any '"' and split on '|' or ';'
  size_t pos = 0;
  // replace any ';' with '|' to simplify subsequent split
  while ((pos = proxy_list.find(';', pos)) != std::string::npos) {
    proxy_list.replace(pos, 1, 1, '|');
    ++pos;
  }
  // remove any leading or trailing '"'
  if (HasPrefix(proxy_list, "\"", true)) {
    proxy_list = proxy_list.substr(1);
  }
  if (HasSuffix(proxy_list, "\"", true)) {
    proxy_list = proxy_list.substr(0, proxy_list.size() - 1);
  }
  // rewrite the port from 6086 to 6081 to ensure the invalidation works
  // (NOTE(review): presumably 6081 is the port that accepts purge/no-cache
  // requests in this deployment -- confirm against infrastructure config)
  replaceAllSubstrings(proxy_list, ":6086", ":6081");

  std::vector<std::string> proxies = SplitString(proxy_list, '|');

  // now iterate over all the proxies
  // for the first, force a no-cache GET back to google
  // for the remainder just PURGE
  bool first = true;
  for (auto p = proxies.begin(); p != proxies.end(); p++) {
    bool ok = true;
    const string proxy = *p;
    CURL *curl = NULL;
    CURLcode res = CURLE_OK;
    struct curl_slist *headers = NULL;
    curl = curl_easy_init();
    if (!curl) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Unable to init curl!");
      return;
    }
    res = curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
    // "DIRECT" means no proxy: talk to the origin directly
    if (proxy != "DIRECT") {
      res = curl_easy_setopt(curl, CURLOPT_PROXY, proxy.c_str());
    }
    // short timeouts: invalidation is best-effort and must not stall ingest
    res = curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 1l);
    res = curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3l);
    // discard the response body (writeFunction swallows all bytes)
    res = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, writeFunction);

    if (first) {
      headers = curl_slist_append(headers, "Cache-Control: no-cache");
      curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
    } else {
      curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PURGE");
    }

    res = curl_easy_perform(curl);
    if (res != CURLE_OK) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "Manifest invalidation failed: curl error = [%d] [%s] url = %s "
               "proxy = %s",
               res, curl_easy_strerror(res), url.c_str(), proxy.c_str());
      ok = false;
    }
    if (headers) {
      curl_slist_free_all(headers);
    }
    curl_easy_cleanup(curl);
    // only count the forced refetch as done once it actually succeeded
    if (ok) {
      first = false;
    }
  }
}
1762
1763
// Poll the mounted repository at `path` until its `user.revision` extended
// attribute reports a catalog revision >= `revision`, logging progress once
// per revision change.  Returns early (with a log message) if the xattr
// cannot be read at all.
static void wait_for_update(std::string path, long revision) {
  char val[101];
  memset(val, 0, 101);  // keep a trailing NUL so atol() sees a C string
  long current = -1;
  DIR *d;
  while (-1 != getxattr(path.c_str(), "user.revision", val, 100)) {
    const long x = atol(val);
    if (x >= revision) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Mount reached revision %ld", x);
      return;
    } else if (x != current) {
      // log only when the observed revision changes, not every second
      current = x;
      LogCvmfs(kLogCvmfs, kLogStdout, "Mount at revision %ld, waiting..", x);
    }
    sleep(1);
    // NOTE(review): the open/close of the directory appears intended to poke
    // the mount between polls (e.g. trigger a catalog reload) -- confirm
    d = opendir(path.c_str());
    if (d) {
      closedir(d);
    }
  }
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Unable to query user.revision xattr of [%s]: errno: %d",
           path.c_str(), errno);
}
1788