GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/sync_mediator.cc
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 1 554 0.2%
Branches: 0 890 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #define __STDC_FORMAT_MACROS
6
7 #include "sync_mediator.h"
8
9 #include <fcntl.h>
10 #include <inttypes.h>
11 #include <unistd.h>
12
13 #include <cassert>
14 #include <cstdio>
15 #include <cstdlib>
16
17 #include "catalog_virtual.h"
18 #include "compression.h"
19 #include "crypto/hash.h"
20 #include "directory_entry.h"
21 #include "json_document.h"
22 #include "publish/repository.h"
23 #include "sync_union.h"
24 #include "upload.h"
25 #include "util/concurrency.h"
26 #include "util/exception.h"
27 #include "util/fs_traversal.h"
28 #include "util/posix.h"
29 #include "util/smalloc.h"
30 #include "util/string.h"
31
32 using namespace std; // NOLINT
33
34 namespace publish {
35
36 6 AbstractSyncMediator::~AbstractSyncMediator() {}
37
38 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
39 const SyncParameters *params,
40 perf::StatisticsTemplate statistics)
41 : catalog_manager_(catalog_manager)
42 , union_engine_(NULL)
43 , handle_hardlinks_(false)
44 , params_(params)
45 , reporter_(new SyncDiffReporter(params_->print_changeset
46 ? SyncDiffReporter::kPrintChanges
47 : SyncDiffReporter::kPrintDots)) {
48 int retval = pthread_mutex_init(&lock_file_queue_, NULL);
49 assert(retval == 0);
50
51 params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
52
53 counters_ = new perf::FsCounters(statistics);
54 }
55
56 SyncMediator::~SyncMediator() {
57 pthread_mutex_destroy(&lock_file_queue_);
58 }
59
60
61 void SyncMediator::RegisterUnionEngine(SyncUnion *engine) {
62 union_engine_ = engine;
63 handle_hardlinks_ = engine->SupportsHardlinks();
64 }
65
66
67 /**
68 * The entry /.cvmfs or entries in /.cvmfs/ must not be added, removed or
69 * modified manually. The directory /.cvmfs is generated by the VirtualCatalog
70 * class if requested.
71 */
72 void SyncMediator::EnsureAllowed(SharedPtr<SyncItem> entry) {
73 const bool ignore_case_setting = false;
74 string relative_path = entry->GetRelativePath();
75 if ( (relative_path == string(catalog::VirtualCatalog::kVirtualPath)) ||
76 (HasPrefix(relative_path,
77 string(catalog::VirtualCatalog::kVirtualPath) + "/",
78 ignore_case_setting)) )
79 {
80 PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
81 relative_path.c_str());
82 }
83 }
84
85
86 /**
87 * Add an entry to the repository.
88 * Added directories will be traversed in order to add the complete subtree.
89 */
90 void SyncMediator::Add(SharedPtr<SyncItem> entry) {
91 EnsureAllowed(entry);
92
93 if (entry->IsDirectory()) {
94 AddDirectoryRecursively(entry);
95 return;
96 }
97
98 // .cvmfsbundles file type
99 if (entry->IsBundleSpec()) {
100 PrintWarning(".cvmfsbundles file encountered. "
101 "Bundles is currently an experimental feature.");
102
103 if (!entry->IsRegularFile()) {
104 PANIC(kLogStderr, "Error: .cvmfsbundles file must be a regular file");
105 }
106 if (entry->HasHardlinks()) {
107 PANIC(kLogStderr, "Error: .cvmfsbundles file must not be a hard link");
108 }
109
110 std::string parent_path = GetParentPath(entry->GetUnionPath());
111 if (parent_path != union_engine_->union_path()) {
112 PANIC(kLogStderr, "Error: .cvmfsbundles file must be in the root"
113 " directory of the repository. Found in %s", parent_path.c_str());
114 }
115
116 std::string json_string;
117
118 int fd = open(entry->GetUnionPath().c_str(), O_RDONLY);
119 if (fd < 0) {
120 PANIC(kLogStderr, "Could not open file: %s",
121 entry->GetUnionPath().c_str());
122 }
123 if (!SafeReadToString(fd, &json_string)) {
124 PANIC(kLogStderr, "Could not read contents of file: %s",
125 entry->GetUnionPath().c_str());
126 }
127 UniquePtr<JsonDocument> json(JsonDocument::Create(json_string));
128
129 AddFile(entry);
130 return;
131 }
132
133 if (entry->IsRegularFile() || entry->IsSymlink()) {
134 // A file is a hard link if the link count is greater than 1
135 if (entry->HasHardlinks() && handle_hardlinks_)
136 InsertHardlink(entry);
137 else
138 AddFile(entry);
139 return;
140 } else if (entry->IsGraftMarker()) {
141 LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
142 return; // Ignore markers.
143 }
144
145 // In OverlayFS whiteouts can be represented as character devices with major
146 // and minor numbers equal to 0. Special files will be ignored except if they
147 // are whiteout files.
148 if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
149 if (params_->ignore_special_files) {
150 PrintWarning("'" + entry->GetRelativePath() + "' "
151 "is a special file, ignoring.");
152 } else {
153 if (entry->HasHardlinks() && handle_hardlinks_)
154 InsertHardlink(entry);
155 else
156 AddFile(entry);
157 }
158 return;
159 }
160
161 PrintWarning("'" + entry->GetRelativePath() +
162 "' cannot be added. Unrecognized file type.");
163 }
164
165
166 /**
167 * Touch an entry in the repository.
168 */
169 void SyncMediator::Touch(SharedPtr<SyncItem> entry) {
170 EnsureAllowed(entry);
171
172 if (entry->IsGraftMarker()) {return;}
173 if (entry->IsDirectory()) {
174 TouchDirectory(entry);
175 perf::Inc(counters_->n_directories_changed);
176 return;
177 }
178
179 if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
180 Replace(entry); // This way, hardlink processing is correct
181 // Replace calls Remove; cancel Remove's actions:
182 perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
183
184 // Count only the difference between the old and new file
185 // Symlinks do not count into added or removed bytes
186 int64_t dif = 0;
187
188 // Need to handle 4 cases (symlink->symlink, symlink->regular,
189 // regular->symlink, regular->regular)
190 if (entry->WasSymlink()) {
191 // Replace calls Remove; cancel Remove's actions:
192 perf::Dec(counters_->n_symlinks_removed);
193
194 if (entry->IsSymlink()) {
195 perf::Inc(counters_->n_symlinks_changed);
196 } else {
197 perf::Inc(counters_->n_symlinks_removed);
198 perf::Inc(counters_->n_files_added);
199 dif += entry->GetScratchSize();
200 }
201 } else {
202 // Replace calls Remove; cancel Remove's actions:
203 perf::Dec(counters_->n_files_removed);
204 dif -= entry->GetRdOnlySize();
205 if (entry->IsSymlink()) {
206 perf::Inc(counters_->n_files_removed);
207 perf::Inc(counters_->n_symlinks_added);
208 } else {
209 perf::Inc(counters_->n_files_changed);
210 dif += entry->GetScratchSize();
211 }
212 }
213
214 if (dif > 0) { // added bytes
215 perf::Xadd(counters_->sz_added_bytes, dif);
216 } else { // removed bytes
217 perf::Xadd(counters_->sz_removed_bytes, -dif);
218 }
219 return;
220 }
221
222 PrintWarning("'" + entry->GetRelativePath() +
223 "' cannot be touched. Unrecognized file type.");
224 }
225
226
227 /**
228 * Remove an entry from the repository. Directories will be recursively removed.
229 */
230 void SyncMediator::Remove(SharedPtr<SyncItem> entry) {
231 EnsureAllowed(entry);
232
233 if (entry->WasDirectory()) {
234 RemoveDirectoryRecursively(entry);
235 return;
236 }
237
238 if (entry->WasBundleSpec()) {
239 // for now remove using RemoveFile()
240 RemoveFile(entry);
241 return;
242 }
243
244 if (entry->WasRegularFile() || entry->WasSymlink() ||
245 entry->WasSpecialFile()) {
246 RemoveFile(entry);
247 return;
248 }
249
250 PrintWarning("'" + entry->GetRelativePath() +
251 "' cannot be deleted. Unrecognized file type.");
252 }
253
254
255 /**
256 * Remove the old entry and add the new one.
257 */
258 void SyncMediator::Replace(SharedPtr<SyncItem> entry) {
259 // EnsureAllowed(entry); <-- Done by Remove() and Add()
260 Remove(entry);
261 Add(entry);
262 }
263
264 void SyncMediator::Clone(const std::string from, const std::string to) {
265 catalog_manager_->Clone(from, to);
266 }
267
268 void SyncMediator::EnterDirectory(SharedPtr<SyncItem> entry) {
269 if (!handle_hardlinks_) {
270 return;
271 }
272
273 HardlinkGroupMap new_map;
274 hardlink_stack_.push(new_map);
275 }
276
277
278 void SyncMediator::LeaveDirectory(SharedPtr<SyncItem> entry)
279 {
280 if (!handle_hardlinks_) {
281 return;
282 }
283
284 CompleteHardlinks(entry);
285 AddLocalHardlinkGroups(GetHardlinkMap());
286 hardlink_stack_.pop();
287 }
288
289
290 /**
291 * Do any pending processing and commit all changes to the catalogs.
292 * To be called after change set traversal is finished.
293 */
294 bool SyncMediator::Commit(manifest::Manifest *manifest) {
295 reporter_->CommitReport();
296
297 if (!params_->dry_run) {
298 LogCvmfs(kLogPublish, kLogStdout,
299 "Waiting for upload of files before committing...");
300 params_->spooler->WaitForUpload();
301 }
302
303 if (!hardlink_queue_.empty()) {
304 assert(handle_hardlinks_);
305
306 LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
307 params_->spooler->UnregisterListeners();
308 params_->spooler->RegisterListener(&SyncMediator::PublishHardlinksCallback,
309 this);
310
311 // TODO(rmeusel): Revise that for Thread Safety!
312 // This loop will spool hardlinks into the spooler, which will then
313 // process them.
314 // On completion of every hardlink the spooler will asynchronously
315 // emit callbacks (SyncMediator::PublishHardlinksCallback) which
316 // might happen while this for-loop goes through the hardlink_queue_
317 //
318 // For the moment this seems not to be a problem, but it's an accident
319 // just waiting to happen.
320 //
321 // Note: Just wrapping this loop in a mutex might produce a dead lock
322 // since the spooler does not fill it's processing queue to an
323 // unlimited size. Meaning that it might be flooded with hard-
324 // links and waiting for the queue to be processed while proces-
325 // sing is stalled because the callback is waiting for this
326 // mutex.
327 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
328 iEnd = hardlink_queue_.end(); i != iEnd; ++i)
329 {
330 LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
331 i->master->GetUnionPath().c_str());
332 IngestionSource *source =
333 new FileIngestionSource(i->master->GetUnionPath());
334 params_->spooler->Process(source);
335 }
336
337 params_->spooler->WaitForUpload();
338
339 for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
340 iEnd = hardlink_queue_.end(); i != iEnd; ++i)
341 {
342 LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
343 i->master->GetUnionPath().c_str());
344 AddHardlinkGroup(*i);
345 }
346 }
347
348 if (union_engine_) union_engine_->PostUpload();
349
350 params_->spooler->UnregisterListeners();
351
352 if (params_->dry_run) {
353 manifest = NULL;
354 return true;
355 }
356
357 LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
358 if (params_->spooler->GetNumberOfErrors() > 0) {
359 LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
360 return false;
361 }
362
363 if (catalog_manager_->IsBalanceable() ||
364 (params_->virtual_dir_actions != catalog::VirtualCatalog::kActionNone))
365 {
366 if (catalog_manager_->IsBalanceable())
367 catalog_manager_->Balance();
368 // Commit empty string to ensure that the "content" of the auto catalog
369 // markers is present in the repository.
370 string empty_file = CreateTempPath(params_->dir_temp + "/empty", 0600);
371 IngestionSource* source = new FileIngestionSource(empty_file);
372 params_->spooler->Process(source);
373 params_->spooler->WaitForUpload();
374 unlink(empty_file.c_str());
375 if (params_->spooler->GetNumberOfErrors() > 0) {
376 LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
377 return false;
378 }
379 }
380 catalog_manager_->PrecalculateListings();
381 return catalog_manager_->Commit(params_->stop_for_catalog_tweaks,
382 params_->manual_revision,
383 manifest);
384 }
385
386
387 void SyncMediator::InsertHardlink(SharedPtr<SyncItem> entry) {
388 assert(handle_hardlinks_);
389
390 uint64_t inode = entry->GetUnionInode();
391 LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
392 inode, entry->GetUnionPath().c_str());
393
394 // Find the hard link group in the lists
395 HardlinkGroupMap::iterator hardlink_group = GetHardlinkMap().find(inode);
396
397 if (hardlink_group == GetHardlinkMap().end()) {
398 // Create a new hardlink group
399 GetHardlinkMap().insert(
400 HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
401 } else {
402 // Append the file to the appropriate hardlink group
403 hardlink_group->second.AddHardlink(entry);
404 }
405
406 // publish statistics counting for new file
407 if (entry->IsNew()) {
408 perf::Inc(counters_->n_files_added);
409 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
410 }
411 }
412
413
414 void SyncMediator::InsertLegacyHardlink(SharedPtr<SyncItem> entry) {
415 // Check if found file has hardlinks (nlink > 1)
416 // As we are looking through all files in one directory here, there might be
417 // completely untouched hardlink groups, which we can safely skip.
418 // Finally we have to see if the hardlink is already part of this group
419
420 assert(handle_hardlinks_);
421
422 if (entry->GetUnionLinkcount() < 2)
423 return;
424
425 uint64_t inode = entry->GetUnionInode();
426 HardlinkGroupMap::iterator hl_group;
427 hl_group = GetHardlinkMap().find(inode);
428
429 if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
430 bool found = false;
431
432 // search for the entry in this group
433 for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
434 iEnd = hl_group->second.hardlinks.end(); i != iEnd; ++i)
435 {
436 if (*(i->second) == *entry) {
437 found = true;
438 break;
439 }
440 }
441
442 if (!found) {
443 // Hardlink already in the group?
444 // If one element of a hardlink group is edited, all elements must be
445 // replaced. Here, we remove an untouched hardlink and add it to its
446 // hardlink group for re-adding later
447 LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
448 entry->GetUnionPath().c_str());
449 Remove(entry);
450 hl_group->second.AddHardlink(entry);
451 }
452 }
453 }
454
455
456 /**
457 * Create a recursion engine which DOES NOT recurse into directories.
458 * It basically goes through the current directory (in the union volume) and
459 * searches for legacy hardlinks which has to be connected to the new
460 * or edited ones.
461 */
462 void SyncMediator::CompleteHardlinks(SharedPtr<SyncItem> entry) {
463 assert(handle_hardlinks_);
464
465 // If no hardlink in this directory was changed, we can skip this
466 if (GetHardlinkMap().empty())
467 return;
468
469 LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
470 entry->GetUnionPath().c_str());
471
472 // Look for legacy hardlinks
473 FileSystemTraversal<SyncMediator> traversal(this, union_engine_->union_path(),
474 false);
475 traversal.fn_new_file =
476 &SyncMediator::LegacyRegularHardlinkCallback;
477 traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
478 traversal.fn_new_character_dev =
479 &SyncMediator::LegacyCharacterDeviceHardlinkCallback;
480 traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
481 traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
482 traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
483 traversal.Recurse(entry->GetUnionPath());
484 }
485
486
487 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
488 const string &file_name)
489 {
490 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
491 InsertLegacyHardlink(entry);
492 }
493
494
495 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
496 const string &file_name)
497 {
498 SharedPtr<SyncItem> entry =
499 CreateSyncItem(parent_dir, file_name, kItemSymlink);
500 InsertLegacyHardlink(entry);
501 }
502
503 void SyncMediator::LegacyCharacterDeviceHardlinkCallback(
504 const string &parent_dir,
505 const string &file_name)
506 {
507 SharedPtr<SyncItem> entry =
508 CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
509 InsertLegacyHardlink(entry);
510 }
511
512 void SyncMediator::LegacyBlockDeviceHardlinkCallback(
513 const string &parent_dir,
514 const string &file_name)
515 {
516 SharedPtr<SyncItem> entry =
517 CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
518 InsertLegacyHardlink(entry);
519 }
520
521 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
522 const string &file_name)
523 {
524 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
525 InsertLegacyHardlink(entry);
526 }
527
528 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
529 const string &file_name)
530 {
531 SharedPtr<SyncItem> entry =
532 CreateSyncItem(parent_dir, file_name, kItemSocket);
533 InsertLegacyHardlink(entry);
534 }
535
536
537 void SyncMediator::AddDirectoryRecursively(SharedPtr<SyncItem> entry) {
538 AddDirectory(entry);
539
540 // Create a recursion engine, which recursively adds all entries in a newly
541 // created directory
542 FileSystemTraversal<SyncMediator> traversal(
543 this, union_engine_->scratch_path(), true);
544 traversal.fn_enter_dir = &SyncMediator::EnterAddedDirectoryCallback;
545 traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
546 traversal.fn_new_file = &SyncMediator::AddFileCallback;
547 traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
548 traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
549 traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
550 traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
551 traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
552 traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
553 traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
554 traversal.Recurse(entry->GetScratchPath());
555 }
556
557
558 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
559 const std::string &dir_name)
560 {
561 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
562 AddDirectory(entry);
563 return true; // The recursion engine should recurse deeper here
564 }
565
566
567 void SyncMediator::AddFileCallback(const std::string &parent_dir,
568 const std::string &file_name)
569 {
570 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
571 Add(entry);
572 }
573
574
575 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
576 const std::string &file_name)
577 {
578 SharedPtr<SyncItem> entry =
579 CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
580 Add(entry);
581 }
582
583 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
584 const std::string &file_name)
585 {
586 SharedPtr<SyncItem> entry =
587 CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
588 Add(entry);
589 }
590
591 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
592 const std::string &file_name)
593 {
594 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
595 Add(entry);
596 }
597
598 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
599 const std::string &file_name) {
600 SharedPtr<SyncItem> entry =
601 CreateSyncItem(parent_dir, file_name, kItemSocket);
602 Add(entry);
603 }
604
605 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
606 const std::string &link_name)
607 {
608 SharedPtr<SyncItem> entry =
609 CreateSyncItem(parent_dir, link_name, kItemSymlink);
610 Add(entry);
611 }
612
613
614 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
615 const std::string &dir_name)
616 {
617 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
618 EnterDirectory(entry);
619 }
620
621
622 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
623 const std::string &dir_name)
624 {
625 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
626 LeaveDirectory(entry);
627 }
628
629
630 void SyncMediator::RemoveDirectoryRecursively(SharedPtr<SyncItem> entry) {
631 // Delete a directory AFTER it was emptied here,
632 // because it would start up another recursion
633
634 const bool recurse = false;
635 FileSystemTraversal<SyncMediator> traversal(
636 this, union_engine_->rdonly_path(), recurse);
637 traversal.fn_new_file = &SyncMediator::RemoveFileCallback;
638 traversal.fn_new_dir_postfix =
639 &SyncMediator::RemoveDirectoryCallback;
640 traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
641 traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
642 traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
643 traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
644 traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
645 traversal.Recurse(entry->GetRdOnlyPath());
646
647 // The given directory was emptied recursively and can now itself be deleted
648 RemoveDirectory(entry);
649 }
650
651
652 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
653 const std::string &file_name)
654 {
655 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
656 Remove(entry);
657 }
658
659
660 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
661 const std::string &link_name)
662 {
663 SharedPtr<SyncItem> entry =
664 CreateSyncItem(parent_dir, link_name, kItemSymlink);
665 Remove(entry);
666 }
667
668 void SyncMediator::RemoveCharacterDeviceCallback(
669 const std::string &parent_dir,
670 const std::string &link_name)
671 {
672 SharedPtr<SyncItem> entry =
673 CreateSyncItem(parent_dir, link_name, kItemCharacterDevice);
674 Remove(entry);
675 }
676
677 void SyncMediator::RemoveBlockDeviceCallback(
678 const std::string &parent_dir,
679 const std::string &link_name)
680 {
681 SharedPtr<SyncItem> entry =
682 CreateSyncItem(parent_dir, link_name, kItemBlockDevice);
683 Remove(entry);
684 }
685
686 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
687 const std::string &link_name)
688 {
689 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name, kItemFifo);
690 Remove(entry);
691 }
692
693 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
694 const std::string &link_name)
695 {
696 SharedPtr<SyncItem> entry =
697 CreateSyncItem(parent_dir, link_name, kItemSocket);
698 Remove(entry);
699 }
700
701 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
702 const std::string &dir_name)
703 {
704 SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
705 RemoveDirectoryRecursively(entry);
706 }
707
708
709 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
710 const std::string &file_name)
711 {
712 if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
713 return true;
714 }
715
716 SharedPtr<SyncItem> entry =
717 CreateSyncItem(parent_dir, file_name, kItemUnknown);
718 return entry->IsWhiteout();
719 }
720
721 SharedPtr<SyncItem> SyncMediator::CreateSyncItem(
722 const std::string &relative_parent_path, const std::string &filename,
723 const SyncItemType entry_type) const {
724 return union_engine_->CreateSyncItem(relative_parent_path, filename,
725 entry_type);
726 }
727
728 void SyncMediator::PublishFilesCallback(const upload::SpoolerResult &result) {
729 LogCvmfs(kLogPublish, kLogVerboseMsg,
730 "Spooler callback for %s, digest %s, produced %lu chunks, retval %d",
731 result.local_path.c_str(),
732 result.content_hash.ToString().c_str(),
733 result.file_chunks.size(),
734 result.return_code);
735 if (result.return_code != 0) {
736 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
737 result.return_code);
738 }
739
740 SyncItemList::iterator itr;
741 {
742 MutexLockGuard guard(lock_file_queue_);
743 itr = file_queue_.find(result.local_path);
744 }
745
746 assert(itr != file_queue_.end());
747
748 SyncItem &item = *itr->second;
749 item.SetContentHash(result.content_hash);
750 item.SetCompressionAlgorithm(result.compression_alg);
751
752 XattrList *xattrs = &default_xattrs_;
753 if (params_->include_xattrs) {
754 xattrs = XattrList::CreateFromFile(result.local_path);
755 assert(xattrs != NULL);
756 }
757
758 if (result.IsChunked()) {
759 catalog_manager_->AddChunkedFile(
760 item.CreateBasicCatalogDirent(),
761 *xattrs,
762 item.relative_parent_path(),
763 result.file_chunks);
764 } else {
765 catalog_manager_->AddFile(
766 item.CreateBasicCatalogDirent(),
767 *xattrs,
768 item.relative_parent_path());
769 }
770
771 if (xattrs != &default_xattrs_)
772 free(xattrs);
773 }
774
775
776 void SyncMediator::PublishHardlinksCallback(
777 const upload::SpoolerResult &result)
778 {
779 LogCvmfs(kLogPublish, kLogVerboseMsg,
780 "Spooler callback for hardlink %s, digest %s, retval %d",
781 result.local_path.c_str(),
782 result.content_hash.ToString().c_str(),
783 result.return_code);
784 if (result.return_code != 0) {
785 PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
786 result.return_code);
787 }
788
789 bool found = false;
790 for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
791 if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
792 found = true;
793 hardlink_queue_[i].master->SetContentHash(result.content_hash);
794 SyncItemList::iterator j, jend;
795 for (j = hardlink_queue_[i].hardlinks.begin(),
796 jend = hardlink_queue_[i].hardlinks.end();
797 j != jend; ++j)
798 {
799 j->second->SetContentHash(result.content_hash);
800 j->second->SetCompressionAlgorithm(result.compression_alg);
801 }
802 if (result.IsChunked())
803 hardlink_queue_[i].file_chunks = result.file_chunks;
804
805 break;
806 }
807 }
808
809 assert(found);
810 }
811
812
813 void SyncMediator::CreateNestedCatalog(SharedPtr<SyncItem> directory) {
814 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
815 reporter_->OnAdd(notice, catalog::DirectoryEntry());
816
817 if (!params_->dry_run) {
818 catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
819 }
820 }
821
822
823 void SyncMediator::RemoveNestedCatalog(SharedPtr<SyncItem> directory) {
824 const std::string notice = "Nested catalog at " + directory->GetUnionPath();
825 reporter_->OnRemove(notice, catalog::DirectoryEntry());
826
827 if (!params_->dry_run) {
828 catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
829 }
830 }
831
832 void SyncDiffReporter::OnInit(const history::History::Tag & /*from_tag*/,
833 const history::History::Tag & /*to_tag*/) {}
834
835 void SyncDiffReporter::OnStats(const catalog::DeltaCounters & /*delta*/) {}
836
837 void SyncDiffReporter::OnAdd(const std::string &path,
838 const catalog::DirectoryEntry & /*entry*/) {
839 changed_items_++;
840 AddImpl(path);
841 }
842 void SyncDiffReporter::OnRemove(const std::string &path,
843 const catalog::DirectoryEntry & /*entry*/) {
844 changed_items_++;
845 RemoveImpl(path);
846 }
847 void SyncDiffReporter::OnModify(const std::string &path,
848 const catalog::DirectoryEntry & /*entry_from*/,
849 const catalog::DirectoryEntry & /*entry_to*/) {
850 changed_items_++;
851 ModifyImpl(path);
852 }
853
854 void SyncDiffReporter::CommitReport() {
855 if (print_action_ == kPrintDots) {
856 if (changed_items_ >= processing_dot_interval_) {
857 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, "\n");
858 }
859 }
860 }
861
862 void SyncDiffReporter::PrintDots() {
863 if (changed_items_ % processing_dot_interval_ == 0) {
864 LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, ".");
865 }
866 }
867
868 void SyncDiffReporter::AddImpl(const std::string &path) {
869 const char *action_label;
870
871 switch (print_action_) {
872 case kPrintChanges:
873 if (path.at(0) != '/') {
874 action_label = "[x-catalog-add]";
875 } else {
876 action_label = "[add]";
877 }
878 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
879 break;
880
881 case kPrintDots:
882 PrintDots();
883 break;
884 default:
885 assert("Invalid print action.");
886 }
887 }
888
889 void SyncDiffReporter::RemoveImpl(const std::string &path) {
890 const char *action_label;
891
892 switch (print_action_) {
893 case kPrintChanges:
894 if (path.at(0) != '/') {
895 action_label = "[x-catalog-rem]";
896 } else {
897 action_label = "[rem]";
898 }
899
900 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
901 break;
902
903 case kPrintDots:
904 PrintDots();
905 break;
906 default:
907 assert("Invalid print action.");
908 }
909 }
910
911 void SyncDiffReporter::ModifyImpl(const std::string &path) {
912 const char *action_label;
913
914 switch (print_action_) {
915 case kPrintChanges:
916 action_label = "[mod]";
917 LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
918 break;
919
920 case kPrintDots:
921 PrintDots();
922 break;
923 default:
924 assert("Invalid print action.");
925 }
926 }
927
928 void SyncMediator::AddFile(SharedPtr<SyncItem> entry) {
929 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
930
931 if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
932 assert(!entry->HasGraftMarker());
933 // Symlinks and special files are completely stored in the catalog
934 XattrList *xattrs = &default_xattrs_;
935 if (params_->include_xattrs) {
936 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
937 assert(xattrs);
938 }
939 catalog_manager_->AddFile(entry->CreateBasicCatalogDirent(), *xattrs,
940 entry->relative_parent_path());
941 if (xattrs != &default_xattrs_)
942 free(xattrs);
943 } else if (entry->HasGraftMarker() && !params_->dry_run) {
944 if (entry->IsValidGraft()) {
945 // Graft files are added to catalog immediately.
946 if (entry->IsChunkedGraft()) {
947 catalog_manager_->AddChunkedFile(
948 entry->CreateBasicCatalogDirent(), default_xattrs_,
949 entry->relative_parent_path(), *(entry->GetGraftChunks()));
950 } else {
951 catalog_manager_->AddFile(
952 entry->CreateBasicCatalogDirent(),
953 default_xattrs_, // TODO(bbockelm): For now, use default xattrs
954 // on grafted files.
955 entry->relative_parent_path());
956 }
957 } else {
958 // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
959 // the graft file is missing information. It's not clear that continuing
960 // forward with the publish is the correct thing to do; abort for now.
961 PANIC(kLogStderr,
962 "Encountered a grafted file (%s) with "
963 "invalid grafting information; check contents of .cvmfsgraft-*"
964 " file. Aborting publish.",
965 entry->GetRelativePath().c_str());
966 }
967 } else if (entry->relative_parent_path().empty() &&
968 entry->IsCatalogMarker()) {
969 PANIC(kLogStderr, "Error: nested catalog marker in root directory");
970 } else if (!params_->dry_run) {
971 {
972 // Push the file to the spooler, remember the entry for the path
973 MutexLockGuard m(&lock_file_queue_);
974 file_queue_[entry->GetUnionPath()] = entry;
975 }
976 // Spool the file
977 params_->spooler->Process(entry->CreateIngestionSource());
978 }
979
980 // publish statistics counting for new file
981 if (entry->IsNew()) {
982 if (entry->IsSymlink()) {
983 perf::Inc(counters_->n_symlinks_added);
984 } else {
985 perf::Inc(counters_->n_files_added);
986 perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
987 }
988 }
989 }
990
991 void SyncMediator::RemoveFile(SharedPtr<SyncItem> entry) {
992 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
993
994 if (!params_->dry_run) {
995 if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
996 LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
997 entry->GetUnionPath().c_str());
998 catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
999 }
1000 catalog_manager_->RemoveFile(entry->GetRelativePath());
1001 }
1002
1003 // Counting nr of removed files and removed bytes
1004 if (entry->WasSymlink()) {
1005 perf::Inc(counters_->n_symlinks_removed);
1006 } else {
1007 perf::Inc(counters_->n_files_removed);
1008 }
1009 perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
1010 }
1011
1012 void SyncMediator::AddUnmaterializedDirectory(SharedPtr<SyncItem> entry) {
1013 AddDirectory(entry);
1014 }
1015
1016 void SyncMediator::AddDirectory(SharedPtr<SyncItem> entry) {
1017 if (entry->IsBundleSpec()) {
1018 PANIC(kLogStderr, "Illegal directory name: .cvmfsbundles (%s). "
1019 ".cvmfsbundles is reserved for bundles specification files",
1020 entry->GetUnionPath().c_str());
1021 }
1022
1023 reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1024
1025 perf::Inc(counters_->n_directories_added);
1026 assert(!entry->HasGraftMarker());
1027 if (!params_->dry_run) {
1028 XattrList *xattrs = &default_xattrs_;
1029 if (params_->include_xattrs) {
1030 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1031 assert(xattrs);
1032 }
1033 catalog_manager_->AddDirectory(entry->CreateBasicCatalogDirent(), *xattrs,
1034 entry->relative_parent_path());
1035 if (xattrs != &default_xattrs_)
1036 free(xattrs);
1037 }
1038
1039 if (entry->HasCatalogMarker() &&
1040 !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1041 CreateNestedCatalog(entry);
1042 }
1043 }
1044
1045
1046 /**
1047 * this method deletes a single directory entry! Make sure to empty it
1048 * before you call this method or simply use
1049 * SyncMediator::RemoveDirectoryRecursively instead.
1050 */
1051 void SyncMediator::RemoveDirectory(SharedPtr<SyncItem> entry) {
1052 const std::string directory_path = entry->GetRelativePath();
1053
1054 if (catalog_manager_->IsTransitionPoint(directory_path)) {
1055 RemoveNestedCatalog(entry);
1056 }
1057
1058 reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1059 if (!params_->dry_run) {
1060 catalog_manager_->RemoveDirectory(directory_path);
1061 }
1062
1063 perf::Inc(counters_->n_directories_removed);
1064 }
1065
1066 void SyncMediator::TouchDirectory(SharedPtr<SyncItem> entry) {
1067 reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1068 catalog::DirectoryEntry());
1069
1070 const std::string directory_path = entry->GetRelativePath();
1071
1072 if (!params_->dry_run) {
1073 XattrList *xattrs = &default_xattrs_;
1074 if (params_->include_xattrs) {
1075 xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1076 assert(xattrs);
1077 }
1078 catalog_manager_->TouchDirectory(entry->CreateBasicCatalogDirent(), *xattrs,
1079 directory_path);
1080 if (xattrs != &default_xattrs_) free(xattrs);
1081 }
1082
1083 if (entry->HasCatalogMarker() &&
1084 !catalog_manager_->IsTransitionPoint(directory_path)) {
1085 CreateNestedCatalog(entry);
1086 } else if (!entry->HasCatalogMarker() &&
1087 catalog_manager_->IsTransitionPoint(directory_path)) {
1088 RemoveNestedCatalog(entry);
1089 }
1090 }
1091
1092 /**
1093 * All hardlinks in the current directory have been picked up. Now they are
1094 * added to the catalogs.
1095 */
1096 void SyncMediator::AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks) {
1097 assert(handle_hardlinks_);
1098
1099 for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1100 iEnd = hardlinks.end(); i != iEnd; ++i)
1101 {
1102 if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount() &&
1103 !params_->ignore_xdir_hardlinks) {
1104 PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1105 i->second.master->GetUnionPath().c_str());
1106 }
1107
1108 if (params_->print_changeset) {
1109 for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1110 jEnd = i->second.hardlinks.end(); j != jEnd; ++j)
1111 {
1112 std::string changeset_notice =
1113 GetParentPath(i->second.master->GetUnionPath()) + "/" +
1114 j->second->filename();
1115 reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1116 }
1117 }
1118
1119 if (params_->dry_run)
1120 continue;
1121
1122 if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1123 AddHardlinkGroup(i->second);
1124 else
1125 hardlink_queue_.push_back(i->second);
1126 }
1127 }
1128
1129
1130 void SyncMediator::AddHardlinkGroup(const HardlinkGroup &group) {
1131 assert(handle_hardlinks_);
1132
1133 // Create a DirectoryEntry list out of the hardlinks
1134 catalog::DirectoryEntryBaseList hardlinks;
1135 for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1136 iEnd = group.hardlinks.end(); i != iEnd; ++i)
1137 {
1138 hardlinks.push_back(i->second->CreateBasicCatalogDirent());
1139 }
1140 XattrList *xattrs = &default_xattrs_;
1141 if (params_->include_xattrs) {
1142 xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1143 assert(xattrs);
1144 }
1145 catalog_manager_->AddHardlinkGroup(
1146 hardlinks,
1147 *xattrs,
1148 group.master->relative_parent_path(),
1149 group.file_chunks);
1150 if (xattrs != &default_xattrs_)
1151 free(xattrs);
1152 }
1153
1154 } // namespace publish
1155