GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/sync_mediator.cc
Date: 2026-03-15 02:35:27
Exec Total Coverage
Lines: 1 609 0.2%
Branches: 0 927 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #define __STDC_FORMAT_MACROS
6
7 #include "sync_mediator.h"
8
9 #include <fcntl.h>
10 #include <inttypes.h>
11 #include <unistd.h>
12
13 #include <cassert>
14 #include <cstdio>
15 #include <cstdlib>
16
17 #include "catalog_virtual.h"
18 #include "compression/compression.h"
19 #include "crypto/hash.h"
20 #include "directory_entry.h"
21 #include "json_document.h"
22 #include "publish/repository.h"
23 #include "sync_union.h"
24 #include "upload.h"
25 #include "util/concurrency.h"
26 #include "util/exception.h"
27 #include "util/fs_traversal.h"
28 #include "util/posix.h"
29 #include "util/smalloc.h"
30 #include "util/string.h"
31
32 using namespace std; // NOLINT
33
34 namespace publish {
35
36 216 AbstractSyncMediator::~AbstractSyncMediator() { }
37
38 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
39 const SyncParameters *params,
40 perf::StatisticsTemplate statistics)
41 : catalog_manager_(catalog_manager)
42 , union_engine_(NULL)
43 , handle_hardlinks_(false)
44 , params_(params)
45 , reporter_(new SyncDiffReporter(params_->print_changeset
46 ? SyncDiffReporter::kPrintChanges
47 : SyncDiffReporter::kPrintDots)) {
48 const int retval = pthread_mutex_init(&lock_file_queue_, NULL);
49 assert(retval == 0);
50
51 params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
52
53 counters_ = new perf::FsCounters(statistics);
54 }
55
56 SyncMediator::~SyncMediator() { pthread_mutex_destroy(&lock_file_queue_); }
57
58
59 void SyncMediator::RegisterUnionEngine(SyncUnion *engine) {
60 union_engine_ = engine;
61 handle_hardlinks_ = engine->SupportsHardlinks();
62 }
63
64
65 /**
66 * The entry /.cvmfs or entries in /.cvmfs/ must not be added, removed or
67 * modified manually. The directory /.cvmfs is generated by the VirtualCatalog
68 * class if requested.
69 */
70 void SyncMediator::EnsureAllowed(SharedPtr<SyncItem> entry) {
71 const bool ignore_case_setting = false;
72 const string relative_path = entry->GetRelativePath();
73 if ((relative_path == string(catalog::VirtualCatalog::kVirtualPath))
74 || (HasPrefix(relative_path,
75 string(catalog::VirtualCatalog::kVirtualPath) + "/",
76 ignore_case_setting))) {
77 PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
78 relative_path.c_str());
79 }
80 }
81
82
83 /**
84 * Add an entry to the repository.
85 * Added directories will be traversed in order to add the complete subtree.
86 */
87 void SyncMediator::Add(SharedPtr<SyncItem> entry) {
88 EnsureAllowed(entry);
89
90 if (entry->IsDirectory()) {
91 AddDirectoryRecursively(entry);
92 return;
93 }
94
95 // .cvmfsbundles file type
96 if (entry->IsBundleSpec()) {
97 PrintWarning(".cvmfsbundles file encountered. "
98 "Bundles is currently an experimental feature.");
99
100 if (!entry->IsRegularFile()) {
101 PANIC(kLogStderr, "Error: .cvmfsbundles file must be a regular file");
102 }
103 if (entry->HasHardlinks()) {
104 PANIC(kLogStderr, "Error: .cvmfsbundles file must not be a hard link");
105 }
106
107 const std::string parent_path = GetParentPath(entry->GetUnionPath());
108 if (parent_path != union_engine_->union_path()) {
109 PANIC(kLogStderr,
110 "Error: .cvmfsbundles file must be in the root"
111 " directory of the repository. Found in %s",
112 parent_path.c_str());
113 }
114
115 std::string json_string;
116
117 const int fd = open(entry->GetUnionPath().c_str(), O_RDONLY);
118 if (fd < 0) {
119 PANIC(kLogStderr, "Could not open file: %s",
120 entry->GetUnionPath().c_str());
121 }
122 if (!SafeReadToString(fd, &json_string)) {
123 PANIC(kLogStderr, "Could not read contents of file: %s",
124 entry->GetUnionPath().c_str());
125 }
126 const UniquePtr<JsonDocument> json(JsonDocument::Create(json_string));
127
128 AddFile(entry);
129 return;
130 }
131
132 if (entry->IsRegularFile() || entry->IsSymlink()) {
133 // A file is a hard link if the link count is greater than 1
134 if (entry->HasHardlinks() && handle_hardlinks_)
135 InsertHardlink(entry);
136 else
137 AddFile(entry);
138 return;
139 } else if (entry->IsGraftMarker()) {
140 LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
141 return; // Ignore markers.
142 }
143
144 // In OverlayFS whiteouts can be represented as character devices with major
145 // and minor numbers equal to 0. Special files will be ignored except if they
146 // are whiteout files.
147 if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
148 if (params_->ignore_special_files) {
149 PrintWarning("'" + entry->GetRelativePath()
150 + "' "
151 "is a special file, ignoring.");
152 } else {
153 if (entry->HasHardlinks() && handle_hardlinks_)
154 InsertHardlink(entry);
155 else
156 AddFile(entry);
157 }
158 return;
159 }
160
161 PrintWarning("'" + entry->GetRelativePath()
162 + "' cannot be added. Unrecognized file type.");
163 }
164
165
166 /**
167 * Touch an entry in the repository.
168 */
169 void SyncMediator::Touch(SharedPtr<SyncItem> entry) {
170 EnsureAllowed(entry);
171
172 if (entry->IsGraftMarker()) {
173 return;
174 }
175 if (entry->IsDirectory()) {
176 TouchDirectory(entry);
177 perf::Inc(counters_->n_directories_changed);
178 return;
179 }
180
181 if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
182 Replace(entry); // This way, hardlink processing is correct
183 // Replace calls Remove; cancel Remove's actions:
184 perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
185
186 // Count only the difference between the old and new file
187 // Symlinks do not count into added or removed bytes
188 int64_t dif = 0;
189
190 // Need to handle 4 cases (symlink->symlink, symlink->regular,
191 // regular->symlink, regular->regular)
192 if (entry->WasSymlink()) {
193 // Replace calls Remove; cancel Remove's actions:
194 perf::Dec(counters_->n_symlinks_removed);
195
196 if (entry->IsSymlink()) {
197 perf::Inc(counters_->n_symlinks_changed);
198 } else {
199 perf::Inc(counters_->n_symlinks_removed);
200 perf::Inc(counters_->n_files_added);
201 dif += entry->GetScratchSize();
202 }
203 } else {
204 // Replace calls Remove; cancel Remove's actions:
205 perf::Dec(counters_->n_files_removed);
206 dif -= entry->GetRdOnlySize();
207 if (entry->IsSymlink()) {
208 perf::Inc(counters_->n_files_removed);
209 perf::Inc(counters_->n_symlinks_added);
210 } else {
211 perf::Inc(counters_->n_files_changed);
212 dif += entry->GetScratchSize();
213 }
214 }
215
216 if (dif > 0) { // added bytes
217 perf::Xadd(counters_->sz_added_bytes, dif);
218 } else { // removed bytes
219 perf::Xadd(counters_->sz_removed_bytes, -dif);
220 }
221 return;
222 }
223
224 PrintWarning("'" + entry->GetRelativePath()
225 + "' cannot be touched. Unrecognized file type.");
226 }
227
228
229 /**
230 * Remove an entry from the repository. Directories will be recursively removed.
231 */
232 void SyncMediator::Remove(SharedPtr<SyncItem> entry, bool fast_delete) {
233 EnsureAllowed(entry);
234
235 if (entry->WasDirectory()) {
236 RemoveDirectoryRecursively(entry, fast_delete);
237 return;
238 }
239
240 if (entry->WasBundleSpec()) {
241 // for now remove using RemoveFile()
242 RemoveFile(entry);
243 return;
244 }
245
246 if (entry->WasRegularFile() || entry->WasSymlink()
247 || entry->WasSpecialFile()) {
248 RemoveFile(entry);
249 return;
250 }
251
252 PrintWarning("'" + entry->GetRelativePath()
253 + "' cannot be deleted. Unrecognized file type.");
254 }
255
256
257 /**
258 * Remove the old entry and add the new one.
259 */
260 void SyncMediator::Replace(SharedPtr<SyncItem> entry) {
261 // EnsureAllowed(entry); <-- Done by Remove() and Add()
262 Remove(entry);
263 Add(entry);
264 }
265
266 void SyncMediator::Clone(const std::string from, const std::string to) {
267 catalog_manager_->Clone(from, to);
268 }
269
270 void SyncMediator::EnterDirectory(SharedPtr<SyncItem> entry) {
271 if (!handle_hardlinks_) {
272 return;
273 }
274
275 const HardlinkGroupMap new_map;
276 hardlink_stack_.push(new_map);
277 }
278
279
280 void SyncMediator::LeaveDirectory(SharedPtr<SyncItem> entry) {
281 if (!handle_hardlinks_) {
282 return;
283 }
284
285 CompleteHardlinks(entry);
286 AddLocalHardlinkGroups(GetHardlinkMap());
287 hardlink_stack_.pop();
288 }
289
290
291 /**
292 * Do any pending processing and commit all changes to the catalogs.
293 * To be called after change set traversal is finished.
294 */
295 bool SyncMediator::Commit(manifest::Manifest *manifest) {
296 reporter_->CommitReport();
297
298 if (!params_->dry_run) {
299 LogCvmfs(kLogPublish, kLogStdout,
300 "Waiting for upload of files before committing...");
301 params_->spooler->WaitForUpload();
302 }
303
304 if (!hardlink_queue_.empty()) {
305 assert(handle_hardlinks_);
306
307 LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
308 params_->spooler->UnregisterListeners();
309 params_->spooler->RegisterListener(&SyncMediator::PublishHardlinksCallback,
310 this);
311
312 // TODO(rmeusel): Revise that for Thread Safety!
313 // This loop will spool hardlinks into the spooler, which will then
314 // process them.
315 // On completion of every hardlink the spooler will asynchronously
316 // emit callbacks (SyncMediator::PublishHardlinksCallback) which
317 // might happen while this for-loop goes through the hardlink_queue_
318 //
319 // For the moment this seems not to be a problem, but it's an accident
320 // just waiting to happen.
321 //
322 // Note: Just wrapping this loop in a mutex might produce a dead lock
323 // since the spooler does not fill it's processing queue to an
324 // unlimited size. Meaning that it might be flooded with hard-
325 // links and waiting for the queue to be processed while proces-
326 // sing is stalled because the callback is waiting for this
327 // mutex.
328 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
329 iEnd = hardlink_queue_.end();
330 i != iEnd;
331 ++i) {
332 LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
333 i->master->GetUnionPath().c_str());
334 IngestionSource *source = new FileIngestionSource(
335 i->master->GetUnionPath());
336 params_->spooler->Process(source);
337 }
338
339 params_->spooler->WaitForUpload();
340
341 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
342 iEnd = hardlink_queue_.end();
343 i != iEnd;
344 ++i) {
345 LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
346 i->master->GetUnionPath().c_str());
347 AddHardlinkGroup(*i);
348 }
349 }
350
351 if (union_engine_)
352 union_engine_->PostUpload();
353
354 params_->spooler->UnregisterListeners();
355
356 if (params_->dry_run) {
357 manifest = NULL;
358 return true;
359 }
360
361 LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
362 if (params_->spooler->GetNumberOfErrors() > 0) {
363 LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
364 return false;
365 }
366
367 if (catalog_manager_->IsBalanceable()
368 || (params_->virtual_dir_actions
369 != catalog::VirtualCatalog::kActionNone)) {
370 if (catalog_manager_->IsBalanceable())
371 catalog_manager_->Balance();
372 // Commit empty string to ensure that the "content" of the auto catalog
373 // markers is present in the repository.
374 const string empty_file = CreateTempPath(params_->dir_temp + "/empty",
375 0600);
376 IngestionSource *source = new FileIngestionSource(empty_file);
377 params_->spooler->Process(source);
378 params_->spooler->WaitForUpload();
379 unlink(empty_file.c_str());
380 if (params_->spooler->GetNumberOfErrors() > 0) {
381 LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
382 return false;
383 }
384 }
385 catalog_manager_->PrecalculateListings();
386 return catalog_manager_->Commit(params_->stop_for_catalog_tweaks,
387 params_->manual_revision, manifest);
388 }
389
390
391 void SyncMediator::InsertHardlink(SharedPtr<SyncItem> entry) {
392 assert(handle_hardlinks_);
393
394 const uint64_t inode = entry->GetUnionInode();
395 LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
396 inode, entry->GetUnionPath().c_str());
397
398 // Find the hard link group in the lists
399 const HardlinkGroupMap::iterator hardlink_group = GetHardlinkMap().find(
400 inode);
401
402 if (hardlink_group == GetHardlinkMap().end()) {
403 // Create a new hardlink group
404 GetHardlinkMap().insert(
405 HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
406 } else {
407 // Append the file to the appropriate hardlink group
408 hardlink_group->second.AddHardlink(entry);
409 }
410
411 // publish statistics counting for new file
412 if (entry->IsNew()) {
413 perf::Inc(counters_->n_files_added);
414 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
415 }
416 }
417
418
419 void SyncMediator::InsertLegacyHardlink(SharedPtr<SyncItem> entry) {
420 // Check if found file has hardlinks (nlink > 1)
421 // As we are looking through all files in one directory here, there might be
422 // completely untouched hardlink groups, which we can safely skip.
423 // Finally we have to see if the hardlink is already part of this group
424
425 assert(handle_hardlinks_);
426
427 if (entry->GetUnionLinkcount() < 2)
428 return;
429
430 const uint64_t inode = entry->GetUnionInode();
431 HardlinkGroupMap::iterator hl_group;
432 hl_group = GetHardlinkMap().find(inode);
433
434 if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
435 bool found = false;
436
437 // search for the entry in this group
438 for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
439 iEnd = hl_group->second.hardlinks.end();
440 i != iEnd;
441 ++i) {
442 if (*(i->second) == *entry) {
443 found = true;
444 break;
445 }
446 }
447
448 if (!found) {
449 // Hardlink already in the group?
450 // If one element of a hardlink group is edited, all elements must be
451 // replaced. Here, we remove an untouched hardlink and add it to its
452 // hardlink group for re-adding later
453 LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
454 entry->GetUnionPath().c_str());
455 Remove(entry);
456 hl_group->second.AddHardlink(entry);
457 }
458 }
459 }
460
461
462 /**
463 * Create a recursion engine which DOES NOT recurse into directories.
464 * It basically goes through the current directory (in the union volume) and
465 * searches for legacy hardlinks which has to be connected to the new
466 * or edited ones.
467 */
468 void SyncMediator::CompleteHardlinks(SharedPtr<SyncItem> entry) {
469 assert(handle_hardlinks_);
470
471 // If no hardlink in this directory was changed, we can skip this
472 if (GetHardlinkMap().empty())
473 return;
474
475 LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
476 entry->GetUnionPath().c_str());
477
478 // Look for legacy hardlinks
479 FileSystemTraversal<SyncMediator> traversal(this, union_engine_->union_path(),
480 false);
481 traversal.fn_new_file = &SyncMediator::LegacyRegularHardlinkCallback;
482 traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
483 traversal.fn_new_character_dev = &SyncMediator::
484 LegacyCharacterDeviceHardlinkCallback;
485 traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
486 traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
487 traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
488 traversal.Recurse(entry->GetUnionPath());
489 }
490
491
492 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
493 const string &file_name) {
494 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
495 kItemFile);
496 InsertLegacyHardlink(entry);
497 }
498
499
500 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
501 const string &file_name) {
502 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
503 kItemSymlink);
504 InsertLegacyHardlink(entry);
505 }
506
507 void SyncMediator::LegacyCharacterDeviceHardlinkCallback(
508 const string &parent_dir, const string &file_name) {
509 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
510 kItemCharacterDevice);
511 InsertLegacyHardlink(entry);
512 }
513
514 void SyncMediator::LegacyBlockDeviceHardlinkCallback(const string &parent_dir,
515 const string &file_name) {
516 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
517 kItemBlockDevice);
518 InsertLegacyHardlink(entry);
519 }
520
521 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
522 const string &file_name) {
523 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
524 kItemFifo);
525 InsertLegacyHardlink(entry);
526 }
527
528 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
529 const string &file_name) {
530 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
531 kItemSocket);
532 InsertLegacyHardlink(entry);
533 }
534
535
536 void SyncMediator::AddDirectoryRecursively(SharedPtr<SyncItem> entry) {
537 AddDirectory(entry);
538
539 // Create a recursion engine, which recursively adds all entries in a newly
540 // created directory
541 FileSystemTraversal<SyncMediator> traversal(
542 this, union_engine_->scratch_path(), true);
543 traversal.fn_enter_dir = &SyncMediator::EnterAddedDirectoryCallback;
544 traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
545 traversal.fn_new_file = &SyncMediator::AddFileCallback;
546 traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
547 traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
548 traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
549 traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
550 traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
551 traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
552 traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
553 traversal.Recurse(entry->GetScratchPath());
554 }
555
556
557 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
558 const std::string &dir_name) {
559 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
560 kItemDir);
561 AddDirectory(entry);
562 return true; // The recursion engine should recurse deeper here
563 }
564
565
566 void SyncMediator::AddFileCallback(const std::string &parent_dir,
567 const std::string &file_name) {
568 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
569 kItemFile);
570 Add(entry);
571 }
572
573
574 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
575 const std::string &file_name) {
576 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
577 kItemCharacterDevice);
578 Add(entry);
579 }
580
581 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
582 const std::string &file_name) {
583 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
584 kItemBlockDevice);
585 Add(entry);
586 }
587
588 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
589 const std::string &file_name) {
590 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
591 kItemFifo);
592 Add(entry);
593 }
594
595 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
596 const std::string &file_name) {
597 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
598 kItemSocket);
599 Add(entry);
600 }
601
602 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
603 const std::string &link_name) {
604 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
605 kItemSymlink);
606 Add(entry);
607 }
608
609
610 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
611 const std::string &dir_name) {
612 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
613 kItemDir);
614 EnterDirectory(entry);
615 }
616
617
618 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
619 const std::string &dir_name) {
620 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
621 kItemDir);
622 LeaveDirectory(entry);
623 }
624
625
626 void SyncMediator::RemoveDirectoryRecursively(SharedPtr<SyncItem> entry,
627 bool fast_delete) {
628 const std::string directory_path = entry->GetRelativePath();
629
630 // Fast delete: skip filesystem traversal for nested catalog directories.
631 // Instead of recursively walking the filesystem and removing each entry,
632 // we just remove the nested catalog reference from the parent catalog
633 // (with merge=false so entries are not copied to the parent) and then
634 // remove the mountpoint directory entry.
635 if (fast_delete && catalog_manager_->IsTransitionPoint(directory_path)) {
636 // Get the nested catalog's counters before removal so we can update
637 // publish statistics with the total number of removed entries
638 std::string subcatalog_path;
639 shash::Any hash;
640 PathString ps_path;
641 ps_path.Assign(directory_path.data(), directory_path.length());
642 const catalog::Counters counters =
643 catalog_manager_->LookupCounters(ps_path, &subcatalog_path, &hash);
644 {
645 perf::Xadd(counters_->n_files_removed,
646 static_cast<int64_t>(counters.self.regular_files
647 + counters.subtree.regular_files));
648 perf::Xadd(counters_->n_directories_removed,
649 static_cast<int64_t>(counters.self.directories
650 + counters.subtree.directories));
651 perf::Xadd(counters_->n_symlinks_removed,
652 static_cast<int64_t>(counters.self.symlinks
653 + counters.subtree.symlinks));
654 perf::Xadd(counters_->sz_removed_bytes,
655 static_cast<int64_t>(counters.self.file_size
656 + counters.subtree.file_size));
657 }
658
659 // Remove nested catalog (merge=false: just remove the reference and
660 // adjust parent subtree counters, don't copy entries into parent)
661 const std::string notice = "Nested catalog at " + entry->GetUnionPath();
662 reporter_->OnRemove(notice, catalog::DirectoryEntry());
663 if (!params_->dry_run) {
664 catalog_manager_->RemoveNestedCatalog(directory_path, false);
665 }
666
667 // Remove the mountpoint directory entry from the parent catalog
668 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
669 if (!params_->dry_run) {
670 catalog_manager_->RemoveDirectory(directory_path);
671 }
672 perf::Inc(counters_->n_directories_removed);
673
674 return;
675 }
676
677 // Normal path: delete a directory AFTER it was emptied here,
678 // because it would start up another recursion
679
680 const bool recurse = false;
681 FileSystemTraversal<SyncMediator> traversal(
682 this, union_engine_->rdonly_path(), recurse);
683 traversal.fn_new_file = &SyncMediator::RemoveFileCallback;
684 traversal.fn_new_dir_postfix = &SyncMediator::RemoveDirectoryCallback;
685 traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
686 traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
687 traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
688 traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
689 traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
690 traversal.Recurse(entry->GetRdOnlyPath());
691
692 // The given directory was emptied recursively and can now itself be deleted
693 RemoveDirectory(entry);
694 }
695
696
697 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
698 const std::string &file_name) {
699 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
700 kItemFile);
701 Remove(entry);
702 }
703
704
705 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
706 const std::string &link_name) {
707 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
708 kItemSymlink);
709 Remove(entry);
710 }
711
712 void SyncMediator::RemoveCharacterDeviceCallback(const std::string &parent_dir,
713 const std::string &link_name) {
714 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
715 kItemCharacterDevice);
716 Remove(entry);
717 }
718
719 void SyncMediator::RemoveBlockDeviceCallback(const std::string &parent_dir,
720 const std::string &link_name) {
721 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
722 kItemBlockDevice);
723 Remove(entry);
724 }
725
726 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
727 const std::string &link_name) {
728 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
729 kItemFifo);
730 Remove(entry);
731 }
732
733 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
734 const std::string &link_name) {
735 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
736 kItemSocket);
737 Remove(entry);
738 }
739
740 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
741 const std::string &dir_name) {
742 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
743 kItemDir);
744 RemoveDirectoryRecursively(entry);
745 }
746
747
748 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
749 const std::string &file_name) {
750 if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
751 return true;
752 }
753
754 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
755 kItemUnknown);
756 return entry->IsWhiteout();
757 }
758
759 SharedPtr<SyncItem> SyncMediator::CreateSyncItem(
760 const std::string &relative_parent_path, const std::string &filename,
761 const SyncItemType entry_type) const {
762 return union_engine_->CreateSyncItem(relative_parent_path, filename,
763 entry_type);
764 }
765
766 void SyncMediator::PublishFilesCallback(const upload::SpoolerResult &result) {
767 LogCvmfs(kLogPublish, kLogVerboseMsg,
768 "Spooler callback for %s, digest %s, produced %lu chunks, retval %d",
769 result.local_path.c_str(), result.content_hash.ToString().c_str(),
770 result.file_chunks.size(), result.return_code);
771 if (result.return_code != 0) {
772 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
773 result.return_code);
774 }
775
776 SyncItemList::iterator itr;
777 {
778 const MutexLockGuard guard(lock_file_queue_);
779 itr = file_queue_.find(result.local_path);
780 }
781
782 assert(itr != file_queue_.end());
783
784 SyncItem &item = *itr->second;
785 item.SetContentHash(result.content_hash);
786 item.SetCompressionAlgorithm(result.compression_alg);
787
788 XattrList *xattrs = &default_xattrs_;
789 if (params_->include_xattrs) {
790 xattrs = XattrList::CreateFromFile(result.local_path);
791 assert(xattrs != NULL);
792 }
793
794 if (result.IsChunked()) {
795 catalog_manager_->AddChunkedFile(
796 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
797 *xattrs,
798 item.relative_parent_path(),
799 result.file_chunks);
800 } else {
801 catalog_manager_->AddFile(
802 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
803 *xattrs,
804 item.relative_parent_path());
805 }
806
807 if (xattrs != &default_xattrs_)
808 free(xattrs);
809 }
810
811
812 void SyncMediator::PublishHardlinksCallback(
813 const upload::SpoolerResult &result) {
814 LogCvmfs(kLogPublish, kLogVerboseMsg,
815 "Spooler callback for hardlink %s, digest %s, retval %d",
816 result.local_path.c_str(), result.content_hash.ToString().c_str(),
817 result.return_code);
818 if (result.return_code != 0) {
819 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
820 result.return_code);
821 }
822
823 bool found = false;
824 for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
825 if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
826 found = true;
827 hardlink_queue_[i].master->SetContentHash(result.content_hash);
828 SyncItemList::iterator j, jend;
829 for (j = hardlink_queue_[i].hardlinks.begin(),
830 jend = hardlink_queue_[i].hardlinks.end();
831 j != jend;
832 ++j) {
833 j->second->SetContentHash(result.content_hash);
834 j->second->SetCompressionAlgorithm(result.compression_alg);
835 }
836 if (result.IsChunked())
837 hardlink_queue_[i].file_chunks = result.file_chunks;
838
839 break;
840 }
841 }
842
843 assert(found);
844 }
845
846
847 void SyncMediator::CreateNestedCatalog(SharedPtr<SyncItem> directory) {
848 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
849 reporter_->OnAdd(notice, catalog::DirectoryEntry());
850
851 if (!params_->dry_run) {
852 catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
853 }
854 }
855
856
857 void SyncMediator::RemoveNestedCatalog(SharedPtr<SyncItem> directory) {
858 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
859 reporter_->OnRemove(notice, catalog::DirectoryEntry());
860
861 if (!params_->dry_run) {
862 catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
863 }
864 }
865
866 void SyncDiffReporter::OnInit(const history::History::Tag & /*from_tag*/,
867 const history::History::Tag & /*to_tag*/) { }
868
869 void SyncDiffReporter::OnStats(const catalog::DeltaCounters & /*delta*/) { }
870
871 void SyncDiffReporter::OnAdd(const std::string &path,
872 const catalog::DirectoryEntry & /*entry*/) {
873 changed_items_++;
874 AddImpl(path);
875 }
876 void SyncDiffReporter::OnRemove(const std::string &path,
877 const catalog::DirectoryEntry & /*entry*/) {
878 changed_items_++;
879 RemoveImpl(path);
880 }
881 void SyncDiffReporter::OnModify(const std::string &path,
882 const catalog::DirectoryEntry & /*entry_from*/,
883 const catalog::DirectoryEntry & /*entry_to*/) {
884 changed_items_++;
885 ModifyImpl(path);
886 }
887
888 void SyncDiffReporter::CommitReport() {
889 if (print_action_ == kPrintDots) {
890 if (changed_items_ >= processing_dot_interval_) {
891 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, "\n");
892 }
893 }
894 }
895
896 void SyncDiffReporter::PrintDots() {
897 if (changed_items_ % processing_dot_interval_ == 0) {
898 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, ".");
899 }
900 }
901
902 void SyncDiffReporter::AddImpl(const std::string &path) {
903 const char *action_label;
904
905 switch (print_action_) {
906 case kPrintChanges:
907 if (path.at(0) != '/') {
908 action_label = "[x-catalog-add]";
909 } else {
910 action_label = "[add]";
911 }
912 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
913 break;
914
915 case kPrintDots:
916 PrintDots();
917 break;
918 default:
919 assert("Invalid print action.");
920 }
921 }
922
923 void SyncDiffReporter::RemoveImpl(const std::string &path) {
924 const char *action_label;
925
926 switch (print_action_) {
927 case kPrintChanges:
928 if (path.at(0) != '/') {
929 action_label = "[x-catalog-rem]";
930 } else {
931 action_label = "[rem]";
932 }
933
934 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
935 break;
936
937 case kPrintDots:
938 PrintDots();
939 break;
940 default:
941 assert("Invalid print action.");
942 }
943 }
944
945 void SyncDiffReporter::ModifyImpl(const std::string &path) {
946 const char *action_label;
947
948 switch (print_action_) {
949 case kPrintChanges:
950 action_label = "[mod]";
951 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
952 break;
953
954 case kPrintDots:
955 PrintDots();
956 break;
957 default:
958 assert("Invalid print action.");
959 }
960 }
961
962 void SyncMediator::AddFile(SharedPtr<SyncItem> entry) {
963 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
964
965 if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
966 assert(!entry->HasGraftMarker());
967 // Symlinks and special files are completely stored in the catalog
968 XattrList *xattrs = &default_xattrs_;
969 if (params_->include_xattrs) {
970 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
971 assert(xattrs);
972 }
973 catalog_manager_->AddFile(
974 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
975 *xattrs,
976 entry->relative_parent_path());
977 if (xattrs != &default_xattrs_)
978 free(xattrs);
979 } else if (entry->HasGraftMarker() && !params_->dry_run) {
980 if (entry->IsValidGraft()) {
981 // Graft files are added to catalog immediately.
982 if (entry->IsChunkedGraft()) {
983 catalog_manager_->AddChunkedFile(
984 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
985 default_xattrs_,
986 entry->relative_parent_path(),
987 *(entry->GetGraftChunks()));
988 } else {
989 catalog_manager_->AddFile(
990 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
991 default_xattrs_, // TODO(bbockelm): For now, use default xattrs
992 // on grafted files.
993 entry->relative_parent_path());
994 }
995 } else {
996 // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
997 // the graft file is missing information. It's not clear that continuing
998 // forward with the publish is the correct thing to do; abort for now.
999 PANIC(kLogStderr,
1000 "Encountered a grafted file (%s) with "
1001 "invalid grafting information; check contents of .cvmfsgraft-*"
1002 " file. Aborting publish.",
1003 entry->GetRelativePath().c_str());
1004 }
1005 } else if (entry->relative_parent_path().empty()
1006 && entry->IsCatalogMarker()) {
1007 PANIC(kLogStderr, "Error: nested catalog marker in root directory");
1008 } else if (!params_->dry_run) {
1009 {
1010 // Push the file to the spooler, remember the entry for the path
1011 const MutexLockGuard m(&lock_file_queue_);
1012 file_queue_[entry->GetUnionPath()] = entry;
1013 }
1014 // Spool the file
1015 params_->spooler->Process(entry->CreateIngestionSource());
1016 }
1017
1018 // publish statistics counting for new file
1019 if (entry->IsNew()) {
1020 if (entry->IsSymlink()) {
1021 perf::Inc(counters_->n_symlinks_added);
1022 } else {
1023 perf::Inc(counters_->n_files_added);
1024 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
1025 }
1026 }
1027 }
1028
1029 void SyncMediator::RemoveFile(SharedPtr<SyncItem> entry) {
1030 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1031
1032 if (!params_->dry_run) {
1033 if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
1034 LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
1035 entry->GetUnionPath().c_str());
1036 catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
1037 }
1038 catalog_manager_->RemoveFile(entry->GetRelativePath());
1039 }
1040
1041 // Counting nr of removed files and removed bytes
1042 if (entry->WasSymlink()) {
1043 perf::Inc(counters_->n_symlinks_removed);
1044 } else {
1045 perf::Inc(counters_->n_files_removed);
1046 }
1047 perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
1048 }
1049
1050 void SyncMediator::AddUnmaterializedDirectory(SharedPtr<SyncItem> entry) {
1051 AddDirectory(entry);
1052 }
1053
1054 void SyncMediator::AddDirectory(SharedPtr<SyncItem> entry) {
1055 if (entry->IsBundleSpec()) {
1056 PANIC(kLogStderr,
1057 "Illegal directory name: .cvmfsbundles (%s). "
1058 ".cvmfsbundles is reserved for bundles specification files",
1059 entry->GetUnionPath().c_str());
1060 }
1061
1062 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1063
1064 perf::Inc(counters_->n_directories_added);
1065 assert(!entry->HasGraftMarker());
1066 if (!params_->dry_run) {
1067 XattrList *xattrs = &default_xattrs_;
1068 if (params_->include_xattrs) {
1069 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1070 assert(xattrs);
1071 }
1072 catalog_manager_->AddDirectory(
1073 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1074 entry->relative_parent_path());
1075 if (xattrs != &default_xattrs_)
1076 free(xattrs);
1077 }
1078
1079 if (entry->HasCatalogMarker()
1080 && !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1081 CreateNestedCatalog(entry);
1082 }
1083 }
1084
1085
1086 /**
1087 * this method deletes a single directory entry! Make sure to empty it
1088 * before you call this method or simply use
1089 * SyncMediator::RemoveDirectoryRecursively instead.
1090 */
1091 void SyncMediator::RemoveDirectory(SharedPtr<SyncItem> entry) {
1092 const std::string directory_path = entry->GetRelativePath();
1093
1094 if (catalog_manager_->IsTransitionPoint(directory_path)) {
1095 RemoveNestedCatalog(entry);
1096 }
1097
1098 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1099 if (!params_->dry_run) {
1100 catalog_manager_->RemoveDirectory(directory_path);
1101 }
1102
1103 perf::Inc(counters_->n_directories_removed);
1104 }
1105
1106 void SyncMediator::TouchDirectory(SharedPtr<SyncItem> entry) {
1107 reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1108 catalog::DirectoryEntry());
1109
1110 const std::string directory_path = entry->GetRelativePath();
1111
1112 if (!params_->dry_run) {
1113 XattrList *xattrs = &default_xattrs_;
1114 if (params_->include_xattrs) {
1115 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1116 assert(xattrs);
1117 }
1118 catalog_manager_->TouchDirectory(
1119 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1120 directory_path);
1121 if (xattrs != &default_xattrs_)
1122 free(xattrs);
1123 }
1124
1125 if (entry->HasCatalogMarker()
1126 && !catalog_manager_->IsTransitionPoint(directory_path)) {
1127 CreateNestedCatalog(entry);
1128 } else if (!entry->HasCatalogMarker()
1129 && catalog_manager_->IsTransitionPoint(directory_path)) {
1130 RemoveNestedCatalog(entry);
1131 }
1132 }
1133
1134 /**
1135 * All hardlinks in the current directory have been picked up. Now they are
1136 * added to the catalogs.
1137 */
1138 void SyncMediator::AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks) {
1139 assert(handle_hardlinks_);
1140
1141 for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1142 iEnd = hardlinks.end();
1143 i != iEnd;
1144 ++i) {
1145 if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount()
1146 && !params_->ignore_xdir_hardlinks) {
1147 PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1148 i->second.master->GetUnionPath().c_str());
1149 }
1150
1151 if (params_->print_changeset) {
1152 for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1153 jEnd = i->second.hardlinks.end();
1154 j != jEnd;
1155 ++j) {
1156 const std::string changeset_notice = GetParentPath(i->second.master
1157 ->GetUnionPath())
1158 + "/" + j->second->filename();
1159 reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1160 }
1161 }
1162
1163 if (params_->dry_run)
1164 continue;
1165
1166 if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1167 AddHardlinkGroup(i->second);
1168 else
1169 hardlink_queue_.push_back(i->second);
1170 }
1171 }
1172
1173
1174 void SyncMediator::AddHardlinkGroup(const HardlinkGroup &group) {
1175 assert(handle_hardlinks_);
1176
1177 // Create a DirectoryEntry list out of the hardlinks
1178 catalog::DirectoryEntryBaseList hardlinks;
1179 for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1180 iEnd = group.hardlinks.end();
1181 i != iEnd;
1182 ++i) {
1183 hardlinks.push_back(
1184 i->second->CreateBasicCatalogDirent(params_->enable_mtime_ns));
1185 }
1186 XattrList *xattrs = &default_xattrs_;
1187 if (params_->include_xattrs) {
1188 xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1189 assert(xattrs);
1190 }
1191 catalog_manager_->AddHardlinkGroup(hardlinks,
1192 *xattrs,
1193 group.master->relative_parent_path(),
1194 group.file_chunks);
1195 if (xattrs != &default_xattrs_)
1196 free(xattrs);
1197 }
1198
1199 } // namespace publish
1200