GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/sync_mediator.cc
Date: 2026-04-26 02:35:59
Exec Total Coverage
Lines: 1 613 0.2%
Branches: 0 929 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #define __STDC_FORMAT_MACROS
6
7 #include "sync_mediator.h"
8
9 #include <fcntl.h>
10 #include <inttypes.h>
11 #include <unistd.h>
12
13 #include <cassert>
14 #include <cstdio>
15 #include <cstdlib>
16
17 #include "catalog_virtual.h"
18 #include "compression/compression.h"
19 #include "crypto/hash.h"
20 #include "directory_entry.h"
21 #include "json_document.h"
22 #include "publish/repository.h"
23 #include "sync_union.h"
24 #include "upload.h"
25 #include "util/exception.h"
26 #include "util/fs_traversal.h"
27 #include "util/posix.h"
28 #include "util/string.h"
29
30 using namespace std; // NOLINT
31
32 namespace publish {
33
34 552 AbstractSyncMediator::~AbstractSyncMediator() { }
35
36 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
37 const SyncParameters *params,
38 perf::StatisticsTemplate statistics)
39 : catalog_manager_(catalog_manager)
40 , union_engine_(NULL)
41 , handle_hardlinks_(false)
42 , recursive_fast_delete_(false)
43 , params_(params)
44 , reporter_(new SyncDiffReporter(params_->print_changeset
45 ? SyncDiffReporter::kPrintChanges
46 : SyncDiffReporter::kPrintDots)) {
47 const int retval = pthread_mutex_init(&lock_file_queue_, NULL);
48 assert(retval == 0);
49
50 params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
51
52 counters_ = new perf::FsCounters(statistics);
53 }
54
55 SyncMediator::~SyncMediator() { pthread_mutex_destroy(&lock_file_queue_); }
56
57
58 void SyncMediator::RegisterUnionEngine(SyncUnion *engine) {
59 union_engine_ = engine;
60 handle_hardlinks_ = engine->SupportsHardlinks();
61 }
62
63
64 /**
65 * The entry /.cvmfs or entries in /.cvmfs/ must not be added, removed or
66 * modified manually. The directory /.cvmfs is generated by the VirtualCatalog
67 * class if requested.
68 */
69 void SyncMediator::EnsureAllowed(SharedPtr<SyncItem> entry) {
70 const bool ignore_case_setting = false;
71 const string relative_path = entry->GetRelativePath();
72 if ((relative_path == string(catalog::VirtualCatalog::kVirtualPath))
73 || (HasPrefix(relative_path,
74 string(catalog::VirtualCatalog::kVirtualPath) + "/",
75 ignore_case_setting))) {
76 PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
77 relative_path.c_str());
78 }
79 }
80
81
82 /**
83 * Add an entry to the repository.
84 * Added directories will be traversed in order to add the complete subtree.
85 */
86 void SyncMediator::Add(SharedPtr<SyncItem> entry) {
87 EnsureAllowed(entry);
88
89 if (entry->IsDirectory()) {
90 AddDirectoryRecursively(entry);
91 return;
92 }
93
94 // .cvmfsbundles file type
95 if (entry->IsBundleSpec()) {
96 PrintWarning(".cvmfsbundles file encountered. "
97 "Bundles is currently an experimental feature.");
98
99 if (!entry->IsRegularFile()) {
100 PANIC(kLogStderr, "Error: .cvmfsbundles file must be a regular file");
101 }
102 if (entry->HasHardlinks()) {
103 PANIC(kLogStderr, "Error: .cvmfsbundles file must not be a hard link");
104 }
105
106 const std::string parent_path = GetParentPath(entry->GetUnionPath());
107 if (parent_path != union_engine_->union_path()) {
108 PANIC(kLogStderr,
109 "Error: .cvmfsbundles file must be in the root"
110 " directory of the repository. Found in %s",
111 parent_path.c_str());
112 }
113
114 std::string json_string;
115
116 const int fd = open(entry->GetUnionPath().c_str(), O_RDONLY);
117 if (fd < 0) {
118 PANIC(kLogStderr, "Could not open file: %s",
119 entry->GetUnionPath().c_str());
120 }
121 if (!SafeReadToString(fd, &json_string)) {
122 PANIC(kLogStderr, "Could not read contents of file: %s",
123 entry->GetUnionPath().c_str());
124 }
125 const UniquePtr<JsonDocument> json(JsonDocument::Create(json_string));
126
127 AddFile(entry);
128 return;
129 }
130
131 if (entry->IsRegularFile() || entry->IsSymlink()) {
132 // A file is a hard link if the link count is greater than 1
133 if (entry->HasHardlinks() && handle_hardlinks_)
134 InsertHardlink(entry);
135 else
136 AddFile(entry);
137 return;
138 } else if (entry->IsGraftMarker()) {
139 LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
140 return; // Ignore markers.
141 }
142
143 // In OverlayFS whiteouts can be represented as character devices with major
144 // and minor numbers equal to 0. Special files will be ignored except if they
145 // are whiteout files.
146 if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
147 if (params_->ignore_special_files) {
148 PrintWarning("'" + entry->GetRelativePath()
149 + "' "
150 "is a special file, ignoring.");
151 } else {
152 if (entry->HasHardlinks() && handle_hardlinks_)
153 InsertHardlink(entry);
154 else
155 AddFile(entry);
156 }
157 return;
158 }
159
160 PrintWarning("'" + entry->GetRelativePath()
161 + "' cannot be added. Unrecognized file type.");
162 }
163
164
165 /**
166 * Touch an entry in the repository.
167 */
168 void SyncMediator::Touch(SharedPtr<SyncItem> entry) {
169 EnsureAllowed(entry);
170
171 if (entry->IsGraftMarker()) {
172 return;
173 }
174 if (entry->IsDirectory()) {
175 TouchDirectory(entry);
176 perf::Inc(counters_->n_directories_changed);
177 return;
178 }
179
180 if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
181 Replace(entry); // This way, hardlink processing is correct
182 // Replace calls Remove; cancel Remove's actions:
183 perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
184
185 // Count only the difference between the old and new file
186 // Symlinks do not count into added or removed bytes
187 int64_t dif = 0;
188
189 // Need to handle 4 cases (symlink->symlink, symlink->regular,
190 // regular->symlink, regular->regular)
191 if (entry->WasSymlink()) {
192 // Replace calls Remove; cancel Remove's actions:
193 perf::Dec(counters_->n_symlinks_removed);
194
195 if (entry->IsSymlink()) {
196 perf::Inc(counters_->n_symlinks_changed);
197 } else {
198 perf::Inc(counters_->n_symlinks_removed);
199 perf::Inc(counters_->n_files_added);
200 dif += entry->GetScratchSize();
201 }
202 } else {
203 // Replace calls Remove; cancel Remove's actions:
204 perf::Dec(counters_->n_files_removed);
205 dif -= entry->GetRdOnlySize();
206 if (entry->IsSymlink()) {
207 perf::Inc(counters_->n_files_removed);
208 perf::Inc(counters_->n_symlinks_added);
209 } else {
210 perf::Inc(counters_->n_files_changed);
211 dif += entry->GetScratchSize();
212 }
213 }
214
215 if (dif > 0) { // added bytes
216 perf::Xadd(counters_->sz_added_bytes, dif);
217 } else { // removed bytes
218 perf::Xadd(counters_->sz_removed_bytes, -dif);
219 }
220 return;
221 }
222
223 PrintWarning("'" + entry->GetRelativePath()
224 + "' cannot be touched. Unrecognized file type.");
225 }
226
227
228 /**
229 * Remove an entry from the repository. Directories will be recursively removed.
230 */
231 void SyncMediator::Remove(SharedPtr<SyncItem> entry, bool fast_delete) {
232 EnsureAllowed(entry);
233
234 if (entry->WasDirectory()) {
235 RemoveDirectoryRecursively(entry, fast_delete);
236 return;
237 }
238
239 if (entry->WasBundleSpec()) {
240 // for now remove using RemoveFile()
241 RemoveFile(entry);
242 return;
243 }
244
245 if (entry->WasRegularFile() || entry->WasSymlink()
246 || entry->WasSpecialFile()) {
247 RemoveFile(entry);
248 return;
249 }
250
251 PrintWarning("'" + entry->GetRelativePath()
252 + "' cannot be deleted. Unrecognized file type.");
253 }
254
255
256 /**
257 * Remove the old entry and add the new one.
258 */
259 void SyncMediator::Replace(SharedPtr<SyncItem> entry) {
260 // EnsureAllowed(entry); <-- Done by Remove() and Add()
261 Remove(entry);
262 Add(entry);
263 }
264
265 void SyncMediator::Clone(const std::string from, const std::string to) {
266 catalog_manager_->Clone(from, to);
267 }
268
269 void SyncMediator::EnterDirectory(SharedPtr<SyncItem> entry) {
270 if (!handle_hardlinks_) {
271 return;
272 }
273
274 const HardlinkGroupMap new_map;
275 hardlink_stack_.push(new_map);
276 }
277
278
279 void SyncMediator::LeaveDirectory(SharedPtr<SyncItem> entry) {
280 if (!handle_hardlinks_) {
281 return;
282 }
283
284 CompleteHardlinks(entry);
285 AddLocalHardlinkGroups(GetHardlinkMap());
286 hardlink_stack_.pop();
287 }
288
289
290 /**
291 * Do any pending processing and commit all changes to the catalogs.
292 * To be called after change set traversal is finished.
293 */
294 bool SyncMediator::Commit(manifest::Manifest *manifest) {
295 reporter_->CommitReport();
296
297 if (!params_->dry_run) {
298 LogCvmfs(kLogPublish, kLogStdout,
299 "Waiting for upload of files before committing...");
300 params_->spooler->WaitForUpload();
301 }
302
303 if (!hardlink_queue_.empty()) {
304 assert(handle_hardlinks_);
305
306 LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
307 params_->spooler->UnregisterListeners();
308 params_->spooler->RegisterListener(&SyncMediator::PublishHardlinksCallback,
309 this);
310
311 // TODO(rmeusel): Revise that for Thread Safety!
312 // This loop will spool hardlinks into the spooler, which will then
313 // process them.
314 // On completion of every hardlink the spooler will asynchronously
315 // emit callbacks (SyncMediator::PublishHardlinksCallback) which
316 // might happen while this for-loop goes through the hardlink_queue_
317 //
318 // For the moment this seems not to be a problem, but it's an accident
319 // just waiting to happen.
320 //
321 // Note: Just wrapping this loop in a mutex might produce a dead lock
322 // since the spooler does not fill it's processing queue to an
323 // unlimited size. Meaning that it might be flooded with hard-
324 // links and waiting for the queue to be processed while proces-
325 // sing is stalled because the callback is waiting for this
326 // mutex.
327 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
328 iEnd = hardlink_queue_.end();
329 i != iEnd;
330 ++i) {
331 LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
332 i->master->GetUnionPath().c_str());
333 IngestionSource *source = new FileIngestionSource(
334 i->master->GetUnionPath());
335 params_->spooler->Process(source);
336 }
337
338 params_->spooler->WaitForUpload();
339
340 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
341 iEnd = hardlink_queue_.end();
342 i != iEnd;
343 ++i) {
344 LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
345 i->master->GetUnionPath().c_str());
346 AddHardlinkGroup(*i);
347 }
348 }
349
350 if (union_engine_)
351 union_engine_->PostUpload();
352
353 params_->spooler->UnregisterListeners();
354
355 if (params_->dry_run) {
356 manifest = NULL;
357 return true;
358 }
359
360 LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
361 if (params_->spooler->GetNumberOfErrors() > 0) {
362 LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
363 return false;
364 }
365
366 if (catalog_manager_->IsBalanceable()
367 || (params_->virtual_dir_actions
368 != catalog::VirtualCatalog::kActionNone)) {
369 if (catalog_manager_->IsBalanceable())
370 catalog_manager_->Balance();
371 // Commit empty string to ensure that the "content" of the auto catalog
372 // markers is present in the repository.
373 const string empty_file = CreateTempPath(params_->dir_temp + "/empty",
374 0600);
375 IngestionSource *source = new FileIngestionSource(empty_file);
376 params_->spooler->Process(source);
377 params_->spooler->WaitForUpload();
378 unlink(empty_file.c_str());
379 if (params_->spooler->GetNumberOfErrors() > 0) {
380 LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
381 return false;
382 }
383 }
384 catalog_manager_->PrecalculateListings();
385 return catalog_manager_->Commit(params_->stop_for_catalog_tweaks,
386 params_->manual_revision, manifest);
387 }
388
389
390 void SyncMediator::InsertHardlink(SharedPtr<SyncItem> entry) {
391 assert(handle_hardlinks_);
392
393 const uint64_t inode = entry->GetUnionInode();
394 LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
395 inode, entry->GetUnionPath().c_str());
396
397 // Find the hard link group in the lists
398 const HardlinkGroupMap::iterator hardlink_group = GetHardlinkMap().find(
399 inode);
400
401 if (hardlink_group == GetHardlinkMap().end()) {
402 // Create a new hardlink group
403 GetHardlinkMap().insert(
404 HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
405 } else {
406 // Append the file to the appropriate hardlink group
407 hardlink_group->second.AddHardlink(entry);
408 }
409
410 // publish statistics counting for new file
411 if (entry->IsNew()) {
412 perf::Inc(counters_->n_files_added);
413 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
414 }
415 }
416
417
418 void SyncMediator::InsertLegacyHardlink(SharedPtr<SyncItem> entry) {
419 // Check if found file has hardlinks (nlink > 1)
420 // As we are looking through all files in one directory here, there might be
421 // completely untouched hardlink groups, which we can safely skip.
422 // Finally we have to see if the hardlink is already part of this group
423
424 assert(handle_hardlinks_);
425
426 if (entry->GetUnionLinkcount() < 2)
427 return;
428
429 const uint64_t inode = entry->GetUnionInode();
430 HardlinkGroupMap::iterator hl_group;
431 hl_group = GetHardlinkMap().find(inode);
432
433 if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
434 bool found = false;
435
436 // search for the entry in this group
437 for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
438 iEnd = hl_group->second.hardlinks.end();
439 i != iEnd;
440 ++i) {
441 if (*(i->second) == *entry) {
442 found = true;
443 break;
444 }
445 }
446
447 if (!found) {
448 // Hardlink already in the group?
449 // If one element of a hardlink group is edited, all elements must be
450 // replaced. Here, we remove an untouched hardlink and add it to its
451 // hardlink group for re-adding later
452 LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
453 entry->GetUnionPath().c_str());
454 Remove(entry);
455 hl_group->second.AddHardlink(entry);
456 }
457 }
458 }
459
460
461 /**
462 * Create a recursion engine which DOES NOT recurse into directories.
463 * It basically goes through the current directory (in the union volume) and
464 * searches for legacy hardlinks which has to be connected to the new
465 * or edited ones.
466 */
467 void SyncMediator::CompleteHardlinks(SharedPtr<SyncItem> entry) {
468 assert(handle_hardlinks_);
469
470 // If no hardlink in this directory was changed, we can skip this
471 if (GetHardlinkMap().empty())
472 return;
473
474 LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
475 entry->GetUnionPath().c_str());
476
477 // Look for legacy hardlinks
478 FileSystemTraversal<SyncMediator> traversal(this, union_engine_->union_path(),
479 false);
480 traversal.fn_new_file = &SyncMediator::LegacyRegularHardlinkCallback;
481 traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
482 traversal.fn_new_character_dev = &SyncMediator::
483 LegacyCharacterDeviceHardlinkCallback;
484 traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
485 traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
486 traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
487 traversal.Recurse(entry->GetUnionPath());
488 }
489
490
491 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
492 const string &file_name) {
493 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
494 kItemFile);
495 InsertLegacyHardlink(entry);
496 }
497
498
499 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
500 const string &file_name) {
501 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
502 kItemSymlink);
503 InsertLegacyHardlink(entry);
504 }
505
506 void SyncMediator::LegacyCharacterDeviceHardlinkCallback(
507 const string &parent_dir, const string &file_name) {
508 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
509 kItemCharacterDevice);
510 InsertLegacyHardlink(entry);
511 }
512
513 void SyncMediator::LegacyBlockDeviceHardlinkCallback(const string &parent_dir,
514 const string &file_name) {
515 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
516 kItemBlockDevice);
517 InsertLegacyHardlink(entry);
518 }
519
520 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
521 const string &file_name) {
522 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
523 kItemFifo);
524 InsertLegacyHardlink(entry);
525 }
526
527 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
528 const string &file_name) {
529 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
530 kItemSocket);
531 InsertLegacyHardlink(entry);
532 }
533
534
535 void SyncMediator::AddDirectoryRecursively(SharedPtr<SyncItem> entry) {
536 AddDirectory(entry);
537
538 // Create a recursion engine, which recursively adds all entries in a newly
539 // created directory
540 FileSystemTraversal<SyncMediator> traversal(
541 this, union_engine_->scratch_path(), true);
542 traversal.fn_enter_dir = &SyncMediator::EnterAddedDirectoryCallback;
543 traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
544 traversal.fn_new_file = &SyncMediator::AddFileCallback;
545 traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
546 traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
547 traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
548 traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
549 traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
550 traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
551 traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
552 traversal.Recurse(entry->GetScratchPath());
553 }
554
555
556 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
557 const std::string &dir_name) {
558 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
559 kItemDir);
560 AddDirectory(entry);
561 return true; // The recursion engine should recurse deeper here
562 }
563
564
565 void SyncMediator::AddFileCallback(const std::string &parent_dir,
566 const std::string &file_name) {
567 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
568 kItemFile);
569 Add(entry);
570 }
571
572
573 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
574 const std::string &file_name) {
575 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
576 kItemCharacterDevice);
577 Add(entry);
578 }
579
580 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
581 const std::string &file_name) {
582 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
583 kItemBlockDevice);
584 Add(entry);
585 }
586
587 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
588 const std::string &file_name) {
589 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
590 kItemFifo);
591 Add(entry);
592 }
593
594 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
595 const std::string &file_name) {
596 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
597 kItemSocket);
598 Add(entry);
599 }
600
601 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
602 const std::string &link_name) {
603 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
604 kItemSymlink);
605 Add(entry);
606 }
607
608
609 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
610 const std::string &dir_name) {
611 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
612 kItemDir);
613 EnterDirectory(entry);
614 }
615
616
617 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
618 const std::string &dir_name) {
619 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
620 kItemDir);
621 LeaveDirectory(entry);
622 }
623
624
625 void SyncMediator::RemoveDirectoryRecursively(SharedPtr<SyncItem> entry,
626 bool fast_delete) {
627 const std::string directory_path = entry->GetRelativePath();
628
629 // Fast delete: skip filesystem traversal for nested catalog directories.
630 // Instead of recursively walking the filesystem and removing each entry,
631 // we just remove the nested catalog reference from the parent catalog
632 // (with merge=false so entries are not copied to the parent) and then
633 // remove the mountpoint directory entry.
634 if (fast_delete && catalog_manager_->IsTransitionPoint(directory_path)) {
635 // Get the nested catalog's counters before removal so we can update
636 // publish statistics with the total number of removed entries
637 std::string subcatalog_path;
638 shash::Any hash;
639 PathString ps_path;
640 ps_path.Assign(directory_path.data(), directory_path.length());
641 const catalog::Counters counters =
642 catalog_manager_->LookupCounters(ps_path, &subcatalog_path, &hash);
643 {
644 perf::Xadd(counters_->n_files_removed,
645 static_cast<int64_t>(counters.self.regular_files
646 + counters.subtree.regular_files));
647 perf::Xadd(counters_->n_directories_removed,
648 static_cast<int64_t>(counters.self.directories
649 + counters.subtree.directories));
650 perf::Xadd(counters_->n_symlinks_removed,
651 static_cast<int64_t>(counters.self.symlinks
652 + counters.subtree.symlinks));
653 perf::Xadd(counters_->sz_removed_bytes,
654 static_cast<int64_t>(counters.self.file_size
655 + counters.subtree.file_size));
656 }
657
658 // Remove nested catalog (merge=false: just remove the reference and
659 // adjust parent subtree counters, don't copy entries into parent)
660 const std::string notice = "Nested catalog at " + entry->GetUnionPath();
661 reporter_->OnRemove(notice, catalog::DirectoryEntry());
662 if (!params_->dry_run) {
663 catalog_manager_->RemoveNestedCatalog(directory_path, false);
664 }
665
666 // Remove the mountpoint directory entry from the parent catalog
667 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
668 if (!params_->dry_run) {
669 catalog_manager_->RemoveDirectory(directory_path);
670 }
671 perf::Inc(counters_->n_directories_removed);
672
673 return;
674 }
675
676 // Normal path: delete a directory AFTER it was emptied here,
677 // because it would start up another recursion.
678 // Propagate fast_delete so that any nested catalog transition points
679 // encountered during the traversal are still fast-deleted.
680 const bool prev_fast_delete = recursive_fast_delete_;
681 if (fast_delete) recursive_fast_delete_ = true;
682
683 const bool recurse = false;
684 FileSystemTraversal<SyncMediator> traversal(
685 this, union_engine_->rdonly_path(), recurse);
686 traversal.fn_new_file = &SyncMediator::RemoveFileCallback;
687 traversal.fn_new_dir_postfix = &SyncMediator::RemoveDirectoryCallback;
688 traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
689 traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
690 traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
691 traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
692 traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
693 traversal.Recurse(entry->GetRdOnlyPath());
694
695 recursive_fast_delete_ = prev_fast_delete;
696
697 // The given directory was emptied recursively and can now itself be deleted
698 RemoveDirectory(entry);
699 }
700
701
702 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
703 const std::string &file_name) {
704 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
705 kItemFile);
706 Remove(entry);
707 }
708
709
710 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
711 const std::string &link_name) {
712 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
713 kItemSymlink);
714 Remove(entry);
715 }
716
717 void SyncMediator::RemoveCharacterDeviceCallback(const std::string &parent_dir,
718 const std::string &link_name) {
719 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
720 kItemCharacterDevice);
721 Remove(entry);
722 }
723
724 void SyncMediator::RemoveBlockDeviceCallback(const std::string &parent_dir,
725 const std::string &link_name) {
726 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
727 kItemBlockDevice);
728 Remove(entry);
729 }
730
731 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
732 const std::string &link_name) {
733 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
734 kItemFifo);
735 Remove(entry);
736 }
737
738 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
739 const std::string &link_name) {
740 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
741 kItemSocket);
742 Remove(entry);
743 }
744
745 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
746 const std::string &dir_name) {
747 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
748 kItemDir);
749 RemoveDirectoryRecursively(entry, recursive_fast_delete_);
750 }
751
752
753 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
754 const std::string &file_name) {
755 if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
756 return true;
757 }
758
759 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
760 kItemUnknown);
761 return entry->IsWhiteout();
762 }
763
764 SharedPtr<SyncItem> SyncMediator::CreateSyncItem(
765 const std::string &relative_parent_path, const std::string &filename,
766 const SyncItemType entry_type) const {
767 return union_engine_->CreateSyncItem(relative_parent_path, filename,
768 entry_type);
769 }
770
771 void SyncMediator::PublishFilesCallback(const upload::SpoolerResult &result) {
772 LogCvmfs(kLogPublish, kLogVerboseMsg,
773 "Spooler callback for %s, digest %s, produced %lu chunks, retval %d",
774 result.local_path.c_str(), result.content_hash.ToString().c_str(),
775 result.file_chunks.size(), result.return_code);
776 if (result.return_code != 0) {
777 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
778 result.return_code);
779 }
780
781 SyncItemList::iterator itr;
782 {
783 const MutexLockGuard guard(lock_file_queue_);
784 itr = file_queue_.find(result.local_path);
785 }
786
787 assert(itr != file_queue_.end());
788
789 SyncItem &item = *itr->second;
790 item.SetContentHash(result.content_hash);
791 item.SetCompressionAlgorithm(result.compression_alg);
792
793 XattrList *xattrs = &default_xattrs_;
794 if (params_->include_xattrs) {
795 xattrs = XattrList::CreateFromFile(result.local_path);
796 assert(xattrs != NULL);
797 }
798
799 if (result.IsChunked()) {
800 catalog_manager_->AddChunkedFile(
801 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
802 *xattrs,
803 item.relative_parent_path(),
804 result.file_chunks);
805 } else {
806 catalog_manager_->AddFile(
807 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
808 *xattrs,
809 item.relative_parent_path());
810 }
811
812 if (xattrs != &default_xattrs_)
813 free(xattrs);
814 }
815
816
817 void SyncMediator::PublishHardlinksCallback(
818 const upload::SpoolerResult &result) {
819 LogCvmfs(kLogPublish, kLogVerboseMsg,
820 "Spooler callback for hardlink %s, digest %s, retval %d",
821 result.local_path.c_str(), result.content_hash.ToString().c_str(),
822 result.return_code);
823 if (result.return_code != 0) {
824 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
825 result.return_code);
826 }
827
828 bool found = false;
829 for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
830 if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
831 found = true;
832 hardlink_queue_[i].master->SetContentHash(result.content_hash);
833 SyncItemList::iterator j, jend;
834 for (j = hardlink_queue_[i].hardlinks.begin(),
835 jend = hardlink_queue_[i].hardlinks.end();
836 j != jend;
837 ++j) {
838 j->second->SetContentHash(result.content_hash);
839 j->second->SetCompressionAlgorithm(result.compression_alg);
840 }
841 if (result.IsChunked())
842 hardlink_queue_[i].file_chunks = result.file_chunks;
843
844 break;
845 }
846 }
847
848 assert(found);
849 }
850
851
852 void SyncMediator::CreateNestedCatalog(SharedPtr<SyncItem> directory) {
853 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
854 reporter_->OnAdd(notice, catalog::DirectoryEntry());
855
856 if (!params_->dry_run) {
857 catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
858 }
859 }
860
861
862 void SyncMediator::RemoveNestedCatalog(SharedPtr<SyncItem> directory) {
863 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
864 reporter_->OnRemove(notice, catalog::DirectoryEntry());
865
866 if (!params_->dry_run) {
867 catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
868 }
869 }
870
871 void SyncDiffReporter::OnInit(const history::History::Tag & /*from_tag*/,
872 const history::History::Tag & /*to_tag*/) { }
873
874 void SyncDiffReporter::OnStats(const catalog::DeltaCounters & /*delta*/) { }
875
876 void SyncDiffReporter::OnAdd(const std::string &path,
877 const catalog::DirectoryEntry & /*entry*/) {
878 changed_items_++;
879 AddImpl(path);
880 }
881 void SyncDiffReporter::OnRemove(const std::string &path,
882 const catalog::DirectoryEntry & /*entry*/) {
883 changed_items_++;
884 RemoveImpl(path);
885 }
886 void SyncDiffReporter::OnModify(const std::string &path,
887 const catalog::DirectoryEntry & /*entry_from*/,
888 const catalog::DirectoryEntry & /*entry_to*/) {
889 changed_items_++;
890 ModifyImpl(path);
891 }
892
893 void SyncDiffReporter::CommitReport() {
894 if (print_action_ == kPrintDots) {
895 if (changed_items_ >= processing_dot_interval_) {
896 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, "\n");
897 }
898 }
899 }
900
901 void SyncDiffReporter::PrintDots() {
902 if (changed_items_ % processing_dot_interval_ == 0) {
903 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, ".");
904 }
905 }
906
907 void SyncDiffReporter::AddImpl(const std::string &path) {
908 const char *action_label;
909
910 switch (print_action_) {
911 case kPrintChanges:
912 if (path.at(0) != '/') {
913 action_label = "[x-catalog-add]";
914 } else {
915 action_label = "[add]";
916 }
917 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
918 break;
919
920 case kPrintDots:
921 PrintDots();
922 break;
923 default:
924 assert("Invalid print action.");
925 }
926 }
927
928 void SyncDiffReporter::RemoveImpl(const std::string &path) {
929 const char *action_label;
930
931 switch (print_action_) {
932 case kPrintChanges:
933 if (path.at(0) != '/') {
934 action_label = "[x-catalog-rem]";
935 } else {
936 action_label = "[rem]";
937 }
938
939 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
940 break;
941
942 case kPrintDots:
943 PrintDots();
944 break;
945 default:
946 assert("Invalid print action.");
947 }
948 }
949
950 void SyncDiffReporter::ModifyImpl(const std::string &path) {
951 const char *action_label;
952
953 switch (print_action_) {
954 case kPrintChanges:
955 action_label = "[mod]";
956 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
957 break;
958
959 case kPrintDots:
960 PrintDots();
961 break;
962 default:
963 assert("Invalid print action.");
964 }
965 }
966
967 void SyncMediator::AddFile(SharedPtr<SyncItem> entry) {
968 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
969
970 if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
971 assert(!entry->HasGraftMarker());
972 // Symlinks and special files are completely stored in the catalog
973 XattrList *xattrs = &default_xattrs_;
974 if (params_->include_xattrs) {
975 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
976 assert(xattrs);
977 }
978 catalog_manager_->AddFile(
979 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
980 *xattrs,
981 entry->relative_parent_path());
982 if (xattrs != &default_xattrs_)
983 free(xattrs);
984 } else if (entry->HasGraftMarker() && !params_->dry_run) {
985 if (entry->IsValidGraft()) {
986 // Graft files are added to catalog immediately.
987 if (entry->IsChunkedGraft()) {
988 catalog_manager_->AddChunkedFile(
989 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
990 default_xattrs_,
991 entry->relative_parent_path(),
992 *(entry->GetGraftChunks()));
993 } else {
994 catalog_manager_->AddFile(
995 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
996 default_xattrs_, // TODO(bbockelm): For now, use default xattrs
997 // on grafted files.
998 entry->relative_parent_path());
999 }
1000 } else {
1001 // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
1002 // the graft file is missing information. It's not clear that continuing
1003 // forward with the publish is the correct thing to do; abort for now.
1004 PANIC(kLogStderr,
1005 "Encountered a grafted file (%s) with "
1006 "invalid grafting information; check contents of .cvmfsgraft-*"
1007 " file. Aborting publish.",
1008 entry->GetRelativePath().c_str());
1009 }
1010 } else if (entry->relative_parent_path().empty()
1011 && entry->IsCatalogMarker()) {
1012 PANIC(kLogStderr, "Error: nested catalog marker in root directory");
1013 } else if (!params_->dry_run) {
1014 {
1015 // Push the file to the spooler, remember the entry for the path
1016 const MutexLockGuard m(&lock_file_queue_);
1017 file_queue_[entry->GetUnionPath()] = entry;
1018 }
1019 // Spool the file
1020 params_->spooler->Process(entry->CreateIngestionSource());
1021 }
1022
1023 // publish statistics counting for new file
1024 if (entry->IsNew()) {
1025 if (entry->IsSymlink()) {
1026 perf::Inc(counters_->n_symlinks_added);
1027 } else {
1028 perf::Inc(counters_->n_files_added);
1029 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
1030 }
1031 }
1032 }
1033
1034 void SyncMediator::RemoveFile(SharedPtr<SyncItem> entry) {
1035 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1036
1037 if (!params_->dry_run) {
1038 if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
1039 LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
1040 entry->GetUnionPath().c_str());
1041 catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
1042 }
1043 catalog_manager_->RemoveFile(entry->GetRelativePath());
1044 }
1045
1046 // Counting nr of removed files and removed bytes
1047 if (entry->WasSymlink()) {
1048 perf::Inc(counters_->n_symlinks_removed);
1049 } else {
1050 perf::Inc(counters_->n_files_removed);
1051 }
1052 perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
1053 }
1054
1055 void SyncMediator::AddUnmaterializedDirectory(SharedPtr<SyncItem> entry) {
1056 AddDirectory(entry);
1057 }
1058
1059 void SyncMediator::AddDirectory(SharedPtr<SyncItem> entry) {
1060 if (entry->IsBundleSpec()) {
1061 PANIC(kLogStderr,
1062 "Illegal directory name: .cvmfsbundles (%s). "
1063 ".cvmfsbundles is reserved for bundles specification files",
1064 entry->GetUnionPath().c_str());
1065 }
1066
1067 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1068
1069 perf::Inc(counters_->n_directories_added);
1070 assert(!entry->HasGraftMarker());
1071 if (!params_->dry_run) {
1072 XattrList *xattrs = &default_xattrs_;
1073 if (params_->include_xattrs) {
1074 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1075 assert(xattrs);
1076 }
1077 catalog_manager_->AddDirectory(
1078 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1079 entry->relative_parent_path());
1080 if (xattrs != &default_xattrs_)
1081 free(xattrs);
1082 }
1083
1084 if (entry->HasCatalogMarker()
1085 && !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1086 CreateNestedCatalog(entry);
1087 }
1088 }
1089
1090
1091 /**
1092 * this method deletes a single directory entry! Make sure to empty it
1093 * before you call this method or simply use
1094 * SyncMediator::RemoveDirectoryRecursively instead.
1095 */
1096 void SyncMediator::RemoveDirectory(SharedPtr<SyncItem> entry) {
1097 const std::string directory_path = entry->GetRelativePath();
1098
1099 if (catalog_manager_->IsTransitionPoint(directory_path)) {
1100 RemoveNestedCatalog(entry);
1101 }
1102
1103 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1104 if (!params_->dry_run) {
1105 catalog_manager_->RemoveDirectory(directory_path);
1106 }
1107
1108 perf::Inc(counters_->n_directories_removed);
1109 }
1110
1111 void SyncMediator::TouchDirectory(SharedPtr<SyncItem> entry) {
1112 reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1113 catalog::DirectoryEntry());
1114
1115 const std::string directory_path = entry->GetRelativePath();
1116
1117 if (!params_->dry_run) {
1118 XattrList *xattrs = &default_xattrs_;
1119 if (params_->include_xattrs) {
1120 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1121 assert(xattrs);
1122 }
1123 catalog_manager_->TouchDirectory(
1124 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1125 directory_path);
1126 if (xattrs != &default_xattrs_)
1127 free(xattrs);
1128 }
1129
1130 if (entry->HasCatalogMarker()
1131 && !catalog_manager_->IsTransitionPoint(directory_path)) {
1132 CreateNestedCatalog(entry);
1133 } else if (!entry->HasCatalogMarker()
1134 && catalog_manager_->IsTransitionPoint(directory_path)) {
1135 RemoveNestedCatalog(entry);
1136 }
1137 }
1138
1139 /**
1140 * All hardlinks in the current directory have been picked up. Now they are
1141 * added to the catalogs.
1142 */
1143 void SyncMediator::AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks) {
1144 assert(handle_hardlinks_);
1145
1146 for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1147 iEnd = hardlinks.end();
1148 i != iEnd;
1149 ++i) {
1150 if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount()
1151 && !params_->ignore_xdir_hardlinks) {
1152 PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1153 i->second.master->GetUnionPath().c_str());
1154 }
1155
1156 if (params_->print_changeset) {
1157 for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1158 jEnd = i->second.hardlinks.end();
1159 j != jEnd;
1160 ++j) {
1161 const std::string changeset_notice = GetParentPath(i->second.master
1162 ->GetUnionPath())
1163 + "/" + j->second->filename();
1164 reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1165 }
1166 }
1167
1168 if (params_->dry_run)
1169 continue;
1170
1171 if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1172 AddHardlinkGroup(i->second);
1173 else
1174 hardlink_queue_.push_back(i->second);
1175 }
1176 }
1177
1178
1179 void SyncMediator::AddHardlinkGroup(const HardlinkGroup &group) {
1180 assert(handle_hardlinks_);
1181
1182 // Create a DirectoryEntry list out of the hardlinks
1183 catalog::DirectoryEntryBaseList hardlinks;
1184 for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1185 iEnd = group.hardlinks.end();
1186 i != iEnd;
1187 ++i) {
1188 hardlinks.push_back(
1189 i->second->CreateBasicCatalogDirent(params_->enable_mtime_ns));
1190 }
1191 XattrList *xattrs = &default_xattrs_;
1192 if (params_->include_xattrs) {
1193 xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1194 assert(xattrs);
1195 }
1196 catalog_manager_->AddHardlinkGroup(hardlinks,
1197 *xattrs,
1198 group.master->relative_parent_path(),
1199 group.file_chunks);
1200 if (xattrs != &default_xattrs_)
1201 free(xattrs);
1202 }
1203
1204 } // namespace publish
1205