GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/sync_mediator.cc
Date: 2026-06-28 02:36:10
            Exec  Total  Coverage
Lines:         1    626      0.2%
Branches:      0    950      0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "sync_mediator.h"
6
7 #include <fcntl.h>
8 #include <inttypes.h>
9 #include <unistd.h>
10
11 #include <cassert>
12 #include <cstdio>
13 #include <cstdlib>
14
15 #include "catalog_virtual.h"
16 #include "compression/compression.h"
17 #include "crypto/hash.h"
18 #include "directory_entry.h"
19 #include "json_document.h"
20 #include "publish/repository.h"
21 #include "sync_union.h"
22 #include "upload.h"
23 #include "util/exception.h"
24 #include "util/fs_traversal.h"
25 #include "util/posix.h"
26 #include "util/string.h"
27
28 using namespace std; // NOLINT
29
30 namespace publish {
31
32 420 AbstractSyncMediator::~AbstractSyncMediator() { }
33
34 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
35 const SyncParameters *params,
36 perf::StatisticsTemplate statistics)
37 : catalog_manager_(catalog_manager)
38 , union_engine_(NULL)
39 , handle_hardlinks_(false)
40 , recursive_fast_delete_(false)
41 , params_(params)
42 , reporter_(new SyncDiffReporter(params_->print_changeset
43 ? SyncDiffReporter::kPrintChanges
44 : SyncDiffReporter::kPrintDots)) {
45 const int retval = pthread_mutex_init(&lock_file_queue_, NULL);
46 assert(retval == 0);
47
48 params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
49
50 counters_ = new perf::FsCounters(statistics);
51 }
52
53 SyncMediator::~SyncMediator() { pthread_mutex_destroy(&lock_file_queue_); }
54
55
56 void SyncMediator::RegisterUnionEngine(SyncUnion *engine) {
57 union_engine_ = engine;
58 handle_hardlinks_ = engine->SupportsHardlinks();
59 }
60
61
62 /**
63 * The entry /.cvmfs or entries in /.cvmfs/ must not be added, removed or
64 * modified manually. The directory /.cvmfs is generated by the VirtualCatalog
65 * class if requested.
66 */
67 void SyncMediator::EnsureAllowed(SharedPtr<SyncItem> entry) {
68 const bool ignore_case_setting = false;
69 const string relative_path = entry->GetRelativePath();
70 if ((relative_path == string(catalog::VirtualCatalog::kVirtualPath))
71 || (HasPrefix(relative_path,
72 string(catalog::VirtualCatalog::kVirtualPath) + "/",
73 ignore_case_setting))) {
74 PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
75 relative_path.c_str());
76 }
77 }
78
79
80 /**
81 * Add an entry to the repository.
82 * Added directories will be traversed in order to add the complete subtree.
83 */
84 void SyncMediator::Add(SharedPtr<SyncItem> entry) {
85 EnsureAllowed(entry);
86
87 if (entry->IsDirectory()) {
88 AddDirectoryRecursively(entry);
89 return;
90 }
91
92 if (entry->IsBundleSpec()) {
93 PrintWarning(".cvmfsbundle file encountered. "
94 "Bundles is currently an experimental feature.");
95
96 if (!entry->IsRegularFile() && !entry->IsSymlink()) {
97 PANIC(kLogStderr,
98 "Error: bundle specification must be a regular file or a symlink");
99 }
100 if (entry->HasHardlinks()) {
101 PANIC(kLogStderr, "Error: bundle specification must not be a hard link");
102 }
103
104 InsertBundleSpec(entry);
105 AddFile(entry);
106 return;
107 }
108
109 if (entry->IsRegularFile() || entry->IsSymlink()) {
110 // A file is a hard link if the link count is greater than 1
111 if (entry->HasHardlinks() && handle_hardlinks_)
112 InsertHardlink(entry);
113 else
114 AddFile(entry);
115 return;
116 } else if (entry->IsGraftMarker()) {
117 LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
118 return; // Ignore markers.
119 }
120
121 // In OverlayFS whiteouts can be represented as character devices with major
122 // and minor numbers equal to 0. Special files will be ignored except if they
123 // are whiteout files.
124 if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
125 if (params_->ignore_special_files) {
126 PrintWarning("'" + entry->GetRelativePath()
127 + "' "
128 "is a special file, ignoring.");
129 } else {
130 if (entry->HasHardlinks() && handle_hardlinks_)
131 InsertHardlink(entry);
132 else
133 AddFile(entry);
134 }
135 return;
136 }
137
138 PrintWarning("'" + entry->GetRelativePath()
139 + "' cannot be added. Unrecognized file type.");
140 }
141
142
143 /**
144 * Touch an entry in the repository.
145 */
146 void SyncMediator::Touch(SharedPtr<SyncItem> entry) {
147 EnsureAllowed(entry);
148
149 if (entry->IsGraftMarker()) {
150 return;
151 }
152 if (entry->IsDirectory()) {
153 TouchDirectory(entry);
154 perf::Inc(counters_->n_directories_changed);
155 return;
156 }
157
158 if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
159 Replace(entry); // This way, hardlink processing is correct
160 // Replace calls Remove; cancel Remove's actions:
161 perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
162
163 // Count only the difference between the old and new file
164 // Symlinks do not count into added or removed bytes
165 int64_t dif = 0;
166
167 // Need to handle 4 cases (symlink->symlink, symlink->regular,
168 // regular->symlink, regular->regular)
169 if (entry->WasSymlink()) {
170 // Replace calls Remove; cancel Remove's actions:
171 perf::Dec(counters_->n_symlinks_removed);
172
173 if (entry->IsSymlink()) {
174 perf::Inc(counters_->n_symlinks_changed);
175 } else {
176 perf::Inc(counters_->n_symlinks_removed);
177 perf::Inc(counters_->n_files_added);
178 dif += entry->GetScratchSize();
179 }
180 } else {
181 // Replace calls Remove; cancel Remove's actions:
182 perf::Dec(counters_->n_files_removed);
183 dif -= entry->GetRdOnlySize();
184 if (entry->IsSymlink()) {
185 perf::Inc(counters_->n_files_removed);
186 perf::Inc(counters_->n_symlinks_added);
187 } else {
188 perf::Inc(counters_->n_files_changed);
189 dif += entry->GetScratchSize();
190 }
191 }
192
193 if (dif > 0) { // added bytes
194 perf::Xadd(counters_->sz_added_bytes, dif);
195 } else { // removed bytes
196 perf::Xadd(counters_->sz_removed_bytes, -dif);
197 }
198 return;
199 }
200
201 PrintWarning("'" + entry->GetRelativePath()
202 + "' cannot be touched. Unrecognized file type.");
203 }
204
205
206 /**
207 * Remove an entry from the repository. Directories will be recursively removed.
208 */
209 void SyncMediator::Remove(SharedPtr<SyncItem> entry, bool fast_delete) {
210 EnsureAllowed(entry);
211
212 if (entry->WasDirectory()) {
213 RemoveDirectoryRecursively(entry, fast_delete);
214 return;
215 }
216
217 if (entry->WasBundleSpec()) {
218 if (!params_->dry_run)
219 catalog_manager_->UpdateBundleTrigger(GetBundleTriggerPath(entry), false);
220 RemoveFile(entry);
221 return;
222 }
223
224 if (entry->WasRegularFile() || entry->WasSymlink()
225 || entry->WasSpecialFile()) {
226 RemoveFile(entry);
227 return;
228 }
229
230 PrintWarning("'" + entry->GetRelativePath()
231 + "' cannot be deleted. Unrecognized file type.");
232 }
233
234
235 /**
236 * Remove the old entry and add the new one.
237 */
238 void SyncMediator::Replace(SharedPtr<SyncItem> entry) {
239 // EnsureAllowed(entry); <-- Done by Remove() and Add()
240 Remove(entry);
241 Add(entry);
242 }
243
244 bool SyncMediator::Clone(const std::string from, const std::string to,
245 bool fail_if_source_missing) {
246 return catalog_manager_->Clone(from, to, fail_if_source_missing);
247 }
248
249 void SyncMediator::EnterDirectory(SharedPtr<SyncItem> entry) {
250 if (!handle_hardlinks_) {
251 return;
252 }
253
254 const HardlinkGroupMap new_map;
255 hardlink_stack_.push(new_map);
256 }
257
258
259 void SyncMediator::LeaveDirectory(SharedPtr<SyncItem> entry) {
260 if (!handle_hardlinks_) {
261 return;
262 }
263
264 CompleteHardlinks(entry);
265 AddLocalHardlinkGroups(GetHardlinkMap());
266 hardlink_stack_.pop();
267 }
268
269
270 /**
271 * Do any pending processing and commit all changes to the catalogs.
272 * To be called after change set traversal is finished.
273 */
274 bool SyncMediator::Commit(manifest::Manifest *manifest) {
275 reporter_->CommitReport();
276
277 if (!params_->dry_run) {
278 LogCvmfs(kLogPublish, kLogStdout,
279 "Waiting for upload of files before committing...");
280 params_->spooler->WaitForUpload();
281 }
282
283 if (!hardlink_queue_.empty()) {
284 assert(handle_hardlinks_);
285
286 LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
287 params_->spooler->UnregisterListeners();
288 params_->spooler->RegisterListener(&SyncMediator::PublishHardlinksCallback,
289 this);
290
291 // TODO(rmeusel): Revise that for Thread Safety!
292 // This loop will spool hardlinks into the spooler, which will then
293 // process them.
294 // On completion of every hardlink the spooler will asynchronously
295 // emit callbacks (SyncMediator::PublishHardlinksCallback) which
296 // might happen while this for-loop goes through the hardlink_queue_
297 //
298 // For the moment this seems not to be a problem, but it's an accident
299 // just waiting to happen.
300 //
301 // Note: Just wrapping this loop in a mutex might produce a dead lock
302 // since the spooler does not fill it's processing queue to an
303 // unlimited size. Meaning that it might be flooded with hard-
304 // links and waiting for the queue to be processed while proces-
305 // sing is stalled because the callback is waiting for this
306 // mutex.
307 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
308 iEnd = hardlink_queue_.end();
309 i != iEnd;
310 ++i) {
311 LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
312 i->master->GetUnionPath().c_str());
313 IngestionSource *source = new FileIngestionSource(
314 i->master->GetUnionPath());
315 params_->spooler->Process(source);
316 }
317
318 params_->spooler->WaitForUpload();
319
320 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
321 iEnd = hardlink_queue_.end();
322 i != iEnd;
323 ++i) {
324 LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
325 i->master->GetUnionPath().c_str());
326 AddHardlinkGroup(*i);
327 }
328 }
329
330 if (!bundle_specs_.empty()) {
331 LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
332 AddBundleSpecs();
333 }
334
335 if (union_engine_)
336 union_engine_->PostUpload();
337
338 // PostUpload may have spooled additional files (the tarball engine
339 // materializes empty files for hardlinks whose target is missing from the
340 // archive). Wait for those uploads so their catalog entries are added by
341 // the file callback before the listeners are unregistered below.
342 if (!params_->dry_run)
343 params_->spooler->WaitForUpload();
344
345 params_->spooler->UnregisterListeners();
346
347 if (params_->dry_run) {
348 manifest = NULL;
349 return true;
350 }
351
352 LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
353 if (params_->spooler->GetNumberOfErrors() > 0) {
354 LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
355 return false;
356 }
357
358 if (catalog_manager_->IsBalanceable()
359 || (params_->virtual_dir_actions
360 != catalog::VirtualCatalog::kActionNone)) {
361 if (catalog_manager_->IsBalanceable())
362 catalog_manager_->Balance();
363 // Commit empty string to ensure that the "content" of the auto catalog
364 // markers is present in the repository.
365 const string empty_file = CreateTempPath(params_->dir_temp + "/empty",
366 0600);
367 IngestionSource *source = new FileIngestionSource(empty_file);
368 params_->spooler->Process(source);
369 params_->spooler->WaitForUpload();
370 unlink(empty_file.c_str());
371 if (params_->spooler->GetNumberOfErrors() > 0) {
372 LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
373 return false;
374 }
375 }
376 catalog_manager_->PrecalculateListings();
377 return catalog_manager_->Commit(params_->stop_for_catalog_tweaks,
378 params_->manual_revision, manifest);
379 }
380
381 std::string SyncMediator::GetBundleTriggerPath(
382 SharedPtr<SyncItem> bundle_spec_entry) const
383 {
384 static const size_t nStrip = strlen(".cvmfsbundle-");
385 const std::string main_file_name = bundle_spec_entry->filename().substr(nStrip);
386 if (main_file_name.empty()) {
387 PANIC(kLogStderr, "invalid empty bundle specification: %s",
388 bundle_spec_entry->GetUnionPath().c_str());
389 }
390 // relative_parent_path() is empty for the repo root and otherwise lacks a
391 // leading slash (e.g. "root/lib/ROOT/__pycache__"). The catalog lookup in
392 // WritableCatalogManager::FindCatalog needs an absolute path, so prepend
393 // "/" — but skip it when relative_parent_path() is empty, in which case
394 // the existing literal "/" already produces a valid "/<basename>".
395 const std::string &parent = bundle_spec_entry->relative_parent_path();
396 return (parent.empty() ? "" : "/" + parent) + "/" + main_file_name;
397 }
398
399 void SyncMediator::InsertBundleSpec(SharedPtr<SyncItem> entry) {
400 assert(entry->IsBundleSpec());
401
402 // When we leave the directory, we'll check all the bundle specs and set
403 // the corresponding flags on the main file.
404 bundle_specs_.push_back(GetBundleTriggerPath(entry));
405 }
406
407 void SyncMediator::AddBundleSpecs() {
408 if (params_->dry_run)
409 return;
410
411 for (BundleSpecs::const_iterator itr = bundle_specs_.begin();
412 itr != bundle_specs_.end(); ++itr)
413 {
414 catalog_manager_->UpdateBundleTrigger(*itr, true);
415 }
416 }
417
418 void SyncMediator::InsertHardlink(SharedPtr<SyncItem> entry) {
419 assert(handle_hardlinks_);
420
421 const uint64_t inode = entry->GetUnionInode();
422 LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
423 inode, entry->GetUnionPath().c_str());
424
425 // Find the hard link group in the lists
426 const HardlinkGroupMap::iterator hardlink_group = GetHardlinkMap().find(
427 inode);
428
429 if (hardlink_group == GetHardlinkMap().end()) {
430 // Create a new hardlink group
431 GetHardlinkMap().insert(
432 HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
433 } else {
434 // Append the file to the appropriate hardlink group
435 hardlink_group->second.AddHardlink(entry);
436 }
437
438 // publish statistics counting for new file
439 if (entry->IsNew()) {
440 perf::Inc(counters_->n_files_added);
441 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
442 }
443 }
444
445
446 void SyncMediator::InsertLegacyHardlink(SharedPtr<SyncItem> entry) {
447 // Check if found file has hardlinks (nlink > 1)
448 // As we are looking through all files in one directory here, there might be
449 // completely untouched hardlink groups, which we can safely skip.
450 // Finally we have to see if the hardlink is already part of this group
451
452 assert(handle_hardlinks_);
453
454 if (entry->GetUnionLinkcount() < 2)
455 return;
456
457 const uint64_t inode = entry->GetUnionInode();
458 HardlinkGroupMap::iterator hl_group;
459 hl_group = GetHardlinkMap().find(inode);
460
461 if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
462 bool found = false;
463
464 // search for the entry in this group
465 for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
466 iEnd = hl_group->second.hardlinks.end();
467 i != iEnd;
468 ++i) {
469 if (*(i->second) == *entry) {
470 found = true;
471 break;
472 }
473 }
474
475 if (!found) {
476 // Hardlink already in the group?
477 // If one element of a hardlink group is edited, all elements must be
478 // replaced. Here, we remove an untouched hardlink and add it to its
479 // hardlink group for re-adding later
480 LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
481 entry->GetUnionPath().c_str());
482 Remove(entry);
483 hl_group->second.AddHardlink(entry);
484 }
485 }
486 }
487
488
489 /**
490 * Create a recursion engine which DOES NOT recurse into directories.
491 * It basically goes through the current directory (in the union volume) and
492 * searches for legacy hardlinks which has to be connected to the new
493 * or edited ones.
494 */
495 void SyncMediator::CompleteHardlinks(SharedPtr<SyncItem> entry) {
496 assert(handle_hardlinks_);
497
498 // If no hardlink in this directory was changed, we can skip this
499 if (GetHardlinkMap().empty())
500 return;
501
502 LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
503 entry->GetUnionPath().c_str());
504
505 // Look for legacy hardlinks
506 FileSystemTraversal<SyncMediator> traversal(this, union_engine_->union_path(),
507 false);
508 traversal.fn_new_file = &SyncMediator::LegacyRegularHardlinkCallback;
509 traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
510 traversal.fn_new_character_dev = &SyncMediator::
511 LegacyCharacterDeviceHardlinkCallback;
512 traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
513 traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
514 traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
515 traversal.Recurse(entry->GetUnionPath());
516 }
517
518
519 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
520 const string &file_name) {
521 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
522 kItemFile);
523 InsertLegacyHardlink(entry);
524 }
525
526
527 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
528 const string &file_name) {
529 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
530 kItemSymlink);
531 InsertLegacyHardlink(entry);
532 }
533
534 void SyncMediator::LegacyCharacterDeviceHardlinkCallback(
535 const string &parent_dir, const string &file_name) {
536 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
537 kItemCharacterDevice);
538 InsertLegacyHardlink(entry);
539 }
540
541 void SyncMediator::LegacyBlockDeviceHardlinkCallback(const string &parent_dir,
542 const string &file_name) {
543 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
544 kItemBlockDevice);
545 InsertLegacyHardlink(entry);
546 }
547
548 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
549 const string &file_name) {
550 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
551 kItemFifo);
552 InsertLegacyHardlink(entry);
553 }
554
555 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
556 const string &file_name) {
557 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
558 kItemSocket);
559 InsertLegacyHardlink(entry);
560 }
561
562
563 void SyncMediator::AddDirectoryRecursively(SharedPtr<SyncItem> entry) {
564 AddDirectory(entry);
565
566 // Create a recursion engine, which recursively adds all entries in a newly
567 // created directory
568 FileSystemTraversal<SyncMediator> traversal(
569 this, union_engine_->scratch_path(), true);
570 traversal.fn_enter_dir = &SyncMediator::EnterAddedDirectoryCallback;
571 traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
572 traversal.fn_new_file = &SyncMediator::AddFileCallback;
573 traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
574 traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
575 traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
576 traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
577 traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
578 traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
579 traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
580 traversal.Recurse(entry->GetScratchPath());
581 }
582
583
584 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
585 const std::string &dir_name) {
586 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
587 kItemDir);
588 AddDirectory(entry);
589 return true; // The recursion engine should recurse deeper here
590 }
591
592
593 void SyncMediator::AddFileCallback(const std::string &parent_dir,
594 const std::string &file_name) {
595 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
596 kItemFile);
597 Add(entry);
598 }
599
600
601 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
602 const std::string &file_name) {
603 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
604 kItemCharacterDevice);
605 Add(entry);
606 }
607
608 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
609 const std::string &file_name) {
610 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
611 kItemBlockDevice);
612 Add(entry);
613 }
614
615 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
616 const std::string &file_name) {
617 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
618 kItemFifo);
619 Add(entry);
620 }
621
622 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
623 const std::string &file_name) {
624 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
625 kItemSocket);
626 Add(entry);
627 }
628
629 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
630 const std::string &link_name) {
631 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
632 kItemSymlink);
633 Add(entry);
634 }
635
636
637 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
638 const std::string &dir_name) {
639 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
640 kItemDir);
641 EnterDirectory(entry);
642 }
643
644
645 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
646 const std::string &dir_name) {
647 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
648 kItemDir);
649 LeaveDirectory(entry);
650 }
651
652
653 void SyncMediator::RemoveDirectoryRecursively(SharedPtr<SyncItem> entry,
654 bool fast_delete) {
655 const std::string directory_path = entry->GetRelativePath();
656
657 // Fast delete: skip filesystem traversal for nested catalog directories.
658 // Instead of recursively walking the filesystem and removing each entry,
659 // we just remove the nested catalog reference from the parent catalog
660 // (with merge=false so entries are not copied to the parent) and then
661 // remove the mountpoint directory entry.
662 if (fast_delete && catalog_manager_->IsTransitionPoint(directory_path)) {
663 // Get the nested catalog's counters before removal so we can update
664 // publish statistics with the total number of removed entries
665 std::string subcatalog_path;
666 shash::Any hash;
667 PathString ps_path;
668 ps_path.Assign(directory_path.data(), directory_path.length());
669 const catalog::Counters counters =
670 catalog_manager_->LookupCounters(ps_path, &subcatalog_path, &hash);
671 {
672 perf::Xadd(counters_->n_files_removed,
673 static_cast<int64_t>(counters.self.regular_files
674 + counters.subtree.regular_files));
675 perf::Xadd(counters_->n_directories_removed,
676 static_cast<int64_t>(counters.self.directories
677 + counters.subtree.directories));
678 perf::Xadd(counters_->n_symlinks_removed,
679 static_cast<int64_t>(counters.self.symlinks
680 + counters.subtree.symlinks));
681 perf::Xadd(counters_->sz_removed_bytes,
682 static_cast<int64_t>(counters.self.file_size
683 + counters.subtree.file_size));
684 }
685
686 // Remove nested catalog (merge=false: just remove the reference and
687 // adjust parent subtree counters, don't copy entries into parent)
688 const std::string notice = "Nested catalog at " + entry->GetUnionPath();
689 reporter_->OnRemove(notice, catalog::DirectoryEntry());
690 if (!params_->dry_run) {
691 catalog_manager_->RemoveNestedCatalog(directory_path, false);
692 }
693
694 // Remove the mountpoint directory entry from the parent catalog
695 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
696 if (!params_->dry_run) {
697 catalog_manager_->RemoveDirectory(directory_path);
698 }
699 perf::Inc(counters_->n_directories_removed);
700
701 return;
702 }
703
704 // Normal path: delete a directory AFTER it was emptied here,
705 // because it would start up another recursion.
706 // Propagate fast_delete so that any nested catalog transition points
707 // encountered during the traversal are still fast-deleted.
708 const bool prev_fast_delete = recursive_fast_delete_;
709 if (fast_delete) recursive_fast_delete_ = true;
710
711 const bool recurse = false;
712 FileSystemTraversal<SyncMediator> traversal(
713 this, union_engine_->rdonly_path(), recurse);
714 traversal.fn_new_file = &SyncMediator::RemoveFileCallback;
715 traversal.fn_new_dir_postfix = &SyncMediator::RemoveDirectoryCallback;
716 traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
717 traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
718 traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
719 traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
720 traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
721 traversal.Recurse(entry->GetRdOnlyPath());
722
723 recursive_fast_delete_ = prev_fast_delete;
724
725 // The given directory was emptied recursively and can now itself be deleted
726 RemoveDirectory(entry);
727 }
728
729
730 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
731 const std::string &file_name) {
732 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
733 kItemFile);
734 Remove(entry);
735 }
736
737
738 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
739 const std::string &link_name) {
740 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
741 kItemSymlink);
742 Remove(entry);
743 }
744
745 void SyncMediator::RemoveCharacterDeviceCallback(const std::string &parent_dir,
746 const std::string &link_name) {
747 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
748 kItemCharacterDevice);
749 Remove(entry);
750 }
751
752 void SyncMediator::RemoveBlockDeviceCallback(const std::string &parent_dir,
753 const std::string &link_name) {
754 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
755 kItemBlockDevice);
756 Remove(entry);
757 }
758
759 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
760 const std::string &link_name) {
761 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
762 kItemFifo);
763 Remove(entry);
764 }
765
766 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
767 const std::string &link_name) {
768 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
769 kItemSocket);
770 Remove(entry);
771 }
772
773 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
774 const std::string &dir_name) {
775 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
776 kItemDir);
777 RemoveDirectoryRecursively(entry, recursive_fast_delete_);
778 }
779
780
781 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
782 const std::string &file_name) {
783 if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
784 return true;
785 }
786
787 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
788 kItemUnknown);
789 return entry->IsWhiteout();
790 }
791
792 SharedPtr<SyncItem> SyncMediator::CreateSyncItem(
793 const std::string &relative_parent_path, const std::string &filename,
794 const SyncItemType entry_type) const {
795 return union_engine_->CreateSyncItem(relative_parent_path, filename,
796 entry_type);
797 }
798
799 void SyncMediator::PublishFilesCallback(const upload::SpoolerResult &result) {
800 LogCvmfs(kLogPublish, kLogVerboseMsg,
801 "Spooler callback for %s, digest %s, produced %lu chunks, retval %d",
802 result.local_path.c_str(), result.content_hash.ToString().c_str(),
803 result.file_chunks.size(), result.return_code);
804 if (result.return_code != 0) {
805 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
806 result.return_code);
807 }
808
809 SyncItemList::iterator itr;
810 {
811 const MutexLockGuard guard(lock_file_queue_);
812 itr = file_queue_.find(result.local_path);
813 }
814
815 assert(itr != file_queue_.end());
816
817 SyncItem &item = *itr->second;
818 item.SetContentHash(result.content_hash);
819 item.SetCompressionAlgorithm(result.compression_alg);
820
821 XattrList *xattrs = &default_xattrs_;
822 if (params_->include_xattrs) {
823 xattrs = XattrList::CreateFromFile(result.local_path);
824 assert(xattrs != NULL);
825 }
826
827 if (result.IsChunked()) {
828 catalog_manager_->AddChunkedFile(
829 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
830 *xattrs,
831 item.relative_parent_path(),
832 result.file_chunks);
833 } else {
834 catalog_manager_->AddFile(
835 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
836 *xattrs,
837 item.relative_parent_path());
838 }
839
840 if (xattrs != &default_xattrs_)
841 free(xattrs);
842 }
843
844
845 void SyncMediator::PublishHardlinksCallback(
846 const upload::SpoolerResult &result) {
847 LogCvmfs(kLogPublish, kLogVerboseMsg,
848 "Spooler callback for hardlink %s, digest %s, retval %d",
849 result.local_path.c_str(), result.content_hash.ToString().c_str(),
850 result.return_code);
851 if (result.return_code != 0) {
852 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
853 result.return_code);
854 }
855
856 bool found = false;
857 for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
858 if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
859 found = true;
860 hardlink_queue_[i].master->SetContentHash(result.content_hash);
861 SyncItemList::iterator j, jend;
862 for (j = hardlink_queue_[i].hardlinks.begin(),
863 jend = hardlink_queue_[i].hardlinks.end();
864 j != jend;
865 ++j) {
866 j->second->SetContentHash(result.content_hash);
867 j->second->SetCompressionAlgorithm(result.compression_alg);
868 }
869 if (result.IsChunked())
870 hardlink_queue_[i].file_chunks = result.file_chunks;
871
872 break;
873 }
874 }
875
876 assert(found);
877 }
878
879
880 void SyncMediator::CreateNestedCatalog(SharedPtr<SyncItem> directory) {
881 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
882 reporter_->OnAdd(notice, catalog::DirectoryEntry());
883
884 if (!params_->dry_run) {
885 catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
886 }
887 }
888
889
890 void SyncMediator::RemoveNestedCatalog(SharedPtr<SyncItem> directory) {
891 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
892 reporter_->OnRemove(notice, catalog::DirectoryEntry());
893
894 if (!params_->dry_run) {
895 catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
896 }
897 }
898
899 void SyncDiffReporter::OnInit(const history::History::Tag & /*from_tag*/,
900 const history::History::Tag & /*to_tag*/) { }
901
902 void SyncDiffReporter::OnStats(const catalog::DeltaCounters & /*delta*/) { }
903
904 void SyncDiffReporter::OnAdd(const std::string &path,
905 const catalog::DirectoryEntry & /*entry*/) {
906 changed_items_++;
907 AddImpl(path);
908 }
909 void SyncDiffReporter::OnRemove(const std::string &path,
910 const catalog::DirectoryEntry & /*entry*/) {
911 changed_items_++;
912 RemoveImpl(path);
913 }
914 void SyncDiffReporter::OnModify(const std::string &path,
915 const catalog::DirectoryEntry & /*entry_from*/,
916 const catalog::DirectoryEntry & /*entry_to*/) {
917 changed_items_++;
918 ModifyImpl(path);
919 }
920
921 void SyncDiffReporter::CommitReport() {
922 if (print_action_ == kPrintDots) {
923 if (changed_items_ >= processing_dot_interval_) {
924 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, "\n");
925 }
926 }
927 }
928
929 void SyncDiffReporter::PrintDots() {
930 if (changed_items_ % processing_dot_interval_ == 0) {
931 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, ".");
932 }
933 }
934
935 void SyncDiffReporter::AddImpl(const std::string &path) {
936 const char *action_label;
937
938 switch (print_action_) {
939 case kPrintChanges:
940 if (path.at(0) != '/') {
941 action_label = "[x-catalog-add]";
942 } else {
943 action_label = "[add]";
944 }
945 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
946 break;
947
948 case kPrintDots:
949 PrintDots();
950 break;
951 default:
952 assert("Invalid print action.");
953 }
954 }
955
956 void SyncDiffReporter::RemoveImpl(const std::string &path) {
957 const char *action_label;
958
959 switch (print_action_) {
960 case kPrintChanges:
961 if (path.at(0) != '/') {
962 action_label = "[x-catalog-rem]";
963 } else {
964 action_label = "[rem]";
965 }
966
967 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
968 break;
969
970 case kPrintDots:
971 PrintDots();
972 break;
973 default:
974 assert("Invalid print action.");
975 }
976 }
977
978 void SyncDiffReporter::ModifyImpl(const std::string &path) {
979 const char *action_label;
980
981 switch (print_action_) {
982 case kPrintChanges:
983 action_label = "[mod]";
984 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
985 break;
986
987 case kPrintDots:
988 PrintDots();
989 break;
990 default:
991 assert("Invalid print action.");
992 }
993 }
994
995 void SyncMediator::AddFile(SharedPtr<SyncItem> entry) {
996 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
997
998 if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
999 assert(!entry->HasGraftMarker());
1000 // Symlinks and special files are completely stored in the catalog
1001 XattrList *xattrs = &default_xattrs_;
1002 if (params_->include_xattrs) {
1003 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1004 assert(xattrs);
1005 }
1006 catalog_manager_->AddFile(
1007 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
1008 *xattrs,
1009 entry->relative_parent_path());
1010 if (xattrs != &default_xattrs_)
1011 free(xattrs);
1012 } else if (entry->HasGraftMarker() && !params_->dry_run) {
1013 if (entry->IsValidGraft()) {
1014 // Graft files are added to catalog immediately.
1015 if (entry->IsChunkedGraft()) {
1016 catalog_manager_->AddChunkedFile(
1017 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
1018 default_xattrs_,
1019 entry->relative_parent_path(),
1020 *(entry->GetGraftChunks()));
1021 } else {
1022 catalog_manager_->AddFile(
1023 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
1024 default_xattrs_, // TODO(bbockelm): For now, use default xattrs
1025 // on grafted files.
1026 entry->relative_parent_path());
1027 }
1028 } else {
1029 // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
1030 // the graft file is missing information. It's not clear that continuing
1031 // forward with the publish is the correct thing to do; abort for now.
1032 PANIC(kLogStderr,
1033 "Encountered a grafted file (%s) with "
1034 "invalid grafting information; check contents of .cvmfsgraft-*"
1035 " file. Aborting publish.",
1036 entry->GetRelativePath().c_str());
1037 }
1038 } else if (entry->relative_parent_path().empty()
1039 && entry->IsCatalogMarker()) {
1040 PANIC(kLogStderr, "Error: nested catalog marker in root directory");
1041 } else if (!params_->dry_run) {
1042 {
1043 // Push the file to the spooler, remember the entry for the path
1044 const MutexLockGuard m(&lock_file_queue_);
1045 file_queue_[entry->GetUnionPath()] = entry;
1046 }
1047 // Spool the file
1048 params_->spooler->Process(entry->CreateIngestionSource());
1049 }
1050
1051 // publish statistics counting for new file
1052 if (entry->IsNew()) {
1053 if (entry->IsSymlink()) {
1054 perf::Inc(counters_->n_symlinks_added);
1055 } else {
1056 perf::Inc(counters_->n_files_added);
1057 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
1058 }
1059 }
1060 }
1061
1062 void SyncMediator::RemoveFile(SharedPtr<SyncItem> entry) {
1063 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1064
1065 if (!params_->dry_run) {
1066 if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
1067 LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
1068 entry->GetUnionPath().c_str());
1069 catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
1070 }
1071 catalog_manager_->RemoveFile(entry->GetRelativePath());
1072 }
1073
1074 // Counting nr of removed files and removed bytes
1075 if (entry->WasSymlink()) {
1076 perf::Inc(counters_->n_symlinks_removed);
1077 } else {
1078 perf::Inc(counters_->n_files_removed);
1079 }
1080 perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
1081 }
1082
1083 void SyncMediator::AddUnmaterializedDirectory(SharedPtr<SyncItem> entry) {
1084 AddDirectory(entry);
1085 }
1086
1087 void SyncMediator::AddDirectory(SharedPtr<SyncItem> entry) {
1088 if (entry->IsBundleSpec()) {
1089 PANIC(kLogStderr,
1090 "Illegal directory name: %s. "
1091 "The .cvmfsbundle- prefix is reserved for bundles specifications",
1092 entry->GetUnionPath().c_str());
1093 }
1094
1095 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1096
1097 perf::Inc(counters_->n_directories_added);
1098 assert(!entry->HasGraftMarker());
1099 if (!params_->dry_run) {
1100 XattrList *xattrs = &default_xattrs_;
1101 if (params_->include_xattrs) {
1102 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1103 assert(xattrs);
1104 }
1105 catalog_manager_->AddDirectory(
1106 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1107 entry->relative_parent_path());
1108 if (xattrs != &default_xattrs_)
1109 free(xattrs);
1110 }
1111
1112 if (entry->HasCatalogMarker()
1113 && !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1114 CreateNestedCatalog(entry);
1115 }
1116 }
1117
1118
1119 /**
1120 * this method deletes a single directory entry! Make sure to empty it
1121 * before you call this method or simply use
1122 * SyncMediator::RemoveDirectoryRecursively instead.
1123 */
1124 void SyncMediator::RemoveDirectory(SharedPtr<SyncItem> entry) {
1125 const std::string directory_path = entry->GetRelativePath();
1126
1127 if (catalog_manager_->IsTransitionPoint(directory_path)) {
1128 RemoveNestedCatalog(entry);
1129 }
1130
1131 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1132 if (!params_->dry_run) {
1133 catalog_manager_->RemoveDirectory(directory_path);
1134 }
1135
1136 perf::Inc(counters_->n_directories_removed);
1137 }
1138
1139 void SyncMediator::TouchDirectory(SharedPtr<SyncItem> entry) {
1140 reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1141 catalog::DirectoryEntry());
1142
1143 const std::string directory_path = entry->GetRelativePath();
1144
1145 if (!params_->dry_run) {
1146 XattrList *xattrs = &default_xattrs_;
1147 if (params_->include_xattrs) {
1148 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1149 assert(xattrs);
1150 }
1151 catalog_manager_->TouchDirectory(
1152 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1153 directory_path);
1154 if (xattrs != &default_xattrs_)
1155 free(xattrs);
1156 }
1157
1158 if (entry->HasCatalogMarker()
1159 && !catalog_manager_->IsTransitionPoint(directory_path)) {
1160 CreateNestedCatalog(entry);
1161 } else if (!entry->HasCatalogMarker()
1162 && catalog_manager_->IsTransitionPoint(directory_path)) {
1163 RemoveNestedCatalog(entry);
1164 }
1165 }
1166
1167 /**
1168 * All hardlinks in the current directory have been picked up. Now they are
1169 * added to the catalogs.
1170 */
1171 void SyncMediator::AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks) {
1172 assert(handle_hardlinks_);
1173
1174 for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1175 iEnd = hardlinks.end();
1176 i != iEnd;
1177 ++i) {
1178 if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount()
1179 && !params_->ignore_xdir_hardlinks) {
1180 PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1181 i->second.master->GetUnionPath().c_str());
1182 }
1183
1184 if (params_->print_changeset) {
1185 for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1186 jEnd = i->second.hardlinks.end();
1187 j != jEnd;
1188 ++j) {
1189 const std::string changeset_notice = GetParentPath(i->second.master
1190 ->GetUnionPath())
1191 + "/" + j->second->filename();
1192 reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1193 }
1194 }
1195
1196 if (params_->dry_run)
1197 continue;
1198
1199 if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1200 AddHardlinkGroup(i->second);
1201 else
1202 hardlink_queue_.push_back(i->second);
1203 }
1204 }
1205
1206
1207 void SyncMediator::AddHardlinkGroup(const HardlinkGroup &group) {
1208 assert(handle_hardlinks_);
1209
1210 // Create a DirectoryEntry list out of the hardlinks
1211 catalog::DirectoryEntryBaseList hardlinks;
1212 for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1213 iEnd = group.hardlinks.end();
1214 i != iEnd;
1215 ++i) {
1216 hardlinks.push_back(
1217 i->second->CreateBasicCatalogDirent(params_->enable_mtime_ns));
1218 }
1219 XattrList *xattrs = &default_xattrs_;
1220 if (params_->include_xattrs) {
1221 xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1222 assert(xattrs);
1223 }
1224 catalog_manager_->AddHardlinkGroup(hardlinks,
1225 *xattrs,
1226 group.master->relative_parent_path(),
1227 group.file_chunks);
1228 if (xattrs != &default_xattrs_)
1229 free(xattrs);
1230 }
1231
1232 } // namespace publish
1233