GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/sync_mediator.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 1 579 0.2%
Branches: 0 887 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #define __STDC_FORMAT_MACROS
6
7 #include "sync_mediator.h"
8
9 #include <fcntl.h>
10 #include <inttypes.h>
11 #include <unistd.h>
12
13 #include <cassert>
14 #include <cstdio>
15 #include <cstdlib>
16
17 #include "catalog_virtual.h"
18 #include "compression/compression.h"
19 #include "crypto/hash.h"
20 #include "directory_entry.h"
21 #include "json_document.h"
22 #include "publish/repository.h"
23 #include "sync_union.h"
24 #include "upload.h"
25 #include "util/concurrency.h"
26 #include "util/exception.h"
27 #include "util/fs_traversal.h"
28 #include "util/posix.h"
29 #include "util/smalloc.h"
30 #include "util/string.h"
31
32 using namespace std; // NOLINT
33
34 namespace publish {
35
36 84 AbstractSyncMediator::~AbstractSyncMediator() { }
37
38 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
39 const SyncParameters *params,
40 perf::StatisticsTemplate statistics)
41 : catalog_manager_(catalog_manager)
42 , union_engine_(NULL)
43 , handle_hardlinks_(false)
44 , params_(params)
45 , reporter_(new SyncDiffReporter(params_->print_changeset
46 ? SyncDiffReporter::kPrintChanges
47 : SyncDiffReporter::kPrintDots)) {
48 const int retval = pthread_mutex_init(&lock_file_queue_, NULL);
49 assert(retval == 0);
50
51 params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
52
53 counters_ = new perf::FsCounters(statistics);
54 }
55
56 SyncMediator::~SyncMediator() { pthread_mutex_destroy(&lock_file_queue_); }
57
58
59 void SyncMediator::RegisterUnionEngine(SyncUnion *engine) {
60 union_engine_ = engine;
61 handle_hardlinks_ = engine->SupportsHardlinks();
62 }
63
64
65 /**
66 * The entry /.cvmfs or entries in /.cvmfs/ must not be added, removed or
67 * modified manually. The directory /.cvmfs is generated by the VirtualCatalog
68 * class if requested.
69 */
70 void SyncMediator::EnsureAllowed(SharedPtr<SyncItem> entry) {
71 const bool ignore_case_setting = false;
72 const string relative_path = entry->GetRelativePath();
73 if ((relative_path == string(catalog::VirtualCatalog::kVirtualPath))
74 || (HasPrefix(relative_path,
75 string(catalog::VirtualCatalog::kVirtualPath) + "/",
76 ignore_case_setting))) {
77 PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
78 relative_path.c_str());
79 }
80 }
81
82
83 /**
84 * Add an entry to the repository.
85 * Added directories will be traversed in order to add the complete subtree.
86 */
87 void SyncMediator::Add(SharedPtr<SyncItem> entry) {
88 EnsureAllowed(entry);
89
90 if (entry->IsDirectory()) {
91 AddDirectoryRecursively(entry);
92 return;
93 }
94
95 // .cvmfsbundles file type
96 if (entry->IsBundleSpec()) {
97 PrintWarning(".cvmfsbundles file encountered. "
98 "Bundles is currently an experimental feature.");
99
100 if (!entry->IsRegularFile()) {
101 PANIC(kLogStderr, "Error: .cvmfsbundles file must be a regular file");
102 }
103 if (entry->HasHardlinks()) {
104 PANIC(kLogStderr, "Error: .cvmfsbundles file must not be a hard link");
105 }
106
107 const std::string parent_path = GetParentPath(entry->GetUnionPath());
108 if (parent_path != union_engine_->union_path()) {
109 PANIC(kLogStderr,
110 "Error: .cvmfsbundles file must be in the root"
111 " directory of the repository. Found in %s",
112 parent_path.c_str());
113 }
114
115 std::string json_string;
116
117 const int fd = open(entry->GetUnionPath().c_str(), O_RDONLY);
118 if (fd < 0) {
119 PANIC(kLogStderr, "Could not open file: %s",
120 entry->GetUnionPath().c_str());
121 }
122 if (!SafeReadToString(fd, &json_string)) {
123 PANIC(kLogStderr, "Could not read contents of file: %s",
124 entry->GetUnionPath().c_str());
125 }
126 const UniquePtr<JsonDocument> json(JsonDocument::Create(json_string));
127
128 AddFile(entry);
129 return;
130 }
131
132 if (entry->IsRegularFile() || entry->IsSymlink()) {
133 // A file is a hard link if the link count is greater than 1
134 if (entry->HasHardlinks() && handle_hardlinks_)
135 InsertHardlink(entry);
136 else
137 AddFile(entry);
138 return;
139 } else if (entry->IsGraftMarker()) {
140 LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
141 return; // Ignore markers.
142 }
143
144 // In OverlayFS whiteouts can be represented as character devices with major
145 // and minor numbers equal to 0. Special files will be ignored except if they
146 // are whiteout files.
147 if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
148 if (params_->ignore_special_files) {
149 PrintWarning("'" + entry->GetRelativePath()
150 + "' "
151 "is a special file, ignoring.");
152 } else {
153 if (entry->HasHardlinks() && handle_hardlinks_)
154 InsertHardlink(entry);
155 else
156 AddFile(entry);
157 }
158 return;
159 }
160
161 PrintWarning("'" + entry->GetRelativePath()
162 + "' cannot be added. Unrecognized file type.");
163 }
164
165
166 /**
167 * Touch an entry in the repository.
168 */
169 void SyncMediator::Touch(SharedPtr<SyncItem> entry) {
170 EnsureAllowed(entry);
171
172 if (entry->IsGraftMarker()) {
173 return;
174 }
175 if (entry->IsDirectory()) {
176 TouchDirectory(entry);
177 perf::Inc(counters_->n_directories_changed);
178 return;
179 }
180
181 if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
182 Replace(entry); // This way, hardlink processing is correct
183 // Replace calls Remove; cancel Remove's actions:
184 perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
185
186 // Count only the difference between the old and new file
187 // Symlinks do not count into added or removed bytes
188 int64_t dif = 0;
189
190 // Need to handle 4 cases (symlink->symlink, symlink->regular,
191 // regular->symlink, regular->regular)
192 if (entry->WasSymlink()) {
193 // Replace calls Remove; cancel Remove's actions:
194 perf::Dec(counters_->n_symlinks_removed);
195
196 if (entry->IsSymlink()) {
197 perf::Inc(counters_->n_symlinks_changed);
198 } else {
199 perf::Inc(counters_->n_symlinks_removed);
200 perf::Inc(counters_->n_files_added);
201 dif += entry->GetScratchSize();
202 }
203 } else {
204 // Replace calls Remove; cancel Remove's actions:
205 perf::Dec(counters_->n_files_removed);
206 dif -= entry->GetRdOnlySize();
207 if (entry->IsSymlink()) {
208 perf::Inc(counters_->n_files_removed);
209 perf::Inc(counters_->n_symlinks_added);
210 } else {
211 perf::Inc(counters_->n_files_changed);
212 dif += entry->GetScratchSize();
213 }
214 }
215
216 if (dif > 0) { // added bytes
217 perf::Xadd(counters_->sz_added_bytes, dif);
218 } else { // removed bytes
219 perf::Xadd(counters_->sz_removed_bytes, -dif);
220 }
221 return;
222 }
223
224 PrintWarning("'" + entry->GetRelativePath()
225 + "' cannot be touched. Unrecognized file type.");
226 }
227
228
229 /**
230 * Remove an entry from the repository. Directories will be recursively removed.
231 */
232 void SyncMediator::Remove(SharedPtr<SyncItem> entry) {
233 EnsureAllowed(entry);
234
235 if (entry->WasDirectory()) {
236 RemoveDirectoryRecursively(entry);
237 return;
238 }
239
240 if (entry->WasBundleSpec()) {
241 // for now remove using RemoveFile()
242 RemoveFile(entry);
243 return;
244 }
245
246 if (entry->WasRegularFile() || entry->WasSymlink()
247 || entry->WasSpecialFile()) {
248 RemoveFile(entry);
249 return;
250 }
251
252 PrintWarning("'" + entry->GetRelativePath()
253 + "' cannot be deleted. Unrecognized file type.");
254 }
255
256
257 /**
258 * Remove the old entry and add the new one.
259 */
260 void SyncMediator::Replace(SharedPtr<SyncItem> entry) {
261 // EnsureAllowed(entry); <-- Done by Remove() and Add()
262 Remove(entry);
263 Add(entry);
264 }
265
266 void SyncMediator::Clone(const std::string from, const std::string to) {
267 catalog_manager_->Clone(from, to);
268 }
269
270 void SyncMediator::EnterDirectory(SharedPtr<SyncItem> entry) {
271 if (!handle_hardlinks_) {
272 return;
273 }
274
275 const HardlinkGroupMap new_map;
276 hardlink_stack_.push(new_map);
277 }
278
279
280 void SyncMediator::LeaveDirectory(SharedPtr<SyncItem> entry) {
281 if (!handle_hardlinks_) {
282 return;
283 }
284
285 CompleteHardlinks(entry);
286 AddLocalHardlinkGroups(GetHardlinkMap());
287 hardlink_stack_.pop();
288 }
289
290
291 /**
292 * Do any pending processing and commit all changes to the catalogs.
293 * To be called after change set traversal is finished.
294 */
295 bool SyncMediator::Commit(manifest::Manifest *manifest) {
296 reporter_->CommitReport();
297
298 if (!params_->dry_run) {
299 LogCvmfs(kLogPublish, kLogStdout,
300 "Waiting for upload of files before committing...");
301 params_->spooler->WaitForUpload();
302 }
303
304 if (!hardlink_queue_.empty()) {
305 assert(handle_hardlinks_);
306
307 LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
308 params_->spooler->UnregisterListeners();
309 params_->spooler->RegisterListener(&SyncMediator::PublishHardlinksCallback,
310 this);
311
312 // TODO(rmeusel): Revise that for Thread Safety!
313 // This loop will spool hardlinks into the spooler, which will then
314 // process them.
315 // On completion of every hardlink the spooler will asynchronously
316 // emit callbacks (SyncMediator::PublishHardlinksCallback) which
317 // might happen while this for-loop goes through the hardlink_queue_
318 //
319 // For the moment this seems not to be a problem, but it's an accident
320 // just waiting to happen.
321 //
322 // Note: Just wrapping this loop in a mutex might produce a dead lock
323 // since the spooler does not fill it's processing queue to an
324 // unlimited size. Meaning that it might be flooded with hard-
325 // links and waiting for the queue to be processed while proces-
326 // sing is stalled because the callback is waiting for this
327 // mutex.
328 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
329 iEnd = hardlink_queue_.end();
330 i != iEnd;
331 ++i) {
332 LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
333 i->master->GetUnionPath().c_str());
334 IngestionSource *source = new FileIngestionSource(
335 i->master->GetUnionPath());
336 params_->spooler->Process(source);
337 }
338
339 params_->spooler->WaitForUpload();
340
341 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
342 iEnd = hardlink_queue_.end();
343 i != iEnd;
344 ++i) {
345 LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
346 i->master->GetUnionPath().c_str());
347 AddHardlinkGroup(*i);
348 }
349 }
350
351 if (union_engine_)
352 union_engine_->PostUpload();
353
354 params_->spooler->UnregisterListeners();
355
356 if (params_->dry_run) {
357 manifest = NULL;
358 return true;
359 }
360
361 LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
362 if (params_->spooler->GetNumberOfErrors() > 0) {
363 LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
364 return false;
365 }
366
367 if (catalog_manager_->IsBalanceable()
368 || (params_->virtual_dir_actions
369 != catalog::VirtualCatalog::kActionNone)) {
370 if (catalog_manager_->IsBalanceable())
371 catalog_manager_->Balance();
372 // Commit empty string to ensure that the "content" of the auto catalog
373 // markers is present in the repository.
374 const string empty_file =
375 CreateTempPath(params_->dir_temp + "/empty", 0600);
376 IngestionSource *source = new FileIngestionSource(empty_file);
377 params_->spooler->Process(source);
378 params_->spooler->WaitForUpload();
379 unlink(empty_file.c_str());
380 if (params_->spooler->GetNumberOfErrors() > 0) {
381 LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
382 return false;
383 }
384 }
385 catalog_manager_->PrecalculateListings();
386 return catalog_manager_->Commit(
387 params_->stop_for_catalog_tweaks, params_->manual_revision, manifest);
388 }
389
390
391 void SyncMediator::InsertHardlink(SharedPtr<SyncItem> entry) {
392 assert(handle_hardlinks_);
393
394 const uint64_t inode = entry->GetUnionInode();
395 LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
396 inode, entry->GetUnionPath().c_str());
397
398 // Find the hard link group in the lists
399 const HardlinkGroupMap::iterator hardlink_group =
400 GetHardlinkMap().find(inode);
401
402 if (hardlink_group == GetHardlinkMap().end()) {
403 // Create a new hardlink group
404 GetHardlinkMap().insert(
405 HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
406 } else {
407 // Append the file to the appropriate hardlink group
408 hardlink_group->second.AddHardlink(entry);
409 }
410
411 // publish statistics counting for new file
412 if (entry->IsNew()) {
413 perf::Inc(counters_->n_files_added);
414 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
415 }
416 }
417
418
419 void SyncMediator::InsertLegacyHardlink(SharedPtr<SyncItem> entry) {
420 // Check if found file has hardlinks (nlink > 1)
421 // As we are looking through all files in one directory here, there might be
422 // completely untouched hardlink groups, which we can safely skip.
423 // Finally we have to see if the hardlink is already part of this group
424
425 assert(handle_hardlinks_);
426
427 if (entry->GetUnionLinkcount() < 2)
428 return;
429
430 const uint64_t inode = entry->GetUnionInode();
431 HardlinkGroupMap::iterator hl_group;
432 hl_group = GetHardlinkMap().find(inode);
433
434 if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
435 bool found = false;
436
437 // search for the entry in this group
438 for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
439 iEnd = hl_group->second.hardlinks.end();
440 i != iEnd;
441 ++i) {
442 if (*(i->second) == *entry) {
443 found = true;
444 break;
445 }
446 }
447
448 if (!found) {
449 // Hardlink already in the group?
450 // If one element of a hardlink group is edited, all elements must be
451 // replaced. Here, we remove an untouched hardlink and add it to its
452 // hardlink group for re-adding later
453 LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
454 entry->GetUnionPath().c_str());
455 Remove(entry);
456 hl_group->second.AddHardlink(entry);
457 }
458 }
459 }
460
461
462 /**
463 * Create a recursion engine which DOES NOT recurse into directories.
464 * It basically goes through the current directory (in the union volume) and
465 * searches for legacy hardlinks which has to be connected to the new
466 * or edited ones.
467 */
468 void SyncMediator::CompleteHardlinks(SharedPtr<SyncItem> entry) {
469 assert(handle_hardlinks_);
470
471 // If no hardlink in this directory was changed, we can skip this
472 if (GetHardlinkMap().empty())
473 return;
474
475 LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
476 entry->GetUnionPath().c_str());
477
478 // Look for legacy hardlinks
479 FileSystemTraversal<SyncMediator> traversal(this, union_engine_->union_path(),
480 false);
481 traversal.fn_new_file = &SyncMediator::LegacyRegularHardlinkCallback;
482 traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
483 traversal.fn_new_character_dev = &SyncMediator::
484 LegacyCharacterDeviceHardlinkCallback;
485 traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
486 traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
487 traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
488 traversal.Recurse(entry->GetUnionPath());
489 }
490
491
492 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
493 const string &file_name) {
494 const SharedPtr<SyncItem> entry =
495 CreateSyncItem(parent_dir, file_name, kItemFile);
496 InsertLegacyHardlink(entry);
497 }
498
499
500 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
501 const string &file_name) {
502 const SharedPtr<SyncItem> entry =
503 CreateSyncItem(parent_dir, file_name, kItemSymlink);
504 InsertLegacyHardlink(entry);
505 }
506
507 void SyncMediator::LegacyCharacterDeviceHardlinkCallback(
508 const string &parent_dir, const string &file_name) {
509 const SharedPtr<SyncItem> entry =
510 CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
511 InsertLegacyHardlink(entry);
512 }
513
514 void SyncMediator::LegacyBlockDeviceHardlinkCallback(const string &parent_dir,
515 const string &file_name) {
516 const SharedPtr<SyncItem> entry =
517 CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
518 InsertLegacyHardlink(entry);
519 }
520
521 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
522 const string &file_name) {
523 const SharedPtr<SyncItem> entry =
524 CreateSyncItem(parent_dir, file_name, kItemFifo);
525 InsertLegacyHardlink(entry);
526 }
527
528 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
529 const string &file_name) {
530 const SharedPtr<SyncItem> entry =
531 CreateSyncItem(parent_dir, file_name, kItemSocket);
532 InsertLegacyHardlink(entry);
533 }
534
535
536 void SyncMediator::AddDirectoryRecursively(SharedPtr<SyncItem> entry) {
537 AddDirectory(entry);
538
539 // Create a recursion engine, which recursively adds all entries in a newly
540 // created directory
541 FileSystemTraversal<SyncMediator> traversal(
542 this, union_engine_->scratch_path(), true);
543 traversal.fn_enter_dir = &SyncMediator::EnterAddedDirectoryCallback;
544 traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
545 traversal.fn_new_file = &SyncMediator::AddFileCallback;
546 traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
547 traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
548 traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
549 traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
550 traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
551 traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
552 traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
553 traversal.Recurse(entry->GetScratchPath());
554 }
555
556
557 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
558 const std::string &dir_name) {
559 const SharedPtr<SyncItem> entry =
560 CreateSyncItem(parent_dir, dir_name, kItemDir);
561 AddDirectory(entry);
562 return true; // The recursion engine should recurse deeper here
563 }
564
565
566 void SyncMediator::AddFileCallback(const std::string &parent_dir,
567 const std::string &file_name) {
568 const SharedPtr<SyncItem> entry =
569 CreateSyncItem(parent_dir, file_name, kItemFile);
570 Add(entry);
571 }
572
573
574 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
575 const std::string &file_name) {
576 const SharedPtr<SyncItem> entry =
577 CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
578 Add(entry);
579 }
580
581 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
582 const std::string &file_name) {
583 const SharedPtr<SyncItem> entry =
584 CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
585 Add(entry);
586 }
587
588 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
589 const std::string &file_name) {
590 const SharedPtr<SyncItem> entry =
591 CreateSyncItem(parent_dir, file_name, kItemFifo);
592 Add(entry);
593 }
594
595 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
596 const std::string &file_name) {
597 const SharedPtr<SyncItem> entry =
598 CreateSyncItem(parent_dir, file_name, kItemSocket);
599 Add(entry);
600 }
601
602 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
603 const std::string &link_name) {
604 const SharedPtr<SyncItem> entry =
605 CreateSyncItem(parent_dir, link_name, kItemSymlink);
606 Add(entry);
607 }
608
609
610 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
611 const std::string &dir_name) {
612 const SharedPtr<SyncItem> entry =
613 CreateSyncItem(parent_dir, dir_name, kItemDir);
614 EnterDirectory(entry);
615 }
616
617
618 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
619 const std::string &dir_name) {
620 const SharedPtr<SyncItem> entry =
621 CreateSyncItem(parent_dir, dir_name, kItemDir);
622 LeaveDirectory(entry);
623 }
624
625
626 void SyncMediator::RemoveDirectoryRecursively(SharedPtr<SyncItem> entry) {
627 // Delete a directory AFTER it was emptied here,
628 // because it would start up another recursion
629
630 const bool recurse = false;
631 FileSystemTraversal<SyncMediator> traversal(
632 this, union_engine_->rdonly_path(), recurse);
633 traversal.fn_new_file = &SyncMediator::RemoveFileCallback;
634 traversal.fn_new_dir_postfix = &SyncMediator::RemoveDirectoryCallback;
635 traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
636 traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
637 traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
638 traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
639 traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
640 traversal.Recurse(entry->GetRdOnlyPath());
641
642 // The given directory was emptied recursively and can now itself be deleted
643 RemoveDirectory(entry);
644 }
645
646
647 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
648 const std::string &file_name) {
649 const SharedPtr<SyncItem> entry =
650 CreateSyncItem(parent_dir, file_name, kItemFile);
651 Remove(entry);
652 }
653
654
655 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
656 const std::string &link_name) {
657 const SharedPtr<SyncItem> entry =
658 CreateSyncItem(parent_dir, link_name, kItemSymlink);
659 Remove(entry);
660 }
661
662 void SyncMediator::RemoveCharacterDeviceCallback(const std::string &parent_dir,
663 const std::string &link_name) {
664 const SharedPtr<SyncItem> entry =
665 CreateSyncItem(parent_dir, link_name, kItemCharacterDevice);
666 Remove(entry);
667 }
668
669 void SyncMediator::RemoveBlockDeviceCallback(const std::string &parent_dir,
670 const std::string &link_name) {
671 const SharedPtr<SyncItem> entry =
672 CreateSyncItem(parent_dir, link_name, kItemBlockDevice);
673 Remove(entry);
674 }
675
676 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
677 const std::string &link_name) {
678 const SharedPtr<SyncItem> entry =
679 CreateSyncItem(parent_dir, link_name, kItemFifo);
680 Remove(entry);
681 }
682
683 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
684 const std::string &link_name) {
685 const SharedPtr<SyncItem> entry =
686 CreateSyncItem(parent_dir, link_name, kItemSocket);
687 Remove(entry);
688 }
689
690 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
691 const std::string &dir_name) {
692 const SharedPtr<SyncItem> entry =
693 CreateSyncItem(parent_dir, dir_name, kItemDir);
694 RemoveDirectoryRecursively(entry);
695 }
696
697
698 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
699 const std::string &file_name) {
700 if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
701 return true;
702 }
703
704 const SharedPtr<SyncItem> entry =
705 CreateSyncItem(parent_dir, file_name, kItemUnknown);
706 return entry->IsWhiteout();
707 }
708
709 SharedPtr<SyncItem> SyncMediator::CreateSyncItem(
710 const std::string &relative_parent_path, const std::string &filename,
711 const SyncItemType entry_type) const {
712 return union_engine_->CreateSyncItem(relative_parent_path, filename,
713 entry_type);
714 }
715
716 void SyncMediator::PublishFilesCallback(const upload::SpoolerResult &result) {
717 LogCvmfs(kLogPublish, kLogVerboseMsg,
718 "Spooler callback for %s, digest %s, produced %lu chunks, retval %d",
719 result.local_path.c_str(), result.content_hash.ToString().c_str(),
720 result.file_chunks.size(), result.return_code);
721 if (result.return_code != 0) {
722 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
723 result.return_code);
724 }
725
726 SyncItemList::iterator itr;
727 {
728 const MutexLockGuard guard(lock_file_queue_);
729 itr = file_queue_.find(result.local_path);
730 }
731
732 assert(itr != file_queue_.end());
733
734 SyncItem &item = *itr->second;
735 item.SetContentHash(result.content_hash);
736 item.SetCompressionAlgorithm(result.compression_alg);
737
738 XattrList *xattrs = &default_xattrs_;
739 if (params_->include_xattrs) {
740 xattrs = XattrList::CreateFromFile(result.local_path);
741 assert(xattrs != NULL);
742 }
743
744 if (result.IsChunked()) {
745 catalog_manager_->AddChunkedFile(
746 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
747 *xattrs,
748 item.relative_parent_path(),
749 result.file_chunks);
750 } else {
751 catalog_manager_->AddFile(
752 item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
753 *xattrs,
754 item.relative_parent_path());
755 }
756
757 if (xattrs != &default_xattrs_)
758 free(xattrs);
759 }
760
761
762 void SyncMediator::PublishHardlinksCallback(
763 const upload::SpoolerResult &result) {
764 LogCvmfs(kLogPublish, kLogVerboseMsg,
765 "Spooler callback for hardlink %s, digest %s, retval %d",
766 result.local_path.c_str(), result.content_hash.ToString().c_str(),
767 result.return_code);
768 if (result.return_code != 0) {
769 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
770 result.return_code);
771 }
772
773 bool found = false;
774 for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
775 if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
776 found = true;
777 hardlink_queue_[i].master->SetContentHash(result.content_hash);
778 SyncItemList::iterator j, jend;
779 for (j = hardlink_queue_[i].hardlinks.begin(),
780 jend = hardlink_queue_[i].hardlinks.end();
781 j != jend;
782 ++j) {
783 j->second->SetContentHash(result.content_hash);
784 j->second->SetCompressionAlgorithm(result.compression_alg);
785 }
786 if (result.IsChunked())
787 hardlink_queue_[i].file_chunks = result.file_chunks;
788
789 break;
790 }
791 }
792
793 assert(found);
794 }
795
796
797 void SyncMediator::CreateNestedCatalog(SharedPtr<SyncItem> directory) {
798 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
799 reporter_->OnAdd(notice, catalog::DirectoryEntry());
800
801 if (!params_->dry_run) {
802 catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
803 }
804 }
805
806
807 void SyncMediator::RemoveNestedCatalog(SharedPtr<SyncItem> directory) {
808 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
809 reporter_->OnRemove(notice, catalog::DirectoryEntry());
810
811 if (!params_->dry_run) {
812 catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
813 }
814 }
815
816 void SyncDiffReporter::OnInit(const history::History::Tag & /*from_tag*/,
817 const history::History::Tag & /*to_tag*/) { }
818
819 void SyncDiffReporter::OnStats(const catalog::DeltaCounters & /*delta*/) { }
820
821 void SyncDiffReporter::OnAdd(const std::string &path,
822 const catalog::DirectoryEntry & /*entry*/) {
823 changed_items_++;
824 AddImpl(path);
825 }
826 void SyncDiffReporter::OnRemove(const std::string &path,
827 const catalog::DirectoryEntry & /*entry*/) {
828 changed_items_++;
829 RemoveImpl(path);
830 }
831 void SyncDiffReporter::OnModify(const std::string &path,
832 const catalog::DirectoryEntry & /*entry_from*/,
833 const catalog::DirectoryEntry & /*entry_to*/) {
834 changed_items_++;
835 ModifyImpl(path);
836 }
837
838 void SyncDiffReporter::CommitReport() {
839 if (print_action_ == kPrintDots) {
840 if (changed_items_ >= processing_dot_interval_) {
841 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, "\n");
842 }
843 }
844 }
845
846 void SyncDiffReporter::PrintDots() {
847 if (changed_items_ % processing_dot_interval_ == 0) {
848 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, ".");
849 }
850 }
851
852 void SyncDiffReporter::AddImpl(const std::string &path) {
853 const char *action_label;
854
855 switch (print_action_) {
856 case kPrintChanges:
857 if (path.at(0) != '/') {
858 action_label = "[x-catalog-add]";
859 } else {
860 action_label = "[add]";
861 }
862 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
863 break;
864
865 case kPrintDots:
866 PrintDots();
867 break;
868 default:
869 assert("Invalid print action.");
870 }
871 }
872
873 void SyncDiffReporter::RemoveImpl(const std::string &path) {
874 const char *action_label;
875
876 switch (print_action_) {
877 case kPrintChanges:
878 if (path.at(0) != '/') {
879 action_label = "[x-catalog-rem]";
880 } else {
881 action_label = "[rem]";
882 }
883
884 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
885 break;
886
887 case kPrintDots:
888 PrintDots();
889 break;
890 default:
891 assert("Invalid print action.");
892 }
893 }
894
895 void SyncDiffReporter::ModifyImpl(const std::string &path) {
896 const char *action_label;
897
898 switch (print_action_) {
899 case kPrintChanges:
900 action_label = "[mod]";
901 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
902 break;
903
904 case kPrintDots:
905 PrintDots();
906 break;
907 default:
908 assert("Invalid print action.");
909 }
910 }
911
912 void SyncMediator::AddFile(SharedPtr<SyncItem> entry) {
913 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
914
915 if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
916 assert(!entry->HasGraftMarker());
917 // Symlinks and special files are completely stored in the catalog
918 XattrList *xattrs = &default_xattrs_;
919 if (params_->include_xattrs) {
920 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
921 assert(xattrs);
922 }
923 catalog_manager_->AddFile(
924 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
925 *xattrs,
926 entry->relative_parent_path());
927 if (xattrs != &default_xattrs_)
928 free(xattrs);
929 } else if (entry->HasGraftMarker() && !params_->dry_run) {
930 if (entry->IsValidGraft()) {
931 // Graft files are added to catalog immediately.
932 if (entry->IsChunkedGraft()) {
933 catalog_manager_->AddChunkedFile(
934 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
935 default_xattrs_,
936 entry->relative_parent_path(),
937 *(entry->GetGraftChunks()));
938 } else {
939 catalog_manager_->AddFile(
940 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
941 default_xattrs_, // TODO(bbockelm): For now, use default xattrs
942 // on grafted files.
943 entry->relative_parent_path());
944 }
945 } else {
946 // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
947 // the graft file is missing information. It's not clear that continuing
948 // forward with the publish is the correct thing to do; abort for now.
949 PANIC(kLogStderr,
950 "Encountered a grafted file (%s) with "
951 "invalid grafting information; check contents of .cvmfsgraft-*"
952 " file. Aborting publish.",
953 entry->GetRelativePath().c_str());
954 }
955 } else if (entry->relative_parent_path().empty()
956 && entry->IsCatalogMarker()) {
957 PANIC(kLogStderr, "Error: nested catalog marker in root directory");
958 } else if (!params_->dry_run) {
959 {
960 // Push the file to the spooler, remember the entry for the path
961 const MutexLockGuard m(&lock_file_queue_);
962 file_queue_[entry->GetUnionPath()] = entry;
963 }
964 // Spool the file
965 params_->spooler->Process(entry->CreateIngestionSource());
966 }
967
968 // publish statistics counting for new file
969 if (entry->IsNew()) {
970 if (entry->IsSymlink()) {
971 perf::Inc(counters_->n_symlinks_added);
972 } else {
973 perf::Inc(counters_->n_files_added);
974 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
975 }
976 }
977 }
978
979 void SyncMediator::RemoveFile(SharedPtr<SyncItem> entry) {
980 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
981
982 if (!params_->dry_run) {
983 if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
984 LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
985 entry->GetUnionPath().c_str());
986 catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
987 }
988 catalog_manager_->RemoveFile(entry->GetRelativePath());
989 }
990
991 // Counting nr of removed files and removed bytes
992 if (entry->WasSymlink()) {
993 perf::Inc(counters_->n_symlinks_removed);
994 } else {
995 perf::Inc(counters_->n_files_removed);
996 }
997 perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
998 }
999
1000 void SyncMediator::AddUnmaterializedDirectory(SharedPtr<SyncItem> entry) {
1001 AddDirectory(entry);
1002 }
1003
1004 void SyncMediator::AddDirectory(SharedPtr<SyncItem> entry) {
1005 if (entry->IsBundleSpec()) {
1006 PANIC(kLogStderr,
1007 "Illegal directory name: .cvmfsbundles (%s). "
1008 ".cvmfsbundles is reserved for bundles specification files",
1009 entry->GetUnionPath().c_str());
1010 }
1011
1012 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1013
1014 perf::Inc(counters_->n_directories_added);
1015 assert(!entry->HasGraftMarker());
1016 if (!params_->dry_run) {
1017 XattrList *xattrs = &default_xattrs_;
1018 if (params_->include_xattrs) {
1019 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1020 assert(xattrs);
1021 }
1022 catalog_manager_->AddDirectory(
1023 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1024 entry->relative_parent_path());
1025 if (xattrs != &default_xattrs_)
1026 free(xattrs);
1027 }
1028
1029 if (entry->HasCatalogMarker()
1030 && !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1031 CreateNestedCatalog(entry);
1032 }
1033 }
1034
1035
1036 /**
1037 * this method deletes a single directory entry! Make sure to empty it
1038 * before you call this method or simply use
1039 * SyncMediator::RemoveDirectoryRecursively instead.
1040 */
1041 void SyncMediator::RemoveDirectory(SharedPtr<SyncItem> entry) {
1042 const std::string directory_path = entry->GetRelativePath();
1043
1044 if (catalog_manager_->IsTransitionPoint(directory_path)) {
1045 RemoveNestedCatalog(entry);
1046 }
1047
1048 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1049 if (!params_->dry_run) {
1050 catalog_manager_->RemoveDirectory(directory_path);
1051 }
1052
1053 perf::Inc(counters_->n_directories_removed);
1054 }
1055
1056 void SyncMediator::TouchDirectory(SharedPtr<SyncItem> entry) {
1057 reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1058 catalog::DirectoryEntry());
1059
1060 const std::string directory_path = entry->GetRelativePath();
1061
1062 if (!params_->dry_run) {
1063 XattrList *xattrs = &default_xattrs_;
1064 if (params_->include_xattrs) {
1065 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1066 assert(xattrs);
1067 }
1068 catalog_manager_->TouchDirectory(
1069 entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1070 directory_path);
1071 if (xattrs != &default_xattrs_)
1072 free(xattrs);
1073 }
1074
1075 if (entry->HasCatalogMarker()
1076 && !catalog_manager_->IsTransitionPoint(directory_path)) {
1077 CreateNestedCatalog(entry);
1078 } else if (!entry->HasCatalogMarker()
1079 && catalog_manager_->IsTransitionPoint(directory_path)) {
1080 RemoveNestedCatalog(entry);
1081 }
1082 }
1083
1084 /**
1085 * All hardlinks in the current directory have been picked up. Now they are
1086 * added to the catalogs.
1087 */
1088 void SyncMediator::AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks) {
1089 assert(handle_hardlinks_);
1090
1091 for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1092 iEnd = hardlinks.end();
1093 i != iEnd;
1094 ++i) {
1095 if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount()
1096 && !params_->ignore_xdir_hardlinks) {
1097 PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1098 i->second.master->GetUnionPath().c_str());
1099 }
1100
1101 if (params_->print_changeset) {
1102 for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1103 jEnd = i->second.hardlinks.end();
1104 j != jEnd;
1105 ++j) {
1106 const std::string changeset_notice =
1107 GetParentPath(i->second.master->GetUnionPath()) + "/" +
1108 j->second->filename();
1109 reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1110 }
1111 }
1112
1113 if (params_->dry_run)
1114 continue;
1115
1116 if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1117 AddHardlinkGroup(i->second);
1118 else
1119 hardlink_queue_.push_back(i->second);
1120 }
1121 }
1122
1123
1124 void SyncMediator::AddHardlinkGroup(const HardlinkGroup &group) {
1125 assert(handle_hardlinks_);
1126
1127 // Create a DirectoryEntry list out of the hardlinks
1128 catalog::DirectoryEntryBaseList hardlinks;
1129 for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1130 iEnd = group.hardlinks.end();
1131 i != iEnd;
1132 ++i) {
1133 hardlinks.push_back(
1134 i->second->CreateBasicCatalogDirent(params_->enable_mtime_ns));
1135 }
1136 XattrList *xattrs = &default_xattrs_;
1137 if (params_->include_xattrs) {
1138 xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1139 assert(xattrs);
1140 }
1141 catalog_manager_->AddHardlinkGroup(hardlinks,
1142 *xattrs,
1143 group.master->relative_parent_path(),
1144 group.file_chunks);
1145 if (xattrs != &default_xattrs_)
1146 free(xattrs);
1147 }
1148
1149 } // namespace publish
1150