CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sync_mediator.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "sync_mediator.h"
8 
9 #include <fcntl.h>
10 #include <inttypes.h>
11 #include <unistd.h>
12 
13 #include <cassert>
14 #include <cstdio>
15 #include <cstdlib>
16 
17 #include "catalog_virtual.h"
18 #include "compression.h"
19 #include "directory_entry.h"
20 #include "fs_traversal.h"
21 #include "hash.h"
22 #include "publish/repository.h"
23 #include "smalloc.h"
24 #include "sync_union.h"
25 #include "upload.h"
26 #include "util/exception.h"
27 #include "util/posix.h"
28 #include "util/string.h"
29 #include "util_concurrency.h"
30 
31 using namespace std; // NOLINT
32 
33 namespace publish {
34 
35 AbstractSyncMediator::~AbstractSyncMediator() {}
36 
37 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
38  const SyncParameters *params,
39  perf::StatisticsTemplate statistics)
40  : catalog_manager_(catalog_manager)
41  , union_engine_(NULL)
42  , handle_hardlinks_(false)
43  , params_(params)
44  , reporter_(new SyncDiffReporter(params_->print_changeset
45  ? SyncDiffReporter::kPrintChanges
46  : SyncDiffReporter::kPrintDots)) {
47  int retval = pthread_mutex_init(&lock_file_queue_, NULL);
48  assert(retval == 0);
49 
50  params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
51 
52  counters_ = new perf::FsCounters(statistics);
53 }
54 
56  pthread_mutex_destroy(&lock_file_queue_);
57 }
58 
59 
61  union_engine_ = engine;
63 }
64 
65 
72  const bool ignore_case_setting = false;
73  string relative_path = entry->GetRelativePath();
74  if ( (relative_path == string(catalog::VirtualCatalog::kVirtualPath)) ||
75  (HasPrefix(relative_path,
77  ignore_case_setting)) )
78  {
79  PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
80  relative_path.c_str());
81  }
82 }
83 
84 
90  EnsureAllowed(entry);
91 
92  if (entry->IsDirectory()) {
94  return;
95  }
96 
97  if (entry->IsRegularFile() || entry->IsSymlink()) {
98  // A file is a hard link if the link count is greater than 1
99  if (entry->HasHardlinks() && handle_hardlinks_)
100  InsertHardlink(entry);
101  else
102  AddFile(entry);
103  return;
104  } else if (entry->IsGraftMarker()) {
105  LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
106  return; // Ignore markers.
107  }
108 
109  // In OverlayFS whiteouts can be represented as character devices with major
110  // and minor numbers equal to 0. Special files will be ignored except if they
111  // are whiteout files.
112  if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
114  PrintWarning("'" + entry->GetRelativePath() + "' "
115  "is a special file, ignoring.");
116  } else {
117  if (entry->HasHardlinks() && handle_hardlinks_)
118  InsertHardlink(entry);
119  else
120  AddFile(entry);
121  }
122  return;
123  }
124 
125  PrintWarning("'" + entry->GetRelativePath() +
126  "' cannot be added. Unrecognized file type.");
127 }
128 
129 
134  EnsureAllowed(entry);
135 
136  if (entry->IsGraftMarker()) {return;}
137  if (entry->IsDirectory()) {
138  TouchDirectory(entry);
139  perf::Inc(counters_->n_directories_changed);
140  return;
141  }
142 
143  if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
144  Replace(entry); // This way, hardlink processing is correct
145  // Replace calls Remove; cancel Remove's actions:
146  perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
147 
148  // Count only the difference between the old and new file
149  // Symlinks do not count into added or removed bytes
150  int64_t dif = 0;
151 
152  // Need to handle 4 cases (symlink->symlink, symlink->regular,
153  // regular->symlink, regular->regular)
154  if (entry->WasSymlink()) {
155  // Replace calls Remove; cancel Remove's actions:
156  perf::Dec(counters_->n_symlinks_removed);
157 
158  if (entry->IsSymlink()) {
159  perf::Inc(counters_->n_symlinks_changed);
160  } else {
161  perf::Inc(counters_->n_symlinks_removed);
162  perf::Inc(counters_->n_files_added);
163  dif += entry->GetScratchSize();
164  }
165  } else {
166  // Replace calls Remove; cancel Remove's actions:
167  perf::Dec(counters_->n_files_removed);
168  dif -= entry->GetRdOnlySize();
169  if (entry->IsSymlink()) {
170  perf::Inc(counters_->n_files_removed);
171  perf::Inc(counters_->n_symlinks_added);
172  } else {
173  perf::Inc(counters_->n_files_changed);
174  dif += entry->GetScratchSize();
175  }
176  }
177 
178  if (dif > 0) { // added bytes
179  perf::Xadd(counters_->sz_added_bytes, dif);
180  } else { // removed bytes
181  perf::Xadd(counters_->sz_removed_bytes, -dif);
182  }
183  return;
184  }
185 
186  PrintWarning("'" + entry->GetRelativePath() +
187  "' cannot be touched. Unrecognized file type.");
188 }
189 
190 
195  EnsureAllowed(entry);
196 
197  if (entry->WasDirectory()) {
199  return;
200  }
201 
202  if (entry->WasRegularFile() || entry->WasSymlink() ||
203  entry->WasSpecialFile()) {
204  RemoveFile(entry);
205  return;
206  }
207 
208  PrintWarning("'" + entry->GetRelativePath() +
209  "' cannot be deleted. Unrecognized file type.");
210 }
211 
212 
217  // EnsureAllowed(entry); <-- Done by Remove() and Add()
218  Remove(entry);
219  Add(entry);
220 }
221 
222 void SyncMediator::Clone(const std::string from, const std::string to) {
223  catalog_manager_->Clone(from, to);
224 }
225 
227  if (!handle_hardlinks_) {
228  return;
229  }
230 
231  HardlinkGroupMap new_map;
232  hardlink_stack_.push(new_map);
233 }
234 
235 
237 {
238  if (!handle_hardlinks_) {
239  return;
240  }
241 
242  CompleteHardlinks(entry);
244  hardlink_stack_.pop();
245 }
246 
247 
253  reporter_->CommitReport();
254 
255  if (!params_->dry_run) {
257  "Waiting for upload of files before committing...");
258  params_->spooler->WaitForUpload();
259  }
260 
261  if (!hardlink_queue_.empty()) {
263 
264  LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
265  params_->spooler->UnregisterListeners();
267  this);
268 
269  // TODO(rmeusel): Revise that for Thread Safety!
270  // This loop will spool hardlinks into the spooler, which will then
271  // process them.
272  // On completion of every hardlink the spooler will asynchronously
273  // emit callbacks (SyncMediator::PublishHardlinksCallback) which
274  // might happen while this for-loop goes through the hardlink_queue_
275  //
276  // For the moment this seems not to be a problem, but it's an accident
277  // just waiting to happen.
278  //
279  // Note: Just wrapping this loop in a mutex might produce a dead lock
280  // since the spooler does not fill its processing queue to an
281  // unlimited size. Meaning that it might be flooded with hard-
282  // links and waiting for the queue to be processed while proces-
283  // sing is stalled because the callback is waiting for this
284  // mutex.
285  for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
286  iEnd = hardlink_queue_.end(); i != iEnd; ++i)
287  {
288  LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
289  i->master->GetUnionPath().c_str());
290  IngestionSource *source =
291  new FileIngestionSource(i->master->GetUnionPath());
292  params_->spooler->Process(source);
293  }
294 
295  params_->spooler->WaitForUpload();
296 
297  for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
298  iEnd = hardlink_queue_.end(); i != iEnd; ++i)
299  {
300  LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
301  i->master->GetUnionPath().c_str());
302  AddHardlinkGroup(*i);
303  }
304  }
305 
307 
308  params_->spooler->UnregisterListeners();
309 
310  if (params_->dry_run) {
311  manifest = NULL;
312  return true;
313  }
314 
315  LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
316  if (params_->spooler->GetNumberOfErrors() > 0) {
317  LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
318  return false;
319  }
320 
323  {
326  // Commit empty string to ensure that the "content" of the auto catalog
327  // markers is present in the repository.
328  string empty_file = CreateTempPath(params_->dir_temp + "/empty", 0600);
329  IngestionSource* source = new FileIngestionSource(empty_file);
330  params_->spooler->Process(source);
331  params_->spooler->WaitForUpload();
332  unlink(empty_file.c_str());
333  if (params_->spooler->GetNumberOfErrors() > 0) {
334  LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
335  return false;
336  }
337  }
341  manifest);
342 }
343 
344 
347 
348  uint64_t inode = entry->GetUnionInode();
349  LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
350  inode, entry->GetUnionPath().c_str());
351 
352  // Find the hard link group in the lists
353  HardlinkGroupMap::iterator hardlink_group = GetHardlinkMap().find(inode);
354 
355  if (hardlink_group == GetHardlinkMap().end()) {
356  // Create a new hardlink group
357  GetHardlinkMap().insert(
358  HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
359  } else {
360  // Append the file to the appropriate hardlink group
361  hardlink_group->second.AddHardlink(entry);
362  }
363 
364  // publish statistics counting for new file
365  if (entry->IsNew()) {
366  perf::Inc(counters_->n_files_added);
367  perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
368  }
369 }
370 
371 
373  // Check if found file has hardlinks (nlink > 1)
374  // As we are looking through all files in one directory here, there might be
375  // completely untouched hardlink groups, which we can safely skip.
376  // Finally we have to see if the hardlink is already part of this group
377 
379 
380  if (entry->GetUnionLinkcount() < 2)
381  return;
382 
383  uint64_t inode = entry->GetUnionInode();
384  HardlinkGroupMap::iterator hl_group;
385  hl_group = GetHardlinkMap().find(inode);
386 
387  if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
388  bool found = false;
389 
390  // search for the entry in this group
391  for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
392  iEnd = hl_group->second.hardlinks.end(); i != iEnd; ++i)
393  {
394  if (*(i->second) == *entry) {
395  found = true;
396  break;
397  }
398  }
399 
400  if (!found) {
401  // Hardlink already in the group?
402  // If one element of a hardlink group is edited, all elements must be
403  // replaced. Here, we remove an untouched hardlink and add it to its
404  // hardlink group for re-adding later
405  LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
406  entry->GetUnionPath().c_str());
407  Remove(entry);
408  hl_group->second.AddHardlink(entry);
409  }
410  }
411 }
412 
413 
422 
423  // If no hardlink in this directory was changed, we can skip this
424  if (GetHardlinkMap().empty())
425  return;
426 
427  LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
428  entry->GetUnionPath().c_str());
429 
430  // Look for legacy hardlinks
432  false);
433  traversal.fn_new_file =
435  traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
436  traversal.fn_new_character_dev =
438  traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
439  traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
440  traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
441  traversal.Recurse(entry->GetUnionPath());
442 }
443 
444 
445 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
446  const string &file_name)
447 {
448  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
449  InsertLegacyHardlink(entry);
450 }
451 
452 
453 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
454  const string &file_name)
455 {
456  SharedPtr<SyncItem> entry =
457  CreateSyncItem(parent_dir, file_name, kItemSymlink);
458  InsertLegacyHardlink(entry);
459 }
460 
462  const string &parent_dir,
463  const string &file_name)
464 {
465  SharedPtr<SyncItem> entry =
466  CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
467  InsertLegacyHardlink(entry);
468 }
469 
471  const string &parent_dir,
472  const string &file_name)
473 {
474  SharedPtr<SyncItem> entry =
475  CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
476  InsertLegacyHardlink(entry);
477 }
478 
479 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
480  const string &file_name)
481 {
482  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
483  InsertLegacyHardlink(entry);
484 }
485 
486 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
487  const string &file_name)
488 {
489  SharedPtr<SyncItem> entry =
490  CreateSyncItem(parent_dir, file_name, kItemSocket);
491  InsertLegacyHardlink(entry);
492 }
493 
494 
496  AddDirectory(entry);
497 
498  // Create a recursion engine, which recursively adds all entries in a newly
499  // created directory
501  this, union_engine_->scratch_path(), true);
503  traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
504  traversal.fn_new_file = &SyncMediator::AddFileCallback;
505  traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
506  traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
507  traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
508  traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
509  traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
510  traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
511  traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
512  traversal.Recurse(entry->GetScratchPath());
513 }
514 
515 
516 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
517  const std::string &dir_name)
518 {
519  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
520  AddDirectory(entry);
521  return true; // The recursion engine should recurse deeper here
522 }
523 
524 
525 void SyncMediator::AddFileCallback(const std::string &parent_dir,
526  const std::string &file_name)
527 {
528  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
529  Add(entry);
530 }
531 
532 
533 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
534  const std::string &file_name)
535 {
536  SharedPtr<SyncItem> entry =
537  CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
538  Add(entry);
539 }
540 
541 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
542  const std::string &file_name)
543 {
544  SharedPtr<SyncItem> entry =
545  CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
546  Add(entry);
547 }
548 
549 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
550  const std::string &file_name)
551 {
552  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
553  Add(entry);
554 }
555 
556 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
557  const std::string &file_name) {
558  SharedPtr<SyncItem> entry =
559  CreateSyncItem(parent_dir, file_name, kItemSocket);
560  Add(entry);
561 }
562 
563 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
564  const std::string &link_name)
565 {
566  SharedPtr<SyncItem> entry =
567  CreateSyncItem(parent_dir, link_name, kItemSymlink);
568  Add(entry);
569 }
570 
571 
572 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
573  const std::string &dir_name)
574 {
575  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
576  EnterDirectory(entry);
577 }
578 
579 
580 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
581  const std::string &dir_name)
582 {
583  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
584  LeaveDirectory(entry);
585 }
586 
587 
589  // Delete a directory AFTER it was emptied here,
590  // because it would start up another recursion
591 
592  const bool recurse = false;
594  this, union_engine_->rdonly_path(), recurse);
596  traversal.fn_new_dir_postfix =
598  traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
599  traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
600  traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
601  traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
602  traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
603  traversal.Recurse(entry->GetRdOnlyPath());
604 
605  // The given directory was emptied recursively and can now itself be deleted
606  RemoveDirectory(entry);
607 }
608 
609 
610 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
611  const std::string &file_name)
612 {
613  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
614  Remove(entry);
615 }
616 
617 
618 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
619  const std::string &link_name)
620 {
621  SharedPtr<SyncItem> entry =
622  CreateSyncItem(parent_dir, link_name, kItemSymlink);
623  Remove(entry);
624 }
625 
627  const std::string &parent_dir,
628  const std::string &link_name)
629 {
630  SharedPtr<SyncItem> entry =
631  CreateSyncItem(parent_dir, link_name, kItemCharacterDevice);
632  Remove(entry);
633 }
634 
636  const std::string &parent_dir,
637  const std::string &link_name)
638 {
639  SharedPtr<SyncItem> entry =
640  CreateSyncItem(parent_dir, link_name, kItemBlockDevice);
641  Remove(entry);
642 }
643 
644 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
645  const std::string &link_name)
646 {
647  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name, kItemFifo);
648  Remove(entry);
649 }
650 
651 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
652  const std::string &link_name)
653 {
654  SharedPtr<SyncItem> entry =
655  CreateSyncItem(parent_dir, link_name, kItemSocket);
656  Remove(entry);
657 }
658 
659 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
660  const std::string &dir_name)
661 {
662  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
664 }
665 
666 
667 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
668  const std::string &file_name)
669 {
670  if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
671  return true;
672  }
673 
674  SharedPtr<SyncItem> entry =
675  CreateSyncItem(parent_dir, file_name, kItemUnknown);
676  return entry->IsWhiteout();
677 }
678 
680  const std::string &relative_parent_path, const std::string &filename,
681  const SyncItemType entry_type) const {
682  return union_engine_->CreateSyncItem(relative_parent_path, filename,
683  entry_type);
684 }
685 
688  "Spooler callback for %s, digest %s, produced %d chunks, retval %d",
689  result.local_path.c_str(),
690  result.content_hash.ToString().c_str(),
691  result.file_chunks.size(),
692  result.return_code);
693  if (result.return_code != 0) {
694  PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
695  result.return_code);
696  }
697 
698  SyncItemList::iterator itr;
699  {
701  itr = file_queue_.find(result.local_path);
702  }
703 
704  assert(itr != file_queue_.end());
705 
706  SyncItem &item = *itr->second;
707  item.SetContentHash(result.content_hash);
708  item.SetCompressionAlgorithm(result.compression_alg);
709 
710  XattrList *xattrs = &default_xattrs_;
711  if (params_->include_xattrs) {
712  xattrs = XattrList::CreateFromFile(result.local_path);
713  assert(xattrs != NULL);
714  }
715 
716  if (result.IsChunked()) {
718  item.CreateBasicCatalogDirent(),
719  *xattrs,
720  item.relative_parent_path(),
721  result.file_chunks);
722  } else {
724  item.CreateBasicCatalogDirent(),
725  *xattrs,
726  item.relative_parent_path());
727  }
728 
729  if (xattrs != &default_xattrs_)
730  free(xattrs);
731 }
732 
733 
735  const upload::SpoolerResult &result)
736 {
738  "Spooler callback for hardlink %s, digest %s, retval %d",
739  result.local_path.c_str(),
740  result.content_hash.ToString().c_str(),
741  result.return_code);
742  if (result.return_code != 0) {
743  PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
744  result.return_code);
745  }
746 
747  bool found = false;
748  for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
749  if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
750  found = true;
751  hardlink_queue_[i].master->SetContentHash(result.content_hash);
752  SyncItemList::iterator j, jend;
753  for (j = hardlink_queue_[i].hardlinks.begin(),
754  jend = hardlink_queue_[i].hardlinks.end();
755  j != jend; ++j)
756  {
757  j->second->SetContentHash(result.content_hash);
758  j->second->SetCompressionAlgorithm(result.compression_alg);
759  }
760  if (result.IsChunked())
761  hardlink_queue_[i].file_chunks = result.file_chunks;
762 
763  break;
764  }
765  }
766 
767  assert(found);
768 }
769 
770 
772  const std::string notice = "Nested catalog at " + directory->GetUnionPath();
773  reporter_->OnAdd(notice, catalog::DirectoryEntry());
774 
775  if (!params_->dry_run) {
776  catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
777  }
778 }
779 
780 
782  const std::string notice = "Nested catalog at " + directory->GetUnionPath();
783  reporter_->OnRemove(notice, catalog::DirectoryEntry());
784 
785  if (!params_->dry_run) {
786  catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
787  }
788 }
789 
791  const history::History::Tag & /*to_tag*/) {}
792 
794 
795 void SyncDiffReporter::OnAdd(const std::string &path,
796  const catalog::DirectoryEntry & /*entry*/) {
797  changed_items_++;
798  AddImpl(path);
799 }
800 void SyncDiffReporter::OnRemove(const std::string &path,
801  const catalog::DirectoryEntry & /*entry*/) {
802  changed_items_++;
803  RemoveImpl(path);
804 }
805 void SyncDiffReporter::OnModify(const std::string &path,
806  const catalog::DirectoryEntry & /*entry_from*/,
807  const catalog::DirectoryEntry & /*entry_to*/) {
808  changed_items_++;
809  ModifyImpl(path);
810 }
811 
813  if (print_action_ == kPrintDots) {
816  }
817  }
818 }
819 
823  }
824 }
825 
826 void SyncDiffReporter::AddImpl(const std::string &path) {
827  const char *action_label;
828 
829  switch (print_action_) {
830  case kPrintChanges:
831  if (path.at(0) != '/') {
832  action_label = "[x-catalog-add]";
833  } else {
834  action_label = "[add]";
835  }
836  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
837  break;
838 
839  case kPrintDots:
840  PrintDots();
841  break;
842  default:
843  assert("Invalid print action.");
844  }
845 }
846 
847 void SyncDiffReporter::RemoveImpl(const std::string &path) {
848  const char *action_label;
849 
850  switch (print_action_) {
851  case kPrintChanges:
852  if (path.at(0) != '/') {
853  action_label = "[x-catalog-rem]";
854  } else {
855  action_label = "[rem]";
856  }
857 
858  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
859  break;
860 
861  case kPrintDots:
862  PrintDots();
863  break;
864  default:
865  assert("Invalid print action.");
866  }
867 }
868 
869 void SyncDiffReporter::ModifyImpl(const std::string &path) {
870  const char *action_label;
871 
872  switch (print_action_) {
873  case kPrintChanges:
874  action_label = "[mod]";
875  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
876  break;
877 
878  case kPrintDots:
879  PrintDots();
880  break;
881  default:
882  assert("Invalid print action.");
883  }
884 }
885 
887  reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
888 
889  if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
890  assert(!entry->HasGraftMarker());
891  // Symlinks and special files are completely stored in the catalog
892  XattrList *xattrs = &default_xattrs_;
893  if (params_->include_xattrs) {
894  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
895  assert(xattrs);
896  }
897  catalog_manager_->AddFile(entry->CreateBasicCatalogDirent(), *xattrs,
898  entry->relative_parent_path());
899  if (xattrs != &default_xattrs_)
900  free(xattrs);
901  } else if (entry->HasGraftMarker() && !params_->dry_run) {
902  if (entry->IsValidGraft()) {
903  // Graft files are added to catalog immediately.
904  if (entry->IsChunkedGraft()) {
906  entry->CreateBasicCatalogDirent(), default_xattrs_,
907  entry->relative_parent_path(), *(entry->GetGraftChunks()));
908  } else {
910  entry->CreateBasicCatalogDirent(),
911  default_xattrs_, // TODO(bbockelm): For now, use default xattrs
912  // on grafted files.
913  entry->relative_parent_path());
914  }
915  } else {
916  // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
917  // the graft file is missing information. It's not clear that continuing
918  // forward with the publish is the correct thing to do; abort for now.
920  "Encountered a grafted file (%s) with "
921  "invalid grafting information; check contents of .cvmfsgraft-*"
922  " file. Aborting publish.",
923  entry->GetRelativePath().c_str());
924  }
925  } else if (entry->relative_parent_path().empty() &&
926  entry->IsCatalogMarker()) {
927  PANIC(kLogStderr, "Error: nested catalog marker in root directory");
928  } else if (!params_->dry_run) {
929  {
930  // Push the file to the spooler, remember the entry for the path
932  file_queue_[entry->GetUnionPath()] = entry;
933  }
934  // Spool the file
935  params_->spooler->Process(entry->CreateIngestionSource());
936  }
937 
938  // publish statistics counting for new file
939  if (entry->IsNew()) {
940  if (entry->IsSymlink()) {
941  perf::Inc(counters_->n_symlinks_added);
942  } else {
943  perf::Inc(counters_->n_files_added);
944  perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
945  }
946  }
947 }
948 
950  reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
951 
952  if (!params_->dry_run) {
953  if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
954  LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
955  entry->GetUnionPath().c_str());
956  catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
957  }
958  catalog_manager_->RemoveFile(entry->GetRelativePath());
959  }
960 
961  // Counting nr of removed files and removed bytes
962  if (entry->WasSymlink()) {
963  perf::Inc(counters_->n_symlinks_removed);
964  } else {
965  perf::Inc(counters_->n_files_removed);
966  }
967  perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
968 }
969 
971  AddDirectory(entry);
972 }
973 
975  reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
976 
977  perf::Inc(counters_->n_directories_added);
978  assert(!entry->HasGraftMarker());
979  if (!params_->dry_run) {
980  XattrList *xattrs = &default_xattrs_;
981  if (params_->include_xattrs) {
982  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
983  assert(xattrs);
984  }
985  catalog_manager_->AddDirectory(entry->CreateBasicCatalogDirent(), *xattrs,
986  entry->relative_parent_path());
987  if (xattrs != &default_xattrs_)
988  free(xattrs);
989  }
990 
991  if (entry->HasCatalogMarker() &&
992  !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
993  CreateNestedCatalog(entry);
994  }
995 }
996 
997 
1004  const std::string directory_path = entry->GetRelativePath();
1005 
1006  if (catalog_manager_->IsTransitionPoint(directory_path)) {
1007  RemoveNestedCatalog(entry);
1008  }
1009 
1010  reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1011  if (!params_->dry_run) {
1012  catalog_manager_->RemoveDirectory(directory_path);
1013  }
1014 
1015  perf::Inc(counters_->n_directories_removed);
1016 }
1017 
1019  reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1021 
1022  const std::string directory_path = entry->GetRelativePath();
1023 
1024  if (!params_->dry_run) {
1025  XattrList *xattrs = &default_xattrs_;
1026  if (params_->include_xattrs) {
1027  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1028  assert(xattrs);
1029  }
1030  catalog_manager_->TouchDirectory(entry->CreateBasicCatalogDirent(), *xattrs,
1031  directory_path);
1032  if (xattrs != &default_xattrs_) free(xattrs);
1033  }
1034 
1035  if (entry->HasCatalogMarker() &&
1036  !catalog_manager_->IsTransitionPoint(directory_path)) {
1037  CreateNestedCatalog(entry);
1038  } else if (!entry->HasCatalogMarker() &&
1039  catalog_manager_->IsTransitionPoint(directory_path)) {
1040  RemoveNestedCatalog(entry);
1041  }
1042 }
1043 
1050 
1051  for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1052  iEnd = hardlinks.end(); i != iEnd; ++i)
1053  {
1054  if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount() &&
1056  PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1057  i->second.master->GetUnionPath().c_str());
1058  }
1059 
1060  if (params_->print_changeset) {
1061  for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1062  jEnd = i->second.hardlinks.end(); j != jEnd; ++j)
1063  {
1064  std::string changeset_notice =
1065  GetParentPath(i->second.master->GetUnionPath()) + "/" +
1066  j->second->filename();
1067  reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1068  }
1069  }
1070 
1071  if (params_->dry_run)
1072  continue;
1073 
1074  if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1075  AddHardlinkGroup(i->second);
1076  else
1077  hardlink_queue_.push_back(i->second);
1078  }
1079 }
1080 
1081 
1084 
1085  // Create a DirectoryEntry list out of the hardlinks
1087  for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1088  iEnd = group.hardlinks.end(); i != iEnd; ++i)
1089  {
1090  hardlinks.push_back(i->second->CreateBasicCatalogDirent());
1091  }
1092  XattrList *xattrs = &default_xattrs_;
1093  if (params_->include_xattrs) {
1094  xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1095  assert(xattrs);
1096  }
1098  hardlinks,
1099  *xattrs,
1100  group.master->relative_parent_path(),
1101  group.file_chunks);
1102  if (xattrs != &default_xattrs_)
1103  free(xattrs);
1104 }
1105 
1106 } // namespace publish
bool Commit(manifest::Manifest *manifest)
int return_code
the return value of the spooler operation
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
void RemoveSocketCallback(const std::string &parent_dir, const std::string &link_name)
void ModifyImpl(const std::string &path)
void Dec(class Counter *counter)
Definition: statistics.h:49
void AddChunkedFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
const manifest::Manifest * manifest() const
Definition: repository.h:123
void AddDirectoryRecursively(SharedPtr< SyncItem > entry)
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
static const unsigned kActionNone
void AddHardlinkGroup(const DirectoryEntryBaseList &entries, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
HardlinkGroupMapStack hardlink_stack_
void AddBlockDeviceCallback(const std::string &parent_dir, const std::string &file_name)
FileChunkList file_chunks
the file chunks generated during processing
void AddDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
virtual bool SupportsHardlinks() const
Definition: sync_union.h:140
void RemoveFile(SharedPtr< SyncItem > entry)
void CompleteHardlinks(SharedPtr< SyncItem > entry)
void RemoveNestedCatalog(SharedPtr< SyncItem > directory)
void RemoveDirectory(const std::string &directory_path)
#define PANIC(...)
Definition: exception.h:26
void CreateNestedCatalog(SharedPtr< SyncItem > directory)
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:245
UniquePtr< SyncDiffReporter > reporter_
virtual void OnInit(const history::History::Tag &from_tag, const history::History::Tag &to_tag)
virtual void OnModify(const std::string &path, const catalog::DirectoryEntry &entry_from, const catalog::DirectoryEntry &entry_to)
void RemoveDirectoryRecursively(SharedPtr< SyncItem > entry)
HardlinkGroupMap & GetHardlinkMap()
void LegacyCharacterDeviceHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void Replace(SharedPtr< SyncItem > entry)
std::string CreateTempPath(const std::string &path_prefix, const int mode)
Definition: posix.cc:1054
std::string scratch_path() const
Definition: sync_union.h:98
void PublishHardlinksCallback(const upload::SpoolerResult &result)
void EnterDirectory(SharedPtr< SyncItem > entry)
A simple recursion engine to abstract the recursion of directories. It provides several callback hook...
Definition: fs_traversal.h:37
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
UniquePtr< perf::FsCounters > counters_
assert((mem||(size==0))&&"Out Of Memory")
bool Commit(const bool stop_for_tweaks, const uint64_t manual_revision, manifest::Manifest *manifest)
void LegacySymlinkHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void AddDirectory(SharedPtr< SyncItem > entry)
void RemoveImpl(const std::string &path)
HardlinkGroupList hardlink_queue_
void RemoveCharacterDeviceCallback(const std::string &parent_dir, const std::string &link_name)
std::string GetParentPath(const std::string &path)
Definition: posix.cc:129
void AddFileCallback(const std::string &parent_dir, const std::string &file_name)
virtual void PostUpload()
Definition: sync_union.h:81
void LegacySocketHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
Definition: sync_union.cc:30
void RemoveFileCallback(const std::string &parent_dir, const std::string &file_name)
void PublishFilesCallback(const upload::SpoolerResult &result)
bool IsTransitionPoint(const std::string &mountpoint)
void Clone(const std::string from, const std::string to)
void TouchDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &directory_path)
void AddFile(SharedPtr< SyncItem > entry)
std::string dir_temp
void InsertLegacyHardlink(SharedPtr< SyncItem > entry)
void AddUnmaterializedDirectory(SharedPtr< SyncItem > entry)
std::string local_path
the local_path previously given as input
unsigned int processing_dot_interval_
Definition: sync_mediator.h:88
std::string union_path() const
Definition: sync_union.h:97
VoidCallback fn_new_file
Definition: fs_traversal.h:47
zlib::Algorithms compression_alg
SyncUnion * union_engine_
void ShrinkHardlinkGroup(const std::string &remove_path)
SyncItemList file_queue_
const SyncParameters * params_
VoidCallback fn_enter_dir
Definition: fs_traversal.h:45
virtual bool IgnoreFilePredicate(const std::string &parent_dir, const std::string &filename)
Definition: sync_union.cc:57
void RemoveFifoCallback(const std::string &parent_dir, const std::string &link_name)
virtual void OnStats(const catalog::DeltaCounters &delta)
upload::Spooler * spooler
void AddImpl(const std::string &path)
void AddSocketCallback(const std::string &parent_dir, const std::string &file_name)
void Touch(SharedPtr< SyncItem > entry)
void AddFifoCallback(const std::string &parent_dir, const std::string &file_name)
void AddSymlinkCallback(const std::string &parent_dir, const std::string &link_name)
void EnterAddedDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
void PrintWarning(const string &message)
Definition: logging.cc:469
void Add(SharedPtr< SyncItem > entry)
void Inc(class Counter *counter)
Definition: statistics.h:50
bool AddDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:264
void Clone(const std::string from, const std::string to)
bool IgnoreFileCallback(const std::string &parent_dir, const std::string &file_name)
void RemoveFile(const std::string &file_path)
bool stop_for_catalog_tweaks
void RemoveNestedCatalog(const std::string &mountpoint, const bool merge=true)
uint64_t manual_revision
void LegacyFifoHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void AddCharacterDeviceCallback(const std::string &parent_dir, const std::string &file_name)
void AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks)
pthread_mutex_t lock_file_queue_
void InsertHardlink(SharedPtr< SyncItem > entry)
catalog::WritableCatalogManager * catalog_manager_
void LeaveDirectory(SharedPtr< SyncItem > entry)
virtual void OnRemove(const std::string &path, const catalog::DirectoryEntry &entry)
SyncItemType
Definition: sync_item.h:29
void SetContentHash(const shash::Any &hash)
Definition: sync_item.h:116
void RemoveSymlinkCallback(const std::string &parent_dir, const std::string &link_name)
std::vector< DirectoryEntryBase > DirectoryEntryBaseList
void RemoveBlockDeviceCallback(const std::string &parent_dir, const std::string &link_name)
static XattrList * CreateFromFile(const std::string &path)
Definition: xattr.cc:30
const int kLogVerboseMsg
unsigned virtual_dir_actions
void LegacyBlockDeviceHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void RemoveDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
void TouchDirectory(SharedPtr< SyncItem > entry)
static const char * kVirtualPath
bool ignore_xdir_hardlinks
void AddFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
void EnsureAllowed(SharedPtr< SyncItem > entry)
void LegacyRegularHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void RegisterUnionEngine(SyncUnion *engine)
void AddHardlinkGroup(const HardlinkGroup &group)
std::string rdonly_path() const
Definition: sync_union.h:96
void RemoveDirectory(SharedPtr< SyncItem > entry)
std::map< uint64_t, HardlinkGroup > HardlinkGroupMap
void CreateNestedCatalog(const std::string &mountpoint)
size_t size() const
Definition: bigvector.h:100
virtual void OnAdd(const std::string &path, const catalog::DirectoryEntry &entry)
void Remove(SharedPtr< SyncItem > entry)
void LeaveAddedDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)