GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/sync_mediator.cc
Date: 2026-05-19 11:45:12
Exec Total Coverage
Lines: 1 613 0.2%
Branches: 0 929 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "sync_mediator.h"
6
7 #include <fcntl.h>
8 #include <inttypes.h>
9 #include <unistd.h>
10
11 #include <cassert>
12 #include <cstdio>
13 #include <cstdlib>
14
15 #include "catalog_virtual.h"
16 #include "compression/compression.h"
17 #include "crypto/hash.h"
18 #include "directory_entry.h"
19 #include "json_document.h"
20 #include "publish/repository.h"
21 #include "sync_union.h"
22 #include "upload.h"
23 #include "util/exception.h"
24 #include "util/fs_traversal.h"
25 #include "util/posix.h"
26 #include "util/string.h"
27
28 using namespace std; // NOLINT
29
30 namespace publish {
31
32 24 AbstractSyncMediator::~AbstractSyncMediator() { }
33
34 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
35 const SyncParameters *params,
36 perf::StatisticsTemplate statistics)
37 : catalog_manager_(catalog_manager)
38 , union_engine_(NULL)
39 , handle_hardlinks_(false)
40 , recursive_fast_delete_(false)
41 , params_(params)
42 , reporter_(new SyncDiffReporter(params_->print_changeset
43 ? SyncDiffReporter::kPrintChanges
44 : SyncDiffReporter::kPrintDots)) {
45 const int retval = pthread_mutex_init(&lock_file_queue_, NULL);
46 assert(retval == 0);
47
48 params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
49
50 counters_ = new perf::FsCounters(statistics);
51 }
52
53 SyncMediator::~SyncMediator() { pthread_mutex_destroy(&lock_file_queue_); }
54
55
56 void SyncMediator::RegisterUnionEngine(SyncUnion *engine) {
57 union_engine_ = engine;
58 handle_hardlinks_ = engine->SupportsHardlinks();
59 }
60
61
62 /**
63 * The entry /.cvmfs or entries in /.cvmfs/ must not be added, removed or
64 * modified manually. The directory /.cvmfs is generated by the VirtualCatalog
65 * class if requested.
66 */
67 void SyncMediator::EnsureAllowed(SharedPtr<SyncItem> entry) {
68 const bool ignore_case_setting = false;
69 const string relative_path = entry->GetRelativePath();
70 if ((relative_path == string(catalog::VirtualCatalog::kVirtualPath))
71 || (HasPrefix(relative_path,
72 string(catalog::VirtualCatalog::kVirtualPath) + "/",
73 ignore_case_setting))) {
74 PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
75 relative_path.c_str());
76 }
77 }
78
79
80 /**
81 * Add an entry to the repository.
82 * Added directories will be traversed in order to add the complete subtree.
83 */
84 void SyncMediator::Add(SharedPtr<SyncItem> entry) {
85 EnsureAllowed(entry);
86
87 if (entry->IsDirectory()) {
88 AddDirectoryRecursively(entry);
89 return;
90 }
91
92 // .cvmfsbundles file type
93 if (entry->IsBundleSpec()) {
94 PrintWarning(".cvmfsbundles file encountered. "
95 "Bundles is currently an experimental feature.");
96
97 if (!entry->IsRegularFile()) {
98 PANIC(kLogStderr, "Error: .cvmfsbundles file must be a regular file");
99 }
100 if (entry->HasHardlinks()) {
101 PANIC(kLogStderr, "Error: .cvmfsbundles file must not be a hard link");
102 }
103
104 const std::string parent_path = GetParentPath(entry->GetUnionPath());
105 if (parent_path != union_engine_->union_path()) {
106 PANIC(kLogStderr,
107 "Error: .cvmfsbundles file must be in the root"
108 " directory of the repository. Found in %s",
109 parent_path.c_str());
110 }
111
112 std::string json_string;
113
114 const int fd = open(entry->GetUnionPath().c_str(), O_RDONLY);
115 if (fd < 0) {
116 PANIC(kLogStderr, "Could not open file: %s",
117 entry->GetUnionPath().c_str());
118 }
119 if (!SafeReadToString(fd, &json_string)) {
120 PANIC(kLogStderr, "Could not read contents of file: %s",
121 entry->GetUnionPath().c_str());
122 }
123 const UniquePtr<JsonDocument> json(JsonDocument::Create(json_string));
124
125 AddFile(entry);
126 return;
127 }
128
129 if (entry->IsRegularFile() || entry->IsSymlink()) {
130 // A file is a hard link if the link count is greater than 1
131 if (entry->HasHardlinks() && handle_hardlinks_)
132 InsertHardlink(entry);
133 else
134 AddFile(entry);
135 return;
136 } else if (entry->IsGraftMarker()) {
137 LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
138 return; // Ignore markers.
139 }
140
141 // In OverlayFS whiteouts can be represented as character devices with major
142 // and minor numbers equal to 0. Special files will be ignored except if they
143 // are whiteout files.
144 if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
145 if (params_->ignore_special_files) {
146 PrintWarning("'" + entry->GetRelativePath()
147 + "' "
148 "is a special file, ignoring.");
149 } else {
150 if (entry->HasHardlinks() && handle_hardlinks_)
151 InsertHardlink(entry);
152 else
153 AddFile(entry);
154 }
155 return;
156 }
157
158 PrintWarning("'" + entry->GetRelativePath()
159 + "' cannot be added. Unrecognized file type.");
160 }
161
162
163 /**
164 * Touch an entry in the repository.
165 */
166 void SyncMediator::Touch(SharedPtr<SyncItem> entry) {
167 EnsureAllowed(entry);
168
169 if (entry->IsGraftMarker()) {
170 return;
171 }
172 if (entry->IsDirectory()) {
173 TouchDirectory(entry);
174 perf::Inc(counters_->n_directories_changed);
175 return;
176 }
177
178 if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
179 Replace(entry); // This way, hardlink processing is correct
180 // Replace calls Remove; cancel Remove's actions:
181 perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
182
183 // Count only the difference between the old and new file
184 // Symlinks do not count into added or removed bytes
185 int64_t dif = 0;
186
187 // Need to handle 4 cases (symlink->symlink, symlink->regular,
188 // regular->symlink, regular->regular)
189 if (entry->WasSymlink()) {
190 // Replace calls Remove; cancel Remove's actions:
191 perf::Dec(counters_->n_symlinks_removed);
192
193 if (entry->IsSymlink()) {
194 perf::Inc(counters_->n_symlinks_changed);
195 } else {
196 perf::Inc(counters_->n_symlinks_removed);
197 perf::Inc(counters_->n_files_added);
198 dif += entry->GetScratchSize();
199 }
200 } else {
201 // Replace calls Remove; cancel Remove's actions:
202 perf::Dec(counters_->n_files_removed);
203 dif -= entry->GetRdOnlySize();
204 if (entry->IsSymlink()) {
205 perf::Inc(counters_->n_files_removed);
206 perf::Inc(counters_->n_symlinks_added);
207 } else {
208 perf::Inc(counters_->n_files_changed);
209 dif += entry->GetScratchSize();
210 }
211 }
212
213 if (dif > 0) { // added bytes
214 perf::Xadd(counters_->sz_added_bytes, dif);
215 } else { // removed bytes
216 perf::Xadd(counters_->sz_removed_bytes, -dif);
217 }
218 return;
219 }
220
221 PrintWarning("'" + entry->GetRelativePath()
222 + "' cannot be touched. Unrecognized file type.");
223 }
224
225
226 /**
227 * Remove an entry from the repository. Directories will be recursively removed.
228 */
229 void SyncMediator::Remove(SharedPtr<SyncItem> entry, bool fast_delete) {
230 EnsureAllowed(entry);
231
232 if (entry->WasDirectory()) {
233 RemoveDirectoryRecursively(entry, fast_delete);
234 return;
235 }
236
237 if (entry->WasBundleSpec()) {
238 // for now remove using RemoveFile()
239 RemoveFile(entry);
240 return;
241 }
242
243 if (entry->WasRegularFile() || entry->WasSymlink()
244 || entry->WasSpecialFile()) {
245 RemoveFile(entry);
246 return;
247 }
248
249 PrintWarning("'" + entry->GetRelativePath()
250 + "' cannot be deleted. Unrecognized file type.");
251 }
252
253
254 /**
255 * Remove the old entry and add the new one.
256 */
257 void SyncMediator::Replace(SharedPtr<SyncItem> entry) {
258 // EnsureAllowed(entry); <-- Done by Remove() and Add()
259 Remove(entry);
260 Add(entry);
261 }
262
263 void SyncMediator::Clone(const std::string from, const std::string to) {
264 catalog_manager_->Clone(from, to);
265 }
266
267 void SyncMediator::EnterDirectory(SharedPtr<SyncItem> entry) {
268 if (!handle_hardlinks_) {
269 return;
270 }
271
272 const HardlinkGroupMap new_map;
273 hardlink_stack_.push(new_map);
274 }
275
276
277 void SyncMediator::LeaveDirectory(SharedPtr<SyncItem> entry) {
278 if (!handle_hardlinks_) {
279 return;
280 }
281
282 CompleteHardlinks(entry);
283 AddLocalHardlinkGroups(GetHardlinkMap());
284 hardlink_stack_.pop();
285 }
286
287
288 /**
289 * Do any pending processing and commit all changes to the catalogs.
290 * To be called after change set traversal is finished.
291 */
292 bool SyncMediator::Commit(manifest::Manifest *manifest) {
293 reporter_->CommitReport();
294
295 if (!params_->dry_run) {
296 LogCvmfs(kLogPublish, kLogStdout,
297 "Waiting for upload of files before committing...");
298 params_->spooler->WaitForUpload();
299 }
300
301 if (!hardlink_queue_.empty()) {
302 assert(handle_hardlinks_);
303
304 LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
305 params_->spooler->UnregisterListeners();
306 params_->spooler->RegisterListener(&SyncMediator::PublishHardlinksCallback,
307 this);
308
309 // TODO(rmeusel): Revise that for Thread Safety!
310 // This loop will spool hardlinks into the spooler, which will then
311 // process them.
312 // On completion of every hardlink the spooler will asynchronously
313 // emit callbacks (SyncMediator::PublishHardlinksCallback) which
314 // might happen while this for-loop goes through the hardlink_queue_
315 //
316 // For the moment this seems not to be a problem, but it's an accident
317 // just waiting to happen.
318 //
319 // Note: Just wrapping this loop in a mutex might produce a dead lock
320 // since the spooler does not fill it's processing queue to an
321 // unlimited size. Meaning that it might be flooded with hard-
322 // links and waiting for the queue to be processed while proces-
323 // sing is stalled because the callback is waiting for this
324 // mutex.
325 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
326 iEnd = hardlink_queue_.end();
327 i != iEnd;
328 ++i) {
329 LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
330 i->master->GetUnionPath().c_str());
331 IngestionSource *source = new FileIngestionSource(
332 i->master->GetUnionPath());
333 params_->spooler->Process(source);
334 }
335
336 params_->spooler->WaitForUpload();
337
338 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
339 iEnd = hardlink_queue_.end();
340 i != iEnd;
341 ++i) {
342 LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
343 i->master->GetUnionPath().c_str());
344 AddHardlinkGroup(*i);
345 }
346 }
347
348 if (union_engine_)
349 union_engine_->PostUpload();
350
351 params_->spooler->UnregisterListeners();
352
353 if (params_->dry_run) {
354 manifest = NULL;
355 return true;
356 }
357
358 LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
359 if (params_->spooler->GetNumberOfErrors() > 0) {
360 LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
361 return false;
362 }
363
364 if (catalog_manager_->IsBalanceable()
365 || (params_->virtual_dir_actions
366 != catalog::VirtualCatalog::kActionNone)) {
367 if (catalog_manager_->IsBalanceable())
368 catalog_manager_->Balance();
369 // Commit empty string to ensure that the "content" of the auto catalog
370 // markers is present in the repository.
371 const string empty_file = CreateTempPath(params_->dir_temp + "/empty",
372 0600);
373 IngestionSource *source = new FileIngestionSource(empty_file);
374 params_->spooler->Process(source);
375 params_->spooler->WaitForUpload();
376 unlink(empty_file.c_str());
377 if (params_->spooler->GetNumberOfErrors() > 0) {
378 LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
379 return false;
380 }
381 }
382 catalog_manager_->PrecalculateListings();
383 return catalog_manager_->Commit(params_->stop_for_catalog_tweaks,
384 params_->manual_revision, manifest);
385 }
386
387
388 void SyncMediator::InsertHardlink(SharedPtr<SyncItem> entry) {
389 assert(handle_hardlinks_);
390
391 const uint64_t inode = entry->GetUnionInode();
392 LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
393 inode, entry->GetUnionPath().c_str());
394
395 // Find the hard link group in the lists
396 const HardlinkGroupMap::iterator hardlink_group = GetHardlinkMap().find(
397 inode);
398
399 if (hardlink_group == GetHardlinkMap().end()) {
400 // Create a new hardlink group
401 GetHardlinkMap().insert(
402 HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
403 } else {
404 // Append the file to the appropriate hardlink group
405 hardlink_group->second.AddHardlink(entry);
406 }
407
408 // publish statistics counting for new file
409 if (entry->IsNew()) {
410 perf::Inc(counters_->n_files_added);
411 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
412 }
413 }
414
415
416 void SyncMediator::InsertLegacyHardlink(SharedPtr<SyncItem> entry) {
417 // Check if found file has hardlinks (nlink > 1)
418 // As we are looking through all files in one directory here, there might be
419 // completely untouched hardlink groups, which we can safely skip.
420 // Finally we have to see if the hardlink is already part of this group
421
422 assert(handle_hardlinks_);
423
424 if (entry->GetUnionLinkcount() < 2)
425 return;
426
427 const uint64_t inode = entry->GetUnionInode();
428 HardlinkGroupMap::iterator hl_group;
429 hl_group = GetHardlinkMap().find(inode);
430
431 if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
432 bool found = false;
433
434 // search for the entry in this group
435 for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
436 iEnd = hl_group->second.hardlinks.end();
437 i != iEnd;
438 ++i) {
439 if (*(i->second) == *entry) {
440 found = true;
441 break;
442 }
443 }
444
445 if (!found) {
446 // Hardlink already in the group?
447 // If one element of a hardlink group is edited, all elements must be
448 // replaced. Here, we remove an untouched hardlink and add it to its
449 // hardlink group for re-adding later
450 LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
451 entry->GetUnionPath().c_str());
452 Remove(entry);
453 hl_group->second.AddHardlink(entry);
454 }
455 }
456 }
457
458
459 /**
460 * Create a recursion engine which DOES NOT recurse into directories.
461 * It basically goes through the current directory (in the union volume) and
462 * searches for legacy hardlinks which has to be connected to the new
463 * or edited ones.
464 */
465 void SyncMediator::CompleteHardlinks(SharedPtr<SyncItem> entry) {
466 assert(handle_hardlinks_);
467
468 // If no hardlink in this directory was changed, we can skip this
469 if (GetHardlinkMap().empty())
470 return;
471
472 LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
473 entry->GetUnionPath().c_str());
474
475 // Look for legacy hardlinks
476 FileSystemTraversal<SyncMediator> traversal(this, union_engine_->union_path(),
477 false);
478 traversal.fn_new_file = &SyncMediator::LegacyRegularHardlinkCallback;
479 traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
480 traversal.fn_new_character_dev = &SyncMediator::
481 LegacyCharacterDeviceHardlinkCallback;
482 traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
483 traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
484 traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
485 traversal.Recurse(entry->GetUnionPath());
486 }
487
488
489 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
490 const string &file_name) {
491 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
492 kItemFile);
493 InsertLegacyHardlink(entry);
494 }
495
496
497 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
498 const string &file_name) {
499 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
500 kItemSymlink);
501 InsertLegacyHardlink(entry);
502 }
503
504 void SyncMediator::LegacyCharacterDeviceHardlinkCallback(
505 const string &parent_dir, const string &file_name) {
506 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
507 kItemCharacterDevice);
508 InsertLegacyHardlink(entry);
509 }
510
511 void SyncMediator::LegacyBlockDeviceHardlinkCallback(const string &parent_dir,
512 const string &file_name) {
513 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
514 kItemBlockDevice);
515 InsertLegacyHardlink(entry);
516 }
517
518 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
519 const string &file_name) {
520 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
521 kItemFifo);
522 InsertLegacyHardlink(entry);
523 }
524
525 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
526 const string &file_name) {
527 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
528 kItemSocket);
529 InsertLegacyHardlink(entry);
530 }
531
532
533 void SyncMediator::AddDirectoryRecursively(SharedPtr<SyncItem> entry) {
534 AddDirectory(entry);
535
536 // Create a recursion engine, which recursively adds all entries in a newly
537 // created directory
538 FileSystemTraversal<SyncMediator> traversal(
539 this, union_engine_->scratch_path(), true);
540 traversal.fn_enter_dir = &SyncMediator::EnterAddedDirectoryCallback;
541 traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
542 traversal.fn_new_file = &SyncMediator::AddFileCallback;
543 traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
544 traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
545 traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
546 traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
547 traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
548 traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
549 traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
550 traversal.Recurse(entry->GetScratchPath());
551 }
552
553
554 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
555 const std::string &dir_name) {
556 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
557 kItemDir);
558 AddDirectory(entry);
559 return true; // The recursion engine should recurse deeper here
560 }
561
562
563 void SyncMediator::AddFileCallback(const std::string &parent_dir,
564 const std::string &file_name) {
565 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
566 kItemFile);
567 Add(entry);
568 }
569
570
571 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
572 const std::string &file_name) {
573 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
574 kItemCharacterDevice);
575 Add(entry);
576 }
577
578 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
579 const std::string &file_name) {
580 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
581 kItemBlockDevice);
582 Add(entry);
583 }
584
585 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
586 const std::string &file_name) {
587 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
588 kItemFifo);
589 Add(entry);
590 }
591
592 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
593 const std::string &file_name) {
594 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
595 kItemSocket);
596 Add(entry);
597 }
598
599 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
600 const std::string &link_name) {
601 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
602 kItemSymlink);
603 Add(entry);
604 }
605
606
607 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
608 const std::string &dir_name) {
609 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
610 kItemDir);
611 EnterDirectory(entry);
612 }
613
614
615 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
616 const std::string &dir_name) {
617 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
618 kItemDir);
619 LeaveDirectory(entry);
620 }
621
622
623 void SyncMediator::RemoveDirectoryRecursively(SharedPtr<SyncItem> entry,
624 bool fast_delete) {
625 const std::string directory_path = entry->GetRelativePath();
626
627 // Fast delete: skip filesystem traversal for nested catalog directories.
628 // Instead of recursively walking the filesystem and removing each entry,
629 // we just remove the nested catalog reference from the parent catalog
630 // (with merge=false so entries are not copied to the parent) and then
631 // remove the mountpoint directory entry.
632 if (fast_delete && catalog_manager_->IsTransitionPoint(directory_path)) {
633 // Get the nested catalog's counters before removal so we can update
634 // publish statistics with the total number of removed entries
635 std::string subcatalog_path;
636 shash::Any hash;
637 PathString ps_path;
638 ps_path.Assign(directory_path.data(), directory_path.length());
639 const catalog::Counters counters =
640 catalog_manager_->LookupCounters(ps_path, &subcatalog_path, &hash);
641 {
642 perf::Xadd(counters_->n_files_removed,
643 static_cast<int64_t>(counters.self.regular_files
644 + counters.subtree.regular_files));
645 perf::Xadd(counters_->n_directories_removed,
646 static_cast<int64_t>(counters.self.directories
647 + counters.subtree.directories));
648 perf::Xadd(counters_->n_symlinks_removed,
649 static_cast<int64_t>(counters.self.symlinks
650 + counters.subtree.symlinks));
651 perf::Xadd(counters_->sz_removed_bytes,
652 static_cast<int64_t>(counters.self.file_size
653 + counters.subtree.file_size));
654 }
655
656 // Remove nested catalog (merge=false: just remove the reference and
657 // adjust parent subtree counters, don't copy entries into parent)
658 const std::string notice = "Nested catalog at " + entry->GetUnionPath();
659 reporter_->OnRemove(notice, catalog::DirectoryEntry());
660 if (!params_->dry_run) {
661 catalog_manager_->RemoveNestedCatalog(directory_path, false);
662 }
663
664 // Remove the mountpoint directory entry from the parent catalog
665 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
666 if (!params_->dry_run) {
667 catalog_manager_->RemoveDirectory(directory_path);
668 }
669 perf::Inc(counters_->n_directories_removed);
670
671 return;
672 }
673
674 // Normal path: delete a directory AFTER it was emptied here,
675 // because it would start up another recursion.
676 // Propagate fast_delete so that any nested catalog transition points
677 // encountered during the traversal are still fast-deleted.
678 const bool prev_fast_delete = recursive_fast_delete_;
679 if (fast_delete) recursive_fast_delete_ = true;
680
681 const bool recurse = false;
682 FileSystemTraversal<SyncMediator> traversal(
683 this, union_engine_->rdonly_path(), recurse);
684 traversal.fn_new_file = &SyncMediator::RemoveFileCallback;
685 traversal.fn_new_dir_postfix = &SyncMediator::RemoveDirectoryCallback;
686 traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
687 traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
688 traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
689 traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
690 traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
691 traversal.Recurse(entry->GetRdOnlyPath());
692
693 recursive_fast_delete_ = prev_fast_delete;
694
695 // The given directory was emptied recursively and can now itself be deleted
696 RemoveDirectory(entry);
697 }
698
699
700 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
701 const std::string &file_name) {
702 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
703 kItemFile);
704 Remove(entry);
705 }
706
707
708 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
709 const std::string &link_name) {
710 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
711 kItemSymlink);
712 Remove(entry);
713 }
714
715 void SyncMediator::RemoveCharacterDeviceCallback(const std::string &parent_dir,
716 const std::string &link_name) {
717 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
718 kItemCharacterDevice);
719 Remove(entry);
720 }
721
722 void SyncMediator::RemoveBlockDeviceCallback(const std::string &parent_dir,
723 const std::string &link_name) {
724 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
725 kItemBlockDevice);
726 Remove(entry);
727 }
728
729 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
730 const std::string &link_name) {
731 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
732 kItemFifo);
733 Remove(entry);
734 }
735
736 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
737 const std::string &link_name) {
738 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
739 kItemSocket);
740 Remove(entry);
741 }
742
743 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
744 const std::string &dir_name) {
745 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name,
746 kItemDir);
747 RemoveDirectoryRecursively(entry, recursive_fast_delete_);
748 }
749
750
751 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
752 const std::string &file_name) {
753 if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
754 return true;
755 }
756
757 const SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
758 kItemUnknown);
759 return entry->IsWhiteout();
760 }
761
762 SharedPtr<SyncItem> SyncMediator::CreateSyncItem(
763 const std::string &relative_parent_path, const std::string &filename,
764 const SyncItemType entry_type) const {
765 return union_engine_->CreateSyncItem(relative_parent_path, filename,
766 entry_type);
767 }
768
769 void SyncMediator::PublishFilesCallback(const upload::SpoolerResult &result) {
770 LogCvmfs(kLogPublish, kLogVerboseMsg,
771 "Spooler callback for %s, digest %s, produced %lu chunks, retval %d",
772 result.local_path.c_str(), result.content_hash.ToString().c_str(),
773 result.file_chunks.size(), result.return_code);
774 if (result.return_code != 0) {
775 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
776 result.return_code);
777 }
778
779 SyncItemList::iterator itr;
780 {
781 const MutexLockGuard guard(lock_file_queue_);
782 itr = file_queue_.find(result.local_path);
783 }
784
785 assert(itr != file_queue_.end());
786
787 SyncItem &item = *itr->second;
788 item.SetContentHash(result.content_hash);
789 item.SetCompressionAlgorithm(result.compression_alg);
790
791 XattrList *xattrs = &default_xattrs_;
792 if (params_->include_xattrs) {
793 xattrs = XattrList::CreateFromFile(result.local_path);
794 assert(xattrs != NULL);
795 }
796
797 if (result.IsChunked()) {
798 catalog_manager_->AddChunkedFile(
799 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
800 *xattrs,
801 item.relative_parent_path(),
802 result.file_chunks);
803 } else {
804 catalog_manager_->AddFile(
805 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
806 *xattrs,
807 item.relative_parent_path());
808 }
809
810 if (xattrs != &default_xattrs_)
811 free(xattrs);
812 }
813
814
815 void SyncMediator::PublishHardlinksCallback(
816 const upload::SpoolerResult &result) {
817 LogCvmfs(kLogPublish, kLogVerboseMsg,
818 "Spooler callback for hardlink %s, digest %s, retval %d",
819 result.local_path.c_str(), result.content_hash.ToString().c_str(),
820 result.return_code);
821 if (result.return_code != 0) {
822 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
823 result.return_code);
824 }
825
826 bool found = false;
827 for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
828 if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
829 found = true;
830 hardlink_queue_[i].master->SetContentHash(result.content_hash);
831 SyncItemList::iterator j, jend;
832 for (j = hardlink_queue_[i].hardlinks.begin(),
833 jend = hardlink_queue_[i].hardlinks.end();
834 j != jend;
835 ++j) {
836 j->second->SetContentHash(result.content_hash);
837 j->second->SetCompressionAlgorithm(result.compression_alg);
838 }
839 if (result.IsChunked())
840 hardlink_queue_[i].file_chunks = result.file_chunks;
841
842 break;
843 }
844 }
845
846 assert(found);
847 }
848
849
850 void SyncMediator::CreateNestedCatalog(SharedPtr<SyncItem> directory) {
851 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
852 reporter_->OnAdd(notice, catalog::DirectoryEntry());
853
854 if (!params_->dry_run) {
855 catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
856 }
857 }
858
859
860 void SyncMediator::RemoveNestedCatalog(SharedPtr<SyncItem> directory) {
861 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
862 reporter_->OnRemove(notice, catalog::DirectoryEntry());
863
864 if (!params_->dry_run) {
865 catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
866 }
867 }
868
869 void SyncDiffReporter::OnInit(const history::History::Tag & /*from_tag*/,
870 const history::History::Tag & /*to_tag*/) { }
871
872 void SyncDiffReporter::OnStats(const catalog::DeltaCounters & /*delta*/) { }
873
874 void SyncDiffReporter::OnAdd(const std::string &path,
875 const catalog::DirectoryEntry & /*entry*/) {
876 changed_items_++;
877 AddImpl(path);
878 }
879 void SyncDiffReporter::OnRemove(const std::string &path,
880 const catalog::DirectoryEntry & /*entry*/) {
881 changed_items_++;
882 RemoveImpl(path);
883 }
884 void SyncDiffReporter::OnModify(const std::string &path,
885 const catalog::DirectoryEntry & /*entry_from*/,
886 const catalog::DirectoryEntry & /*entry_to*/) {
887 changed_items_++;
888 ModifyImpl(path);
889 }
890
891 void SyncDiffReporter::CommitReport() {
892 if (print_action_ == kPrintDots) {
893 if (changed_items_ >= processing_dot_interval_) {
894 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, "\n");
895 }
896 }
897 }
898
899 void SyncDiffReporter::PrintDots() {
900 if (changed_items_ % processing_dot_interval_ == 0) {
901 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, ".");
902 }
903 }
904
905 void SyncDiffReporter::AddImpl(const std::string &path) {
906 const char *action_label;
907
908 switch (print_action_) {
909 case kPrintChanges:
910 if (path.at(0) != '/') {
911 action_label = "[x-catalog-add]";
912 } else {
913 action_label = "[add]";
914 }
915 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
916 break;
917
918 case kPrintDots:
919 PrintDots();
920 break;
921 default:
922 assert("Invalid print action.");
923 }
924 }
925
926 void SyncDiffReporter::RemoveImpl(const std::string &path) {
927 const char *action_label;
928
929 switch (print_action_) {
930 case kPrintChanges:
931 if (path.at(0) != '/') {
932 action_label = "[x-catalog-rem]";
933 } else {
934 action_label = "[rem]";
935 }
936
937 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
938 break;
939
940 case kPrintDots:
941 PrintDots();
942 break;
943 default:
944 assert("Invalid print action.");
945 }
946 }
947
948 void SyncDiffReporter::ModifyImpl(const std::string &path) {
949 const char *action_label;
950
951 switch (print_action_) {
952 case kPrintChanges:
953 action_label = "[mod]";
954 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
955 break;
956
957 case kPrintDots:
958 PrintDots();
959 break;
960 default:
961 assert("Invalid print action.");
962 }
963 }
964
965 void SyncMediator::AddFile(SharedPtr<SyncItem> entry) {
966 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
967
968 if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
969 assert(!entry->HasGraftMarker());
970 // Symlinks and special files are completely stored in the catalog
971 XattrList *xattrs = &default_xattrs_;
972 if (params_->include_xattrs) {
973 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
974 assert(xattrs);
975 }
976 catalog_manager_->AddFile(
977 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
978 *xattrs,
979 entry->relative_parent_path());
980 if (xattrs != &default_xattrs_)
981 free(xattrs);
982 } else if (entry->HasGraftMarker() && !params_->dry_run) {
983 if (entry->IsValidGraft()) {
984 // Graft files are added to catalog immediately.
985 if (entry->IsChunkedGraft()) {
986 catalog_manager_->AddChunkedFile(
987 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
988 default_xattrs_,
989 entry->relative_parent_path(),
990 *(entry->GetGraftChunks()));
991 } else {
992 catalog_manager_->AddFile(
993 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
994 default_xattrs_, // TODO(bbockelm): For now, use default xattrs
995 // on grafted files.
996 entry->relative_parent_path());
997 }
998 } else {
999 // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
1000 // the graft file is missing information. It's not clear that continuing
1001 // forward with the publish is the correct thing to do; abort for now.
1002 PANIC(kLogStderr,
1003 "Encountered a grafted file (%s) with "
1004 "invalid grafting information; check contents of .cvmfsgraft-*"
1005 " file. Aborting publish.",
1006 entry->GetRelativePath().c_str());
1007 }
1008 } else if (entry->relative_parent_path().empty()
1009 && entry->IsCatalogMarker()) {
1010 PANIC(kLogStderr, "Error: nested catalog marker in root directory");
1011 } else if (!params_->dry_run) {
1012 {
1013 // Push the file to the spooler, remember the entry for the path
1014 const MutexLockGuard m(&lock_file_queue_);
1015 file_queue_[entry->GetUnionPath()] = entry;
1016 }
1017 // Spool the file
1018 params_->spooler->Process(entry->CreateIngestionSource());
1019 }
1020
1021 // publish statistics counting for new file
1022 if (entry->IsNew()) {
1023 if (entry->IsSymlink()) {
1024 perf::Inc(counters_->n_symlinks_added);
1025 } else {
1026 perf::Inc(counters_->n_files_added);
1027 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
1028 }
1029 }
1030 }
1031
1032 void SyncMediator::RemoveFile(SharedPtr<SyncItem> entry) {
1033 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1034
1035 if (!params_->dry_run) {
1036 if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
1037 LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
1038 entry->GetUnionPath().c_str());
1039 catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
1040 }
1041 catalog_manager_->RemoveFile(entry->GetRelativePath());
1042 }
1043
1044 // Counting nr of removed files and removed bytes
1045 if (entry->WasSymlink()) {
1046 perf::Inc(counters_->n_symlinks_removed);
1047 } else {
1048 perf::Inc(counters_->n_files_removed);
1049 }
1050 perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
1051 }
1052
1053 void SyncMediator::AddUnmaterializedDirectory(SharedPtr<SyncItem> entry) {
1054 AddDirectory(entry);
1055 }
1056
1057 void SyncMediator::AddDirectory(SharedPtr<SyncItem> entry) {
1058 if (entry->IsBundleSpec()) {
1059 PANIC(kLogStderr,
1060 "Illegal directory name: .cvmfsbundles (%s). "
1061 ".cvmfsbundles is reserved for bundles specification files",
1062 entry->GetUnionPath().c_str());
1063 }
1064
1065 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1066
1067 perf::Inc(counters_->n_directories_added);
1068 assert(!entry->HasGraftMarker());
1069 if (!params_->dry_run) {
1070 XattrList *xattrs = &default_xattrs_;
1071 if (params_->include_xattrs) {
1072 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1073 assert(xattrs);
1074 }
1075 catalog_manager_->AddDirectory(
1076 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1077 entry->relative_parent_path());
1078 if (xattrs != &default_xattrs_)
1079 free(xattrs);
1080 }
1081
1082 if (entry->HasCatalogMarker()
1083 && !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1084 CreateNestedCatalog(entry);
1085 }
1086 }
1087
1088
1089 /**
1090 * this method deletes a single directory entry! Make sure to empty it
1091 * before you call this method or simply use
1092 * SyncMediator::RemoveDirectoryRecursively instead.
1093 */
1094 void SyncMediator::RemoveDirectory(SharedPtr<SyncItem> entry) {
1095 const std::string directory_path = entry->GetRelativePath();
1096
1097 if (catalog_manager_->IsTransitionPoint(directory_path)) {
1098 RemoveNestedCatalog(entry);
1099 }
1100
1101 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1102 if (!params_->dry_run) {
1103 catalog_manager_->RemoveDirectory(directory_path);
1104 }
1105
1106 perf::Inc(counters_->n_directories_removed);
1107 }
1108
1109 void SyncMediator::TouchDirectory(SharedPtr<SyncItem> entry) {
1110 reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1111 catalog::DirectoryEntry());
1112
1113 const std::string directory_path = entry->GetRelativePath();
1114
1115 if (!params_->dry_run) {
1116 XattrList *xattrs = &default_xattrs_;
1117 if (params_->include_xattrs) {
1118 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1119 assert(xattrs);
1120 }
1121 catalog_manager_->TouchDirectory(
1122 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1123 directory_path);
1124 if (xattrs != &default_xattrs_)
1125 free(xattrs);
1126 }
1127
1128 if (entry->HasCatalogMarker()
1129 && !catalog_manager_->IsTransitionPoint(directory_path)) {
1130 CreateNestedCatalog(entry);
1131 } else if (!entry->HasCatalogMarker()
1132 && catalog_manager_->IsTransitionPoint(directory_path)) {
1133 RemoveNestedCatalog(entry);
1134 }
1135 }
1136
1137 /**
1138 * All hardlinks in the current directory have been picked up. Now they are
1139 * added to the catalogs.
1140 */
1141 void SyncMediator::AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks) {
1142 assert(handle_hardlinks_);
1143
1144 for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1145 iEnd = hardlinks.end();
1146 i != iEnd;
1147 ++i) {
1148 if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount()
1149 && !params_->ignore_xdir_hardlinks) {
1150 PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1151 i->second.master->GetUnionPath().c_str());
1152 }
1153
1154 if (params_->print_changeset) {
1155 for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1156 jEnd = i->second.hardlinks.end();
1157 j != jEnd;
1158 ++j) {
1159 const std::string changeset_notice = GetParentPath(i->second.master
1160 ->GetUnionPath())
1161 + "/" + j->second->filename();
1162 reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1163 }
1164 }
1165
1166 if (params_->dry_run)
1167 continue;
1168
1169 if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1170 AddHardlinkGroup(i->second);
1171 else
1172 hardlink_queue_.push_back(i->second);
1173 }
1174 }
1175
1176
1177 void SyncMediator::AddHardlinkGroup(const HardlinkGroup &group) {
1178 assert(handle_hardlinks_);
1179
1180 // Create a DirectoryEntry list out of the hardlinks
1181 catalog::DirectoryEntryBaseList hardlinks;
1182 for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1183 iEnd = group.hardlinks.end();
1184 i != iEnd;
1185 ++i) {
1186 hardlinks.push_back(
1187 i->second->CreateBasicCatalogDirent(params_->enable_mtime_ns));
1188 }
1189 XattrList *xattrs = &default_xattrs_;
1190 if (params_->include_xattrs) {
1191 xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1192 assert(xattrs);
1193 }
1194 catalog_manager_->AddHardlinkGroup(hardlinks,
1195 *xattrs,
1196 group.master->relative_parent_path(),
1197 group.file_chunks);
1198 if (xattrs != &default_xattrs_)
1199 free(xattrs);
1200 }
1201
1202 } // namespace publish
1203