GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_sync.cc
Date: 2025-06-15 02:35:55
Exec Total Coverage
Lines: 0 494 0.0%
Branches: 0 322 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System
3 *
4 * This tool figures out the changes made to a cvmfs repository by means
5 * of a union file system mounted on top of a cvmfs volume.
6 * We take all three volumes (namely union, overlay and repository) into
7 * account to sync the changes back into the repository.
8 *
9 * On the repository side we have a catalogs directory that mimics the
10 * shadow directory structure and stores compressed and uncompressed
11 * versions of all catalogs. The raw data are stored in the data
12 * subdirectory in zlib-compressed form. They are named with their SHA-1
13 * hash of the compressed file (like in CVMFS client cache, but with a
14 * 2-level cache hierarchy). Symlinks from the catalog directory to the
15 * data directory form the connection. If necessary, add a .htaccess file
16 * to allow Apache to follow the symlinks.
17 */
18
19 // NOLINTNEXTLINE
20 #define _FILE_OFFSET_BITS 64
21 // NOLINTNEXTLINE
22 #define __STDC_FORMAT_MACROS
23
24 #include "swissknife_sync.h"
25
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <glob.h>
29 #include <inttypes.h>
30 #include <limits.h>
31 #include <sys/capability.h>
32
33 #include <cstdio>
34 #include <cstdlib>
35 #include <string>
36 #include <vector>
37
38 #include "catalog_mgr_ro.h"
39 #include "catalog_mgr_rw.h"
40 #include "catalog_virtual.h"
41 #include "manifest.h"
42 #include "monitor.h"
43 #include "network/download.h"
44 #include "path_filters/dirtab.h"
45 #include "reflog.h"
46 #include "sanitizer.h"
47 #include "statistics.h"
48 #include "statistics_database.h"
49 #include "swissknife_capabilities.h"
50 #include "sync_mediator.h"
51 #include "sync_union.h"
52 #include "sync_union_aufs.h"
53 #include "sync_union_overlayfs.h"
54 #include "util/logging.h"
55 #include "util/platform.h"
56 #include "util/string.h"
57
58 using namespace std; // NOLINT
59
60 bool swissknife::CommandSync::CheckParams(const SyncParameters &p) {
61 if (!DirectoryExists(p.dir_scratch)) {
62 PrintError("overlay (copy on write) directory does not exist");
63 return false;
64 }
65 if (!DirectoryExists(p.dir_union)) {
66 PrintError("union volume does not exist");
67 return false;
68 }
69 if (!DirectoryExists(p.dir_rdonly)) {
70 PrintError("cvmfs read/only repository does not exist");
71 return false;
72 }
73 if (p.stratum0 == "") {
74 PrintError("Stratum0 url missing");
75 return false;
76 }
77
78 if (p.manifest_path == "") {
79 PrintError("manifest output required");
80 return false;
81 }
82 if (!DirectoryExists(p.dir_temp)) {
83 PrintError("data store directory does not exist");
84 return false;
85 }
86
87 if (p.min_file_chunk_size >= p.avg_file_chunk_size
88 || p.avg_file_chunk_size >= p.max_file_chunk_size) {
89 PrintError("file chunk size values are not sane");
90 return false;
91 }
92
93 if (HasPrefix(p.spooler_definition, "gw", false)) {
94 if (p.session_token_file.empty()) {
95 PrintError("Session token file has to be provided "
96 "when upstream type is gw.");
97 return false;
98 }
99 }
100
101 return true;
102 }
103
104 int swissknife::CommandCreate::Main(const swissknife::ArgumentList &args) {
105 const string manifest_path = *args.find('o')->second;
106 const string dir_temp = *args.find('t')->second;
107 const string spooler_definition = *args.find('r')->second;
108 const string repo_name = *args.find('n')->second;
109 const string reflog_chksum_path = *args.find('R')->second;
110 if (args.find('l') != args.end()) {
111 const unsigned log_level = kLogLevel0
112 << String2Uint64(*args.find('l')->second);
113 if (log_level > kLogNone) {
114 LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
115 return 1;
116 }
117 SetLogVerbosity(static_cast<LogLevels>(log_level));
118 }
119 shash::Algorithms hash_algorithm = shash::kSha1;
120 if (args.find('a') != args.end()) {
121 hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
122 if (hash_algorithm == shash::kAny) {
123 PrintError("unknown hash algorithm");
124 return 1;
125 }
126 }
127
128 const bool volatile_content = (args.count('v') > 0);
129 const bool garbage_collectable = (args.count('z') > 0);
130 std::string voms_authz;
131 if (args.find('V') != args.end()) {
132 voms_authz = *args.find('V')->second;
133 }
134
135 const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
136 zlib::kZlibDefault);
137 const UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
138 assert(spooler.IsValid());
139
140 const UniquePtr<manifest::Manifest> manifest(
141 catalog::WritableCatalogManager::CreateRepository(
142 dir_temp, volatile_content, voms_authz, spooler.weak_ref()));
143 if (!manifest.IsValid()) {
144 PrintError("Swissknife Sync: Failed to create new repository");
145 return 1;
146 }
147
148 UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
149 if (!reflog.IsValid()) {
150 PrintError("Swissknife Sync: Failed to create fresh Reflog");
151 return 1;
152 }
153
154 reflog->DropDatabaseFileOwnership();
155 const string reflog_path = reflog->database_file();
156 reflog.Destroy();
157 shash::Any reflog_hash(hash_algorithm);
158 manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
159 spooler->UploadReflog(reflog_path);
160 spooler->WaitForUpload();
161 unlink(reflog_path.c_str());
162 if (spooler->GetNumberOfErrors()) {
163 LogCvmfs(kLogCvmfs, kLogStderr, "Swissknife Sync: Failed to upload reflog");
164 return 4;
165 }
166 assert(!reflog_chksum_path.empty());
167 manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
168
169 // set optional manifest fields
170 const bool needs_bootstrap_shortcuts = !voms_authz.empty();
171 manifest->set_garbage_collectability(garbage_collectable);
172 manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);
173
174 if (!manifest->Export(manifest_path)) {
175 PrintError("Swissknife Sync: Failed to create new repository");
176 return 5;
177 }
178
179 return 0;
180 }
181
182 int swissknife::CommandUpload::Main(const swissknife::ArgumentList &args) {
183 const string source = *args.find('i')->second;
184 const string dest = *args.find('o')->second;
185 const string spooler_definition = *args.find('r')->second;
186 shash::Algorithms hash_algorithm = shash::kSha1;
187 if (args.find('a') != args.end()) {
188 hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
189 if (hash_algorithm == shash::kAny) {
190 PrintError("Swissknife Sync: Unknown hash algorithm");
191 return 1;
192 }
193 }
194
195 const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
196 upload::Spooler *spooler = upload::Spooler::Construct(sd);
197 assert(spooler);
198 spooler->Upload(source, dest);
199 spooler->WaitForUpload();
200
201 if (spooler->GetNumberOfErrors() > 0) {
202 LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to upload %s",
203 source.c_str());
204 return 1;
205 }
206
207 delete spooler;
208
209 return 0;
210 }
211
212 int swissknife::CommandPeek::Main(const swissknife::ArgumentList &args) {
213 const string file_to_peek = *args.find('d')->second;
214 const string spooler_definition = *args.find('r')->second;
215
216 // Hash doesn't matter
217 const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
218 upload::Spooler *spooler = upload::Spooler::Construct(sd);
219 assert(spooler);
220 const bool success = spooler->Peek(file_to_peek);
221
222 if (spooler->GetNumberOfErrors() > 0) {
223 LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to peek for %s",
224 file_to_peek.c_str());
225 return 2;
226 }
227 if (!success) {
228 LogCvmfs(kLogCatalog, kLogStdout, "Swissknife Sync: %s not found",
229 file_to_peek.c_str());
230 return 1;
231 }
232 LogCvmfs(kLogCatalog, kLogStdout, "Swissknife Sync: %s available",
233 file_to_peek.c_str());
234
235 delete spooler;
236
237 return 0;
238 }
239
240 int swissknife::CommandRemove::Main(const ArgumentList &args) {
241 const string file_to_delete = *args.find('o')->second;
242 const string spooler_definition = *args.find('r')->second;
243
244 // Hash doesn't matter
245 const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
246 upload::Spooler *spooler = upload::Spooler::Construct(sd);
247 assert(spooler);
248 spooler->RemoveAsync(file_to_delete);
249 spooler->WaitForUpload();
250
251 if (spooler->GetNumberOfErrors() > 0) {
252 LogCvmfs(kLogCatalog, kLogStderr, "Swissknife Sync: failed to delete %s",
253 file_to_delete.c_str());
254 return 1;
255 }
256
257 delete spooler;
258
259 return 0;
260 }
261
262 int swissknife::CommandApplyDirtab::Main(const ArgumentList &args) {
263 const string dirtab_file = *args.find('d')->second;
264 union_dir_ = MakeCanonicalPath(*args.find('u')->second);
265 scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
266 const shash::Any base_hash = shash::MkFromHexPtr(
267 shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
268 const string stratum0 = *args.find('w')->second;
269 const string dir_temp = *args.find('t')->second;
270 verbose_ = (args.find('x') != args.end());
271
272 // check if there is a dirtab file
273 if (!FileExists(dirtab_file)) {
274 LogCvmfs(kLogCatalog, kLogVerboseMsg,
275 "Swissknife Sync: Didn't find a dirtab at '%s'. Skipping...",
276 dirtab_file.c_str());
277 return 0;
278 }
279
280 // parse dirtab file
281 catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
282 if (!dirtab->IsValid()) {
283 LogCvmfs(kLogCatalog, kLogStderr,
284 "Swissknife Sync: Invalid or not readable dirtab '%s'",
285 dirtab_file.c_str());
286 return 1;
287 }
288 LogCvmfs(kLogCatalog, kLogVerboseMsg,
289 "Swissknife Sync: Found %lu rules in dirtab '%s'",
290 dirtab->RuleCount(), dirtab_file.c_str());
291
292 // initialize catalog infrastructure
293 const bool auto_manage_catalog_files = true;
294 const bool follow_redirects = (args.count('L') > 0);
295 const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
296 if (!InitDownloadManager(follow_redirects, proxy)) {
297 return 1;
298 }
299 catalog::SimpleCatalogManager catalog_manager(
300 base_hash, stratum0, dir_temp, download_manager(), statistics(),
301 auto_manage_catalog_files);
302 catalog_manager.Init();
303
304 vector<string> new_nested_catalogs;
305 DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
306 &new_nested_catalogs);
307 const bool success = CreateCatalogMarkers(new_nested_catalogs);
308 delete dirtab;
309
310 return (success) ? 0 : 1;
311 }
312
313
314 namespace {
315
316 // Overwrite directory traversal in the globbing in order to avoid breaking out
317 // the repository tree
318
319 std::string *g_glob_uniondir = NULL;
320
321 bool GlobCheckPath(const char *name) {
322 char resolved_cstr[PATH_MAX];
323 char *retval = realpath(name, resolved_cstr);
324 if (retval == NULL)
325 return false;
326
327 const std::string resolved(resolved_cstr);
328 if (resolved == *g_glob_uniondir)
329 return true;
330 if (!HasPrefix(resolved, (*g_glob_uniondir) + "/", false /*ignore_case*/)) {
331 errno = EACCES;
332 return false;
333 }
334 return true;
335 }
336
337 void *GlobOpendir(const char *name) {
338 if (!GlobCheckPath(name))
339 return NULL;
340 return opendir(name);
341 }
342
343 void GlobClosedir(void *dirp) { closedir(static_cast<DIR *>(dirp)); }
344
345 struct dirent *GlobReaddir(void *dirp) {
346 return readdir(static_cast<DIR *>(dirp));
347 }
348
349 int GlobLstat(const char *name, struct stat *st) {
350 if (!GlobCheckPath(name))
351 return -1;
352 return lstat(name, st);
353 }
354
355 int GlobStat(const char *name, struct stat *st) {
356 if (!GlobCheckPath(name))
357 return -1;
358 return stat(name, st);
359 }
360
361
362 } // anonymous namespace
363
364 void swissknife::CommandApplyDirtab::DetermineNestedCatalogCandidates(
365 const catalog::Dirtab &dirtab,
366 catalog::SimpleCatalogManager *catalog_manager,
367 vector<string> *nested_catalog_candidates) {
368 // find possible new nested catalog locations
369 const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
370 catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
371 const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
372 for (; i != iend; ++i) {
373 assert(!i->is_negation);
374
375 // run a glob using the current dirtab rule on the current repository
376 // state
377 const std::string &glob_string = i->pathspec.GetGlobString();
378 const std::string &glob_string_abs = union_dir_ + glob_string;
379 const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD
380 | GLOB_ALTDIRFUNC;
381 glob_t glob_res;
382 g_glob_uniondir = new std::string(union_dir_);
383 glob_res.gl_opendir = GlobOpendir;
384 glob_res.gl_readdir = GlobReaddir;
385 glob_res.gl_closedir = GlobClosedir;
386 glob_res.gl_lstat = GlobLstat;
387 glob_res.gl_stat = GlobStat;
388 const int glob_retval = glob(glob_string_abs.c_str(), glob_flags, NULL,
389 &glob_res);
390 delete g_glob_uniondir;
391 g_glob_uniondir = NULL;
392
393 if (glob_retval == 0) {
394 // found some candidates... filtering by cvmfs catalog structure
395 LogCvmfs(kLogCatalog, kLogDebug,
396 "Swissknife Sync: Found %lu entries for pathspec (%s)",
397 glob_res.gl_pathc, glob_string.c_str());
398 FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
399 glob_res.gl_pathc, catalog_manager,
400 nested_catalog_candidates);
401 } else if (glob_retval == GLOB_NOMATCH) {
402 LogCvmfs(kLogCvmfs, kLogStderr,
403 "Swissknife Sync: WARNING: cannot apply pathspec %s",
404 glob_string.c_str());
405 } else {
406 LogCvmfs(kLogCvmfs, kLogStderr,
407 "Swissknife Sync: Failed to run glob matching (%s)",
408 glob_string.c_str());
409 }
410
411 globfree(&glob_res);
412 }
413 }
414
415 void swissknife::CommandApplyDirtab::FilterCandidatesFromGlobResult(
416 const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
417 catalog::SimpleCatalogManager *catalog_manager,
418 std::vector<std::string> *nested_catalog_candidates) {
419 // go through the paths produced by glob() and filter them
420 for (size_t i = 0; i < npaths; ++i) {
421 // process candidate paths
422 const std::string candidate(paths[i]);
423 const std::string candidate_rel = candidate.substr(union_dir_.size());
424
425 // check if path points to a directory
426 platform_stat64 candidate_info;
427 const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
428 if (lstat_retval != 0) {
429 LogCvmfs(kLogCatalog, kLogDebug | kLogStderr | kLogSyslogErr,
430 "Swissknife Sync: "
431 "Error in processing .cvmfsdirtab: cannot access %s (%d)",
432 candidate.c_str(), errno);
433 abort();
434 }
435 assert(lstat_retval == 0);
436 if (!S_ISDIR(candidate_info.st_mode)) {
437 // The GLOB_ONLYDIR flag is only a hint, non-directories can still be
438 // returned
439 LogCvmfs(kLogCatalog, kLogDebug,
440 "Swissknife Sync: "
441 "The '%s' dirtab entry does not point to a directory "
442 "but to a file or a symbolic link",
443 candidate_rel.c_str());
444 continue;
445 }
446
447 // check if the path is a meta-directory (. or ..)
448 assert(candidate_rel.size() >= 2);
449 if (candidate_rel.substr(candidate_rel.size() - 2) == "/."
450 || candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
451 continue;
452 }
453
454 // check that the path isn't excluded in the dirtab
455 if (dirtab.IsOpposing(candidate_rel)) {
456 LogCvmfs(kLogCatalog, kLogDebug,
457 "Swissknife Sync: Candidate '%s' is excluded by dirtab",
458 candidate_rel.c_str());
459 continue;
460 }
461
462 // lookup the path in the catalog structure to find out if it already
463 // points to a nested catalog transition point. Furthermore it could be
464 // a new directory and thus not in any catalog yet.
465 catalog::DirectoryEntry dirent;
466 const bool lookup_success = catalog_manager->LookupPath(
467 candidate_rel, catalog::kLookupDefault, &dirent);
468 if (!lookup_success) {
469 LogCvmfs(kLogCatalog, kLogDebug,
470 "Swissknife Sync: Didn't find '%s' in catalogs, could "
471 "be a new directory and nested catalog.",
472 candidate_rel.c_str());
473 nested_catalog_candidates->push_back(candidate);
474 } else if (!dirent.IsNestedCatalogMountpoint()
475 && !dirent.IsNestedCatalogRoot()) {
476 LogCvmfs(kLogCatalog, kLogDebug,
477 "Swissknife Sync: Found '%s' in catalogs but is not a "
478 "nested catalog yet.",
479 candidate_rel.c_str());
480 nested_catalog_candidates->push_back(candidate);
481 } else {
482 // check if the nested catalog marker is still there, we might need to
483 // recreate the catalog after manual marker removal
484 // Note: First we check if the parent directory shows up in the scratch
485 // space to verify that it was touched (copy-on-write)
486 // Otherwise we would force the cvmfs client behind the union
487 // file-
488 // system to (potentially) unnecessarily fetch catalogs
489 if (DirectoryExists(scratch_dir_ + candidate_rel)
490 && !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
491 LogCvmfs(kLogCatalog, kLogStdout,
492 "Swissknife Sync: WARNING: '%s' should be a nested "
493 "catalog according to the dirtab. "
494 "Recreating...",
495 candidate_rel.c_str());
496 nested_catalog_candidates->push_back(candidate);
497 } else {
498 LogCvmfs(kLogCatalog, kLogDebug,
499 "Swissknife Sync: "
500 "Found '%s' in catalogs and it already is a nested catalog.",
501 candidate_rel.c_str());
502 }
503 }
504 }
505 }
506
507 bool swissknife::CommandApplyDirtab::CreateCatalogMarkers(
508 const std::vector<std::string> &new_nested_catalogs) {
509 // go through the new nested catalog paths and create .cvmfscatalog markers
510 // where necessary
511 bool success = true;
512 std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
513 const std::vector<std::string>::const_iterator kend = new_nested_catalogs
514 .end();
515 for (; k != kend; ++k) {
516 assert(!k->empty() && k->size() > union_dir_.size());
517
518 // was the marker already created by hand?
519 const std::string marker_path = *k + "/.cvmfscatalog";
520 if (FileExists(marker_path)) {
521 continue;
522 }
523
524 // create a nested catalog marker
525 const mode_t mode = kDefaultFileMode;
526 const int fd = open(marker_path.c_str(), O_CREAT, mode);
527 if (fd < 0) {
528 LogCvmfs(kLogCvmfs, kLogStderr,
529 "Swissknife Sync: Failed to create nested catalog marker "
530 "at '%s' (errno: %d)",
531 marker_path.c_str(), errno);
532 success = false;
533 continue;
534 }
535 close(fd);
536
537 // inform the user if requested
538 if (verbose_) {
539 LogCvmfs(kLogCvmfs, kLogStdout,
540 "Swissknife Sync: Auto-creating nested catalog in %s",
541 k->c_str());
542 }
543 }
544
545 return success;
546 }
547
// Associates a file-chunking command line switch with the size_t parameter
// field it populates.
struct chunk_arg {
  chunk_arg(char param_, size_t *save_to_) : param(param_), save_to(save_to_) { }
  char param;       // command line switch character (e.g. 'a', 'l', 'h')
  size_t *save_to;  // destination for the parsed chunk size value
};
553
554 bool swissknife::CommandSync::ReadFileChunkingArgs(
555 const swissknife::ArgumentList &args, SyncParameters *params) {
556 typedef std::vector<chunk_arg> ChunkArgs;
557
558 // define where to store the value of which file chunk argument
559 ChunkArgs chunk_args;
560 chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
561 chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
562 chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));
563
564 // read the arguments
565 ChunkArgs::const_iterator i = chunk_args.begin();
566 const ChunkArgs::const_iterator iend = chunk_args.end();
567 for (; i != iend; ++i) {
568 const swissknife::ArgumentList::const_iterator arg = args.find(i->param);
569
570 if (arg != args.end()) {
571 const size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
572 if (arg_value > 0) {
573 *i->save_to = arg_value;
574 } else {
575 return false;
576 }
577 }
578 }
579
580 // check if argument values are sane
581 return true;
582 }
583
584 int swissknife::CommandSync::Main(const swissknife::ArgumentList &args) {
585 const string start_time = GetGMTimestamp();
586
587 // Spawn monitoring process (watchdog)
588 const std::string watchdog_dir = "/tmp";
589 char watchdog_path[PATH_MAX];
590 const std::string timestamp = GetGMTimestamp("%Y.%m.%d-%H.%M.%S");
591 const int path_size =
592 snprintf(watchdog_path, sizeof(watchdog_path),
593 "%s/cvmfs-swissknife-sync-stacktrace.%s.%d",
594 watchdog_dir.c_str(), timestamp.c_str(), getpid());
595 assert(path_size > 0);
596 assert(path_size < PATH_MAX);
597 const UniquePtr<Watchdog> watchdog(Watchdog::Create(NULL));
598 watchdog->Spawn(std::string(watchdog_path));
599
600 SyncParameters params;
601
602 // Initialization
603 params.dir_union = MakeCanonicalPath(*args.find('u')->second);
604 params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
605 params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
606 params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
607 params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
608 shash::kSuffixCatalog);
609 params.stratum0 = *args.find('w')->second;
610 params.manifest_path = *args.find('o')->second;
611 params.spooler_definition = *args.find('r')->second;
612
613 params.public_keys = *args.find('K')->second;
614 params.repo_name = *args.find('N')->second;
615
616 params.ttl_seconds = catalog::Catalog::kDefaultTTL;
617
618 if (args.find('f') != args.end())
619 params.union_fs_type = *args.find('f')->second;
620 if (args.find('A') != args.end())
621 params.is_balanced = true;
622 if (args.find('x') != args.end())
623 params.print_changeset = true;
624 if (args.find('y') != args.end())
625 params.dry_run = true;
626 if (args.find('m') != args.end())
627 params.mucatalogs = true;
628 if (args.find('i') != args.end())
629 params.ignore_xdir_hardlinks = true;
630 if (args.find('d') != args.end())
631 params.stop_for_catalog_tweaks = true;
632 if (args.find('V') != args.end())
633 params.voms_authz = true;
634 if (args.find('F') != args.end())
635 params.authz_file = *args.find('F')->second;
636 if (args.find('k') != args.end())
637 params.include_xattrs = true;
638 if (args.find('j') != args.end())
639 params.enable_mtime_ns = true;
640 if (args.find('Y') != args.end())
641 params.external_data = true;
642 if (args.find('W') != args.end())
643 params.direct_io = true;
644 if (args.find('S') != args.end()) {
645 const bool retval = catalog::VirtualCatalog::ParseActions(
646 *args.find('S')->second, &params.virtual_dir_actions);
647 if (!retval) {
648 LogCvmfs(kLogCvmfs, kLogStderr,
649 "Swissknife Sync: Invalid virtual catalog options: %s",
650 args.find('S')->second->c_str());
651 return 1;
652 }
653 }
654 if (args.find('z') != args.end()) {
655 const unsigned log_level =
656 1 << (kLogLevel0 + String2Uint64(*args.find('z')->second));
657 if (log_level > kLogNone) {
658 LogCvmfs(kLogCvmfs, kLogStderr, "Swissknife Sync: invalid log level");
659 return 1;
660 }
661 SetLogVerbosity(static_cast<LogLevels>(log_level));
662 }
663
664 if (args.find('X') != args.end())
665 params.max_weight = String2Uint64(*args.find('X')->second);
666 if (args.find('M') != args.end())
667 params.min_weight = String2Uint64(*args.find('M')->second);
668
669 if (args.find('p') != args.end()) {
670 params.use_file_chunking = true;
671 if (!ReadFileChunkingArgs(args, &params)) {
672 PrintError("Swissknife Sync: Failed to read file chunk size values");
673 return 2;
674 }
675 }
676 if (args.find('O') != args.end()) {
677 params.generate_legacy_bulk_chunks = true;
678 }
679 shash::Algorithms hash_algorithm = shash::kSha1;
680 if (args.find('e') != args.end()) {
681 hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
682 if (hash_algorithm == shash::kAny) {
683 PrintError("Swissknife Sync: Unknown hash algorithm");
684 return 1;
685 }
686 }
687 if (args.find('Z') != args.end()) {
688 params.compression_alg = zlib::ParseCompressionAlgorithm(
689 *args.find('Z')->second);
690 }
691
692 if (args.find('E') != args.end())
693 params.enforce_limits = true;
694 if (args.find('Q') != args.end()) {
695 params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
696 } else {
697 params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
698 }
699 if (args.find('R') != args.end()) {
700 params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
701 } else {
702 params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
703 }
704 if (args.find('U') != args.end()) {
705 params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
706 } else {
707 params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;
708 }
709
710 if (args.find('v') != args.end()) {
711 const sanitizer::IntegerSanitizer sanitizer;
712 if (!sanitizer.IsValid(*args.find('v')->second)) {
713 PrintError("Swissknife Sync: Invalid revision number");
714 return 1;
715 }
716 params.manual_revision = String2Uint64(*args.find('v')->second);
717 }
718
719 params.branched_catalog = args.find('B') != args.end();
720
721 if (args.find('q') != args.end()) {
722 params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
723 }
724
725 if (args.find('0') != args.end()) {
726 params.num_upload_tasks = String2Uint64(*args.find('0')->second);
727 }
728
729 if (args.find('T') != args.end()) {
730 params.ttl_seconds = String2Uint64(*args.find('T')->second);
731 }
732
733 if (args.find('g') != args.end()) {
734 params.ignore_special_files = true;
735 }
736
737 if (args.find('P') != args.end()) {
738 params.session_token_file = *args.find('P')->second;
739 }
740
741 if (args.find('H') != args.end()) {
742 params.key_file = *args.find('H')->second;
743 }
744
745 if (args.find('D') != args.end()) {
746 params.repo_tag.SetName(*args.find('D')->second);
747 }
748
749 if (args.find('J') != args.end()) {
750 params.repo_tag.SetDescription(*args.find('J')->second);
751 }
752
753 if (args.find('G') != args.end()) {
754 params.cache_dir = "/var/spool/cvmfs/" + params.repo_name + "/cache.server";
755 }
756
757 const bool upload_statsdb = (args.count('I') > 0);
758
759 if (!CheckParams(params))
760 return 2;
761 // This may fail, in which case a warning is printed and the process continues
762 ObtainDacReadSearchCapability();
763
764 perf::StatisticsTemplate publish_statistics("publish", this->statistics());
765
766 // Start spooler
767 upload::SpoolerDefinition spooler_definition(
768 params.spooler_definition, hash_algorithm, params.compression_alg,
769 params.generate_legacy_bulk_chunks, params.use_file_chunking,
770 params.min_file_chunk_size, params.avg_file_chunk_size,
771 params.max_file_chunk_size, params.session_token_file, params.key_file);
772 if (params.max_concurrent_write_jobs > 0) {
773 spooler_definition
774 .number_of_concurrent_uploads = params.max_concurrent_write_jobs;
775 }
776 spooler_definition.num_upload_tasks = params.num_upload_tasks;
777
778 const upload::SpoolerDefinition spooler_definition_catalogs(
779 spooler_definition.Dup2DefaultCompression());
780
781 params.spooler = upload::Spooler::Construct(spooler_definition,
782 &publish_statistics);
783 if (NULL == params.spooler)
784 return 3;
785 const UniquePtr<upload::Spooler> spooler_catalogs(upload::Spooler::Construct(
786 spooler_definition_catalogs, &publish_statistics));
787 if (!spooler_catalogs.IsValid())
788 return 3;
789
790 const bool follow_redirects = (args.count('L') > 0);
791 const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
792 if (!InitDownloadManager(follow_redirects, proxy)) {
793 return 3;
794 }
795
796 if (!InitSignatureManager(params.public_keys)) {
797 return 3;
798 }
799
800 /*
801 * Note: If the upstream is of type gateway, due to the possibility of
802 * concurrent release managers, it's possible to have a different local and
803 * remote root hashes. We proceed by loading the remote manifest but we give
804 * an empty base hash.
805 */
806 UniquePtr<manifest::Manifest> manifest;
807 if (params.branched_catalog) {
808 // Throw-away manifest
809 manifest = new manifest::Manifest(shash::Any(), 0, "");
810 } else if (params.virtual_dir_actions
811 != catalog::VirtualCatalog::kActionNone) {
812 manifest = this->OpenLocalManifest(params.manifest_path);
813 params.base_hash = manifest->catalog_hash();
814 } else {
815 // TODO(jblomer): revert to params.base_hash if spooler driver type is not
816 // upload::SpoolerDefinition::Gateway
817 manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
818 shash::Any());
819 }
820 if (!manifest.IsValid()) {
821 return 3;
822 }
823
824 StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(
825 params.repo_name);
826
827 const std::string old_root_hash = manifest->catalog_hash().ToString(true);
828
829 catalog::WritableCatalogManager catalog_manager(
830 params.base_hash, params.stratum0, params.dir_temp,
831 spooler_catalogs.weak_ref(), download_manager(), params.enforce_limits,
832 params.nested_kcatalog_limit, params.root_kcatalog_limit,
833 params.file_mbyte_limit, statistics(), params.is_balanced,
834 params.max_weight, params.min_weight, params.cache_dir);
835 catalog_manager.Init();
836
837 publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
838 LogCvmfs(kLogPublish, kLogStdout, "Swissknife Sync: Processing changes...");
839
840 // Should be before the synchronization starts to avoid race of GetTTL with
841 // other sqlite operations
842 if ((params.ttl_seconds > 0)
843 && ((params.ttl_seconds != catalog_manager.GetTTL())
844 || !catalog_manager.HasExplicitTTL())) {
845 LogCvmfs(kLogCvmfs, kLogStdout,
846 "Swissknife Sync: Setting repository TTL to %" PRIu64 "s",
847 params.ttl_seconds);
848 catalog_manager.SetTTL(params.ttl_seconds);
849 }
850
851 // Either real catalogs or virtual catalog
852 if (params.virtual_dir_actions == catalog::VirtualCatalog::kActionNone) {
853 publish::SyncUnion *sync;
854 if (params.union_fs_type == "overlayfs") {
855 sync = new publish::SyncUnionOverlayfs(
856 &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
857 } else if (params.union_fs_type == "aufs") {
858 sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
859 params.dir_union, params.dir_scratch);
860 } else {
861 LogCvmfs(kLogCvmfs, kLogStderr,
862 "Swissknife Sync: unknown union file system: %s",
863 params.union_fs_type.c_str());
864 return 3;
865 }
866
867 if (!sync->Initialize()) {
868 LogCvmfs(kLogCvmfs, kLogStderr,
869 "Swissknife Sync: Initialization of the synchronisation "
870 "engine failed");
871 return 4;
872 }
873
874 sync->Traverse();
875 } else {
876 assert(!manifest->history().IsNull());
877 catalog::VirtualCatalog virtual_catalog(
878 manifest.weak_ref(), download_manager(), &catalog_manager, &params);
879 virtual_catalog.Generate(params.virtual_dir_actions);
880 }
881
882 if (!params.authz_file.empty()) {
883 LogCvmfs(kLogCvmfs, kLogDebug,
884 "Swissknife Sync: Adding contents of authz file %s to"
885 " root catalog.",
886 params.authz_file.c_str());
887 const int fd = open(params.authz_file.c_str(), O_RDONLY);
888 if (fd == -1) {
889 LogCvmfs(kLogCvmfs, kLogStderr,
890 "Swissknife Sync: Unable to open authz file (%s)"
891 "from the publication process: %s",
892 params.authz_file.c_str(), strerror(errno));
893 return 7;
894 }
895
896 std::string new_authz;
897 const bool read_successful = SafeReadToString(fd, &new_authz);
898 close(fd);
899
900 if (!read_successful) {
901 LogCvmfs(kLogCvmfs, kLogStderr,
902 "Swissknife Sync: Failed to read authz file (%s): %s",
903 params.authz_file.c_str(), strerror(errno));
904 return 8;
905 }
906
907 catalog_manager.SetVOMSAuthz(new_authz);
908 }
909
910 if (!mediator.Commit(manifest.weak_ref())) {
911 PrintError("Swissknife Sync: Something went wrong during sync");
912 if (!params.dry_run) {
913 stats_db->StorePublishStatistics(this->statistics(), start_time, false);
914 if (upload_statsdb) {
915 stats_db->UploadStatistics(params.spooler);
916 }
917 }
918 return 5;
919 }
920
921 perf::Counter *revision_counter = statistics()->Register(
922 "publish.revision", "Published revision number");
923 revision_counter->Set(
924 static_cast<int64_t>(catalog_manager.GetRootCatalog()->revision()));
925
926 // finalize the spooler
927 LogCvmfs(kLogCvmfs, kLogStdout,
928 "Swissknife Sync: Wait for all uploads to finish");
929 params.spooler->WaitForUpload();
930 spooler_catalogs->WaitForUpload();
931 params.spooler->FinalizeSession(false);
932
933 LogCvmfs(kLogCvmfs, kLogStdout,
934 "Swissknife Sync: Exporting repository manifest");
935
936 // We call FinalizeSession(true) this time, to also trigger the commit
937 // operation on the gateway machine (if the upstream is of type "gw").
938
939 // Get the path of the new root catalog
940 const std::string new_root_hash = manifest->catalog_hash().ToString(true);
941
942 if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
943 params.repo_tag)) {
944 PrintError("Swissknife Sync: Failed to commit transaction.");
945 if (!params.dry_run) {
946 stats_db->StorePublishStatistics(this->statistics(), start_time, false);
947 if (upload_statsdb) {
948 stats_db->UploadStatistics(params.spooler);
949 }
950 }
951 return 9;
952 }
953
954 if (!params.dry_run) {
955 stats_db->StorePublishStatistics(this->statistics(), start_time, true);
956 if (upload_statsdb) {
957 stats_db->UploadStatistics(params.spooler);
958 }
959 }
960
961 delete params.spooler;
962
963 if (!manifest->Export(params.manifest_path)) {
964 PrintError("Swissknife Sync: Failed to create new repository");
965 return 6;
966 }
967
968 return 0;
969 }
970