CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sync_mediator.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "sync_mediator.h"
8 
9 #include <fcntl.h>
10 #include <inttypes.h>
11 #include <unistd.h>
12 
13 #include <cassert>
14 #include <cstdio>
15 #include <cstdlib>
16 
17 #include "catalog_virtual.h"
18 #include "compression.h"
19 #include "directory_entry.h"
20 #include "fs_traversal.h"
21 #include "hash.h"
22 #include "json_document.h"
23 #include "publish/repository.h"
24 #include "smalloc.h"
25 #include "sync_union.h"
26 #include "upload.h"
27 #include "util/exception.h"
28 #include "util/posix.h"
29 #include "util/string.h"
30 #include "util_concurrency.h"
31 
32 using namespace std; // NOLINT
33 
34 namespace publish {
35 
36 AbstractSyncMediator::~AbstractSyncMediator() {}
37 
38 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
39  const SyncParameters *params,
40  perf::StatisticsTemplate statistics)
41  : catalog_manager_(catalog_manager)
42  , union_engine_(NULL)
43  , handle_hardlinks_(false)
44  , params_(params)
45  , reporter_(new SyncDiffReporter(params_->print_changeset
46  ? SyncDiffReporter::kPrintChanges
47  : SyncDiffReporter::kPrintDots)) {
48  int retval = pthread_mutex_init(&lock_file_queue_, NULL);
49  assert(retval == 0);
50 
51  params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
52 
53  counters_ = new perf::FsCounters(statistics);
54 }
55 
57  pthread_mutex_destroy(&lock_file_queue_);
58 }
59 
60 
62  union_engine_ = engine;
64 }
65 
66 
73  const bool ignore_case_setting = false;
74  string relative_path = entry->GetRelativePath();
75  if ( (relative_path == string(catalog::VirtualCatalog::kVirtualPath)) ||
76  (HasPrefix(relative_path,
78  ignore_case_setting)) )
79  {
80  PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
81  relative_path.c_str());
82  }
83 }
84 
85 
91  EnsureAllowed(entry);
92 
93  if (entry->IsDirectory()) {
95  return;
96  }
97 
98  // .cvmfsbundles file type
99  if (entry->IsBundleSpec()) {
100  PrintWarning(".cvmfsbundles file encountered. "
101  "Bundles is currently an experimental feature.");
102 
103  if (!entry->IsRegularFile()) {
104  PANIC(kLogStderr, "Error: .cvmfsbundles file must be a regular file");
105  }
106  if (entry->HasHardlinks()) {
107  PANIC(kLogStderr, "Error: .cvmfsbundles file must not be a hard link");
108  }
109 
110  std::string parent_path = GetParentPath(entry->GetUnionPath());
111  if (parent_path != union_engine_->union_path()) {
112  PANIC(kLogStderr, "Error: .cvmfsbundles file must be in the root"
113  " directory of the repository. Found in %s", parent_path.c_str());
114  }
115 
116  std::string json_string;
117 
118  int fd = open(entry->GetUnionPath().c_str(), O_RDONLY);
119  if (fd < 0) {
120  PANIC(kLogStderr, "Could not open file: %s",
121  entry->GetUnionPath().c_str());
122  }
123  if (!SafeReadToString(fd, &json_string)) {
124  PANIC(kLogStderr, "Could not read contents of file: %s",
125  entry->GetUnionPath().c_str());
126  }
128 
129  AddFile(entry);
130  return;
131  }
132 
133  if (entry->IsRegularFile() || entry->IsSymlink()) {
134  // A file is a hard link if the link count is greater than 1
135  if (entry->HasHardlinks() && handle_hardlinks_)
136  InsertHardlink(entry);
137  else
138  AddFile(entry);
139  return;
140  } else if (entry->IsGraftMarker()) {
141  LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
142  return; // Ignore markers.
143  }
144 
145  // In OverlayFS whiteouts can be represented as character devices with major
146  // and minor numbers equal to 0. Special files will be ignored except if they
147  // are whiteout files.
148  if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
150  PrintWarning("'" + entry->GetRelativePath() + "' "
151  "is a special file, ignoring.");
152  } else {
153  if (entry->HasHardlinks() && handle_hardlinks_)
154  InsertHardlink(entry);
155  else
156  AddFile(entry);
157  }
158  return;
159  }
160 
161  PrintWarning("'" + entry->GetRelativePath() +
162  "' cannot be added. Unrecognized file type.");
163 }
164 
165 
170  EnsureAllowed(entry);
171 
172  if (entry->IsGraftMarker()) {return;}
173  if (entry->IsDirectory()) {
174  TouchDirectory(entry);
175  perf::Inc(counters_->n_directories_changed);
176  return;
177  }
178 
179  if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
180  Replace(entry); // This way, hardlink processing is correct
181  // Replace calls Remove; cancel Remove's actions:
182  perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
183 
184  // Count only the diference between the old and new file
185  // Symlinks do not count into added or removed bytes
186  int64_t dif = 0;
187 
188  // Need to handle 4 cases (symlink->symlink, symlink->regular,
189  // regular->symlink, regular->regular)
190  if (entry->WasSymlink()) {
191  // Replace calls Remove; cancel Remove's actions:
192  perf::Dec(counters_->n_symlinks_removed);
193 
194  if (entry->IsSymlink()) {
195  perf::Inc(counters_->n_symlinks_changed);
196  } else {
197  perf::Inc(counters_->n_symlinks_removed);
198  perf::Inc(counters_->n_files_added);
199  dif += entry->GetScratchSize();
200  }
201  } else {
202  // Replace calls Remove; cancel Remove's actions:
203  perf::Dec(counters_->n_files_removed);
204  dif -= entry->GetRdOnlySize();
205  if (entry->IsSymlink()) {
206  perf::Inc(counters_->n_files_removed);
207  perf::Inc(counters_->n_symlinks_added);
208  } else {
209  perf::Inc(counters_->n_files_changed);
210  dif += entry->GetScratchSize();
211  }
212  }
213 
214  if (dif > 0) { // added bytes
215  perf::Xadd(counters_->sz_added_bytes, dif);
216  } else { // removed bytes
217  perf::Xadd(counters_->sz_removed_bytes, -dif);
218  }
219  return;
220  }
221 
222  PrintWarning("'" + entry->GetRelativePath() +
223  "' cannot be touched. Unrecognized file type.");
224 }
225 
226 
231  EnsureAllowed(entry);
232 
233  if (entry->WasDirectory()) {
235  return;
236  }
237 
238  if (entry->WasBundleSpec()) {
239  // for now remove using RemoveFile()
240  RemoveFile(entry);
241  return;
242  }
243 
244  if (entry->WasRegularFile() || entry->WasSymlink() ||
245  entry->WasSpecialFile()) {
246  RemoveFile(entry);
247  return;
248  }
249 
250  PrintWarning("'" + entry->GetRelativePath() +
251  "' cannot be deleted. Unrecognized file type.");
252 }
253 
254 
259  // EnsureAllowed(entry); <-- Done by Remove() and Add()
260  Remove(entry);
261  Add(entry);
262 }
263 
264 void SyncMediator::Clone(const std::string from, const std::string to) {
265  catalog_manager_->Clone(from, to);
266 }
267 
269  if (!handle_hardlinks_) {
270  return;
271  }
272 
273  HardlinkGroupMap new_map;
274  hardlink_stack_.push(new_map);
275 }
276 
277 
279 {
280  if (!handle_hardlinks_) {
281  return;
282  }
283 
284  CompleteHardlinks(entry);
286  hardlink_stack_.pop();
287 }
288 
289 
295  reporter_->CommitReport();
296 
297  if (!params_->dry_run) {
299  "Waiting for upload of files before committing...");
300  params_->spooler->WaitForUpload();
301  }
302 
303  if (!hardlink_queue_.empty()) {
305 
306  LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
307  params_->spooler->UnregisterListeners();
309  this);
310 
311  // TODO(rmeusel): Revise that for Thread Safety!
312  // This loop will spool hardlinks into the spooler, which will then
313  // process them.
314  // On completion of every hardlink the spooler will asynchronously
315  // emit callbacks (SyncMediator::PublishHardlinksCallback) which
316  // might happen while this for-loop goes through the hardlink_queue_
317  //
318  // For the moment this seems not to be a problem, but it's an accident
319  // just waiting to happen.
320  //
321  // Note: Just wrapping this loop in a mutex might produce a dead lock
322  // since the spooler does not fill it's processing queue to an
323  // unlimited size. Meaning that it might be flooded with hard-
324  // links and waiting for the queue to be processed while proces-
325  // sing is stalled because the callback is waiting for this
326  // mutex.
327  for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
328  iEnd = hardlink_queue_.end(); i != iEnd; ++i)
329  {
330  LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
331  i->master->GetUnionPath().c_str());
332  IngestionSource *source =
333  new FileIngestionSource(i->master->GetUnionPath());
334  params_->spooler->Process(source);
335  }
336 
337  params_->spooler->WaitForUpload();
338 
339  for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
340  iEnd = hardlink_queue_.end(); i != iEnd; ++i)
341  {
342  LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
343  i->master->GetUnionPath().c_str());
344  AddHardlinkGroup(*i);
345  }
346  }
347 
349 
350  params_->spooler->UnregisterListeners();
351 
352  if (params_->dry_run) {
353  manifest = NULL;
354  return true;
355  }
356 
357  LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
358  if (params_->spooler->GetNumberOfErrors() > 0) {
359  LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
360  return false;
361  }
362 
365  {
368  // Commit empty string to ensure that the "content" of the auto catalog
369  // markers is present in the repository.
370  string empty_file = CreateTempPath(params_->dir_temp + "/empty", 0600);
371  IngestionSource* source = new FileIngestionSource(empty_file);
372  params_->spooler->Process(source);
373  params_->spooler->WaitForUpload();
374  unlink(empty_file.c_str());
375  if (params_->spooler->GetNumberOfErrors() > 0) {
376  LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
377  return false;
378  }
379  }
383  manifest);
384 }
385 
386 
389 
390  uint64_t inode = entry->GetUnionInode();
391  LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
392  inode, entry->GetUnionPath().c_str());
393 
394  // Find the hard link group in the lists
395  HardlinkGroupMap::iterator hardlink_group = GetHardlinkMap().find(inode);
396 
397  if (hardlink_group == GetHardlinkMap().end()) {
398  // Create a new hardlink group
399  GetHardlinkMap().insert(
400  HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
401  } else {
402  // Append the file to the appropriate hardlink group
403  hardlink_group->second.AddHardlink(entry);
404  }
405 
406  // publish statistics counting for new file
407  if (entry->IsNew()) {
408  perf::Inc(counters_->n_files_added);
409  perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
410  }
411 }
412 
413 
415  // Check if found file has hardlinks (nlink > 1)
416  // As we are looking through all files in one directory here, there might be
417  // completely untouched hardlink groups, which we can safely skip.
418  // Finally we have to see if the hardlink is already part of this group
419 
421 
422  if (entry->GetUnionLinkcount() < 2)
423  return;
424 
425  uint64_t inode = entry->GetUnionInode();
426  HardlinkGroupMap::iterator hl_group;
427  hl_group = GetHardlinkMap().find(inode);
428 
429  if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
430  bool found = false;
431 
432  // search for the entry in this group
433  for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
434  iEnd = hl_group->second.hardlinks.end(); i != iEnd; ++i)
435  {
436  if (*(i->second) == *entry) {
437  found = true;
438  break;
439  }
440  }
441 
442  if (!found) {
443  // Hardlink already in the group?
444  // If one element of a hardlink group is edited, all elements must be
445  // replaced. Here, we remove an untouched hardlink and add it to its
446  // hardlink group for re-adding later
447  LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
448  entry->GetUnionPath().c_str());
449  Remove(entry);
450  hl_group->second.AddHardlink(entry);
451  }
452  }
453 }
454 
455 
464 
465  // If no hardlink in this directory was changed, we can skip this
466  if (GetHardlinkMap().empty())
467  return;
468 
469  LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
470  entry->GetUnionPath().c_str());
471 
472  // Look for legacy hardlinks
474  false);
475  traversal.fn_new_file =
477  traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
478  traversal.fn_new_character_dev =
480  traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
481  traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
482  traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
483  traversal.Recurse(entry->GetUnionPath());
484 }
485 
486 
487 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
488  const string &file_name)
489 {
490  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
491  InsertLegacyHardlink(entry);
492 }
493 
494 
495 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
496  const string &file_name)
497 {
498  SharedPtr<SyncItem> entry =
499  CreateSyncItem(parent_dir, file_name, kItemSymlink);
500  InsertLegacyHardlink(entry);
501 }
502 
504  const string &parent_dir,
505  const string &file_name)
506 {
507  SharedPtr<SyncItem> entry =
508  CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
509  InsertLegacyHardlink(entry);
510 }
511 
513  const string &parent_dir,
514  const string &file_name)
515 {
516  SharedPtr<SyncItem> entry =
517  CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
518  InsertLegacyHardlink(entry);
519 }
520 
521 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
522  const string &file_name)
523 {
524  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
525  InsertLegacyHardlink(entry);
526 }
527 
528 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
529  const string &file_name)
530 {
531  SharedPtr<SyncItem> entry =
532  CreateSyncItem(parent_dir, file_name, kItemSocket);
533  InsertLegacyHardlink(entry);
534 }
535 
536 
538  AddDirectory(entry);
539 
540  // Create a recursion engine, which recursively adds all entries in a newly
541  // created directory
543  this, union_engine_->scratch_path(), true);
545  traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
546  traversal.fn_new_file = &SyncMediator::AddFileCallback;
547  traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
548  traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
549  traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
550  traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
551  traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
552  traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
553  traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
554  traversal.Recurse(entry->GetScratchPath());
555 }
556 
557 
558 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
559  const std::string &dir_name)
560 {
561  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
562  AddDirectory(entry);
563  return true; // The recursion engine should recurse deeper here
564 }
565 
566 
567 void SyncMediator::AddFileCallback(const std::string &parent_dir,
568  const std::string &file_name)
569 {
570  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
571  Add(entry);
572 }
573 
574 
575 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
576  const std::string &file_name)
577 {
578  SharedPtr<SyncItem> entry =
579  CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
580  Add(entry);
581 }
582 
583 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
584  const std::string &file_name)
585 {
586  SharedPtr<SyncItem> entry =
587  CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
588  Add(entry);
589 }
590 
591 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
592  const std::string &file_name)
593 {
594  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
595  Add(entry);
596 }
597 
598 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
599  const std::string &file_name) {
600  SharedPtr<SyncItem> entry =
601  CreateSyncItem(parent_dir, file_name, kItemSocket);
602  Add(entry);
603 }
604 
605 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
606  const std::string &link_name)
607 {
608  SharedPtr<SyncItem> entry =
609  CreateSyncItem(parent_dir, link_name, kItemSymlink);
610  Add(entry);
611 }
612 
613 
614 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
615  const std::string &dir_name)
616 {
617  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
618  EnterDirectory(entry);
619 }
620 
621 
622 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
623  const std::string &dir_name)
624 {
625  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
626  LeaveDirectory(entry);
627 }
628 
629 
631  // Delete a directory AFTER it was emptied here,
632  // because it would start up another recursion
633 
634  const bool recurse = false;
636  this, union_engine_->rdonly_path(), recurse);
638  traversal.fn_new_dir_postfix =
640  traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
641  traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
642  traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
643  traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
644  traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
645  traversal.Recurse(entry->GetRdOnlyPath());
646 
647  // The given directory was emptied recursively and can now itself be deleted
648  RemoveDirectory(entry);
649 }
650 
651 
652 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
653  const std::string &file_name)
654 {
655  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
656  Remove(entry);
657 }
658 
659 
660 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
661  const std::string &link_name)
662 {
663  SharedPtr<SyncItem> entry =
664  CreateSyncItem(parent_dir, link_name, kItemSymlink);
665  Remove(entry);
666 }
667 
669  const std::string &parent_dir,
670  const std::string &link_name)
671 {
672  SharedPtr<SyncItem> entry =
673  CreateSyncItem(parent_dir, link_name, kItemCharacterDevice);
674  Remove(entry);
675 }
676 
678  const std::string &parent_dir,
679  const std::string &link_name)
680 {
681  SharedPtr<SyncItem> entry =
682  CreateSyncItem(parent_dir, link_name, kItemBlockDevice);
683  Remove(entry);
684 }
685 
686 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
687  const std::string &link_name)
688 {
689  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name, kItemFifo);
690  Remove(entry);
691 }
692 
693 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
694  const std::string &link_name)
695 {
696  SharedPtr<SyncItem> entry =
697  CreateSyncItem(parent_dir, link_name, kItemSocket);
698  Remove(entry);
699 }
700 
701 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
702  const std::string &dir_name)
703 {
704  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
706 }
707 
708 
709 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
710  const std::string &file_name)
711 {
712  if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
713  return true;
714  }
715 
716  SharedPtr<SyncItem> entry =
717  CreateSyncItem(parent_dir, file_name, kItemUnknown);
718  return entry->IsWhiteout();
719 }
720 
722  const std::string &relative_parent_path, const std::string &filename,
723  const SyncItemType entry_type) const {
724  return union_engine_->CreateSyncItem(relative_parent_path, filename,
725  entry_type);
726 }
727 
730  "Spooler callback for %s, digest %s, produced %d chunks, retval %d",
731  result.local_path.c_str(),
732  result.content_hash.ToString().c_str(),
733  result.file_chunks.size(),
734  result.return_code);
735  if (result.return_code != 0) {
736  PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
737  result.return_code);
738  }
739 
740  SyncItemList::iterator itr;
741  {
743  itr = file_queue_.find(result.local_path);
744  }
745 
746  assert(itr != file_queue_.end());
747 
748  SyncItem &item = *itr->second;
749  item.SetContentHash(result.content_hash);
750  item.SetCompressionAlgorithm(result.compression_alg);
751 
752  XattrList *xattrs = &default_xattrs_;
753  if (params_->include_xattrs) {
754  xattrs = XattrList::CreateFromFile(result.local_path);
755  assert(xattrs != NULL);
756  }
757 
758  if (result.IsChunked()) {
760  item.CreateBasicCatalogDirent(),
761  *xattrs,
762  item.relative_parent_path(),
763  result.file_chunks);
764  } else {
766  item.CreateBasicCatalogDirent(),
767  *xattrs,
768  item.relative_parent_path());
769  }
770 
771  if (xattrs != &default_xattrs_)
772  free(xattrs);
773 }
774 
775 
777  const upload::SpoolerResult &result)
778 {
780  "Spooler callback for hardlink %s, digest %s, retval %d",
781  result.local_path.c_str(),
782  result.content_hash.ToString().c_str(),
783  result.return_code);
784  if (result.return_code != 0) {
785  PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
786  result.return_code);
787  }
788 
789  bool found = false;
790  for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
791  if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
792  found = true;
793  hardlink_queue_[i].master->SetContentHash(result.content_hash);
794  SyncItemList::iterator j, jend;
795  for (j = hardlink_queue_[i].hardlinks.begin(),
796  jend = hardlink_queue_[i].hardlinks.end();
797  j != jend; ++j)
798  {
799  j->second->SetContentHash(result.content_hash);
800  j->second->SetCompressionAlgorithm(result.compression_alg);
801  }
802  if (result.IsChunked())
803  hardlink_queue_[i].file_chunks = result.file_chunks;
804 
805  break;
806  }
807  }
808 
809  assert(found);
810 }
811 
812 
814  const std::string notice = "Nested catalog at " + directory->GetUnionPath();
815  reporter_->OnAdd(notice, catalog::DirectoryEntry());
816 
817  if (!params_->dry_run) {
818  catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
819  }
820 }
821 
822 
824  const std::string notice = "Nested catalog at " + directory->GetUnionPath();
825  reporter_->OnRemove(notice, catalog::DirectoryEntry());
826 
827  if (!params_->dry_run) {
828  catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
829  }
830 }
831 
833  const history::History::Tag & /*to_tag*/) {}
834 
836 
837 void SyncDiffReporter::OnAdd(const std::string &path,
838  const catalog::DirectoryEntry & /*entry*/) {
839  changed_items_++;
840  AddImpl(path);
841 }
842 void SyncDiffReporter::OnRemove(const std::string &path,
843  const catalog::DirectoryEntry & /*entry*/) {
844  changed_items_++;
845  RemoveImpl(path);
846 }
847 void SyncDiffReporter::OnModify(const std::string &path,
848  const catalog::DirectoryEntry & /*entry_from*/,
849  const catalog::DirectoryEntry & /*entry_to*/) {
850  changed_items_++;
851  ModifyImpl(path);
852 }
853 
855  if (print_action_ == kPrintDots) {
858  }
859  }
860 }
861 
865  }
866 }
867 
868 void SyncDiffReporter::AddImpl(const std::string &path) {
869  const char *action_label;
870 
871  switch (print_action_) {
872  case kPrintChanges:
873  if (path.at(0) != '/') {
874  action_label = "[x-catalog-add]";
875  } else {
876  action_label = "[add]";
877  }
878  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
879  break;
880 
881  case kPrintDots:
882  PrintDots();
883  break;
884  default:
885  assert("Invalid print action.");
886  }
887 }
888 
889 void SyncDiffReporter::RemoveImpl(const std::string &path) {
890  const char *action_label;
891 
892  switch (print_action_) {
893  case kPrintChanges:
894  if (path.at(0) != '/') {
895  action_label = "[x-catalog-rem]";
896  } else {
897  action_label = "[rem]";
898  }
899 
900  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
901  break;
902 
903  case kPrintDots:
904  PrintDots();
905  break;
906  default:
907  assert("Invalid print action.");
908  }
909 }
910 
911 void SyncDiffReporter::ModifyImpl(const std::string &path) {
912  const char *action_label;
913 
914  switch (print_action_) {
915  case kPrintChanges:
916  action_label = "[mod]";
917  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
918  break;
919 
920  case kPrintDots:
921  PrintDots();
922  break;
923  default:
924  assert("Invalid print action.");
925  }
926 }
927 
929  reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
930 
931  if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
932  assert(!entry->HasGraftMarker());
933  // Symlinks and special files are completely stored in the catalog
934  XattrList *xattrs = &default_xattrs_;
935  if (params_->include_xattrs) {
936  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
937  assert(xattrs);
938  }
939  catalog_manager_->AddFile(entry->CreateBasicCatalogDirent(), *xattrs,
940  entry->relative_parent_path());
941  if (xattrs != &default_xattrs_)
942  free(xattrs);
943  } else if (entry->HasGraftMarker() && !params_->dry_run) {
944  if (entry->IsValidGraft()) {
945  // Graft files are added to catalog immediately.
946  if (entry->IsChunkedGraft()) {
948  entry->CreateBasicCatalogDirent(), default_xattrs_,
949  entry->relative_parent_path(), *(entry->GetGraftChunks()));
950  } else {
952  entry->CreateBasicCatalogDirent(),
953  default_xattrs_, // TODO(bbockelm): For now, use default xattrs
954  // on grafted files.
955  entry->relative_parent_path());
956  }
957  } else {
958  // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
959  // the graft file is missing information. It's not clear that continuing
960  // forward with the publish is the correct thing to do; abort for now.
962  "Encountered a grafted file (%s) with "
963  "invalid grafting information; check contents of .cvmfsgraft-*"
964  " file. Aborting publish.",
965  entry->GetRelativePath().c_str());
966  }
967  } else if (entry->relative_parent_path().empty() &&
968  entry->IsCatalogMarker()) {
969  PANIC(kLogStderr, "Error: nested catalog marker in root directory");
970  } else if (!params_->dry_run) {
971  {
972  // Push the file to the spooler, remember the entry for the path
974  file_queue_[entry->GetUnionPath()] = entry;
975  }
976  // Spool the file
977  params_->spooler->Process(entry->CreateIngestionSource());
978  }
979 
980  // publish statistics counting for new file
981  if (entry->IsNew()) {
982  if (entry->IsSymlink()) {
983  perf::Inc(counters_->n_symlinks_added);
984  } else {
985  perf::Inc(counters_->n_files_added);
986  perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
987  }
988  }
989 }
990 
992  reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
993 
994  if (!params_->dry_run) {
995  if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
996  LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
997  entry->GetUnionPath().c_str());
998  catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
999  }
1000  catalog_manager_->RemoveFile(entry->GetRelativePath());
1001  }
1002 
1003  // Counting nr of removed files and removed bytes
1004  if (entry->WasSymlink()) {
1005  perf::Inc(counters_->n_symlinks_removed);
1006  } else {
1007  perf::Inc(counters_->n_files_removed);
1008  }
1009  perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
1010 }
1011 
1013  AddDirectory(entry);
1014 }
1015 
1017  if (entry->IsBundleSpec()) {
1018  PANIC(kLogStderr, "Illegal directory name: .cvmfsbundles (%s). "
1019  ".cvmfsbundles is reserved for bundles specification files",
1020  entry->GetUnionPath().c_str());
1021  }
1022 
1023  reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1024 
1025  perf::Inc(counters_->n_directories_added);
1026  assert(!entry->HasGraftMarker());
1027  if (!params_->dry_run) {
1028  XattrList *xattrs = &default_xattrs_;
1029  if (params_->include_xattrs) {
1030  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1031  assert(xattrs);
1032  }
1033  catalog_manager_->AddDirectory(entry->CreateBasicCatalogDirent(), *xattrs,
1034  entry->relative_parent_path());
1035  if (xattrs != &default_xattrs_)
1036  free(xattrs);
1037  }
1038 
1039  if (entry->HasCatalogMarker() &&
1040  !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1041  CreateNestedCatalog(entry);
1042  }
1043 }
1044 
1045 
1052  const std::string directory_path = entry->GetRelativePath();
1053 
1054  if (catalog_manager_->IsTransitionPoint(directory_path)) {
1055  RemoveNestedCatalog(entry);
1056  }
1057 
1058  reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1059  if (!params_->dry_run) {
1060  catalog_manager_->RemoveDirectory(directory_path);
1061  }
1062 
1063  perf::Inc(counters_->n_directories_removed);
1064 }
1065 
1067  reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1069 
1070  const std::string directory_path = entry->GetRelativePath();
1071 
1072  if (!params_->dry_run) {
1073  XattrList *xattrs = &default_xattrs_;
1074  if (params_->include_xattrs) {
1075  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1076  assert(xattrs);
1077  }
1078  catalog_manager_->TouchDirectory(entry->CreateBasicCatalogDirent(), *xattrs,
1079  directory_path);
1080  if (xattrs != &default_xattrs_) free(xattrs);
1081  }
1082 
1083  if (entry->HasCatalogMarker() &&
1084  !catalog_manager_->IsTransitionPoint(directory_path)) {
1085  CreateNestedCatalog(entry);
1086  } else if (!entry->HasCatalogMarker() &&
1087  catalog_manager_->IsTransitionPoint(directory_path)) {
1088  RemoveNestedCatalog(entry);
1089  }
1090 }
1091 
1098 
1099  for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1100  iEnd = hardlinks.end(); i != iEnd; ++i)
1101  {
1102  if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount() &&
1104  PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1105  i->second.master->GetUnionPath().c_str());
1106  }
1107 
1108  if (params_->print_changeset) {
1109  for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1110  jEnd = i->second.hardlinks.end(); j != jEnd; ++j)
1111  {
1112  std::string changeset_notice =
1113  GetParentPath(i->second.master->GetUnionPath()) + "/" +
1114  j->second->filename();
1115  reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1116  }
1117  }
1118 
1119  if (params_->dry_run)
1120  continue;
1121 
1122  if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1123  AddHardlinkGroup(i->second);
1124  else
1125  hardlink_queue_.push_back(i->second);
1126  }
1127 }
1128 
1129 
1132 
1133  // Create a DirectoryEntry list out of the hardlinks
1135  for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1136  iEnd = group.hardlinks.end(); i != iEnd; ++i)
1137  {
1138  hardlinks.push_back(i->second->CreateBasicCatalogDirent());
1139  }
1140  XattrList *xattrs = &default_xattrs_;
1141  if (params_->include_xattrs) {
1142  xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1143  assert(xattrs);
1144  }
1146  hardlinks,
1147  *xattrs,
1148  group.master->relative_parent_path(),
1149  group.file_chunks);
1150  if (xattrs != &default_xattrs_)
1151  free(xattrs);
1152 }
1153 
1154 } // namespace publish
bool Commit(manifest::Manifest *manifest)
int return_code
the return value of the spooler operation
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
void RemoveSocketCallback(const std::string &parent_dir, const std::string &link_name)
void ModifyImpl(const std::string &path)
void Dec(class Counter *counter)
Definition: statistics.h:49
void AddChunkedFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
const manifest::Manifest * manifest() const
Definition: repository.h:123
void AddDirectoryRecursively(SharedPtr< SyncItem > entry)
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
void AddHardlinkGroup(const DirectoryEntryBaseList &entries, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
HardlinkGroupMapStack hardlink_stack_
void AddBlockDeviceCallback(const std::string &parent_dir, const std::string &file_name)
FileChunkList file_chunks
the file chunks generated during processing
void AddDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
virtual bool SupportsHardlinks() const
Definition: sync_union.h:140
void RemoveFile(SharedPtr< SyncItem > entry)
void CompleteHardlinks(SharedPtr< SyncItem > entry)
void RemoveNestedCatalog(SharedPtr< SyncItem > directory)
void RemoveDirectory(const std::string &directory_path)
#define PANIC(...)
Definition: exception.h:26
void CreateNestedCatalog(SharedPtr< SyncItem > directory)
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:248
UniquePtr< SyncDiffReporter > reporter_
virtual void OnInit(const history::History::Tag &from_tag, const history::History::Tag &to_tag)
virtual void OnModify(const std::string &path, const catalog::DirectoryEntry &entry_from, const catalog::DirectoryEntry &entry_to)
void RemoveDirectoryRecursively(SharedPtr< SyncItem > entry)
HardlinkGroupMap & GetHardlinkMap()
void LegacyCharacterDeviceHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void Replace(SharedPtr< SyncItem > entry)
std::string CreateTempPath(const std::string &path_prefix, const int mode)
Definition: posix.cc:1059
std::string scratch_path() const
Definition: sync_union.h:98
void PublishHardlinksCallback(const upload::SpoolerResult &result)
void EnterDirectory(SharedPtr< SyncItem > entry)
A simple recursion engine to abstract the recursion of directories. It provides several callback hook...
Definition: fs_traversal.h:37
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
UniquePtr< perf::FsCounters > counters_
assert((mem||(size==0))&&"Out Of Memory")
bool Commit(const bool stop_for_tweaks, const uint64_t manual_revision, manifest::Manifest *manifest)
void LegacySymlinkHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void AddDirectory(SharedPtr< SyncItem > entry)
void RemoveImpl(const std::string &path)
HardlinkGroupList hardlink_queue_
void RemoveCharacterDeviceCallback(const std::string &parent_dir, const std::string &link_name)
std::string GetParentPath(const std::string &path)
Definition: posix.cc:131
void AddFileCallback(const std::string &parent_dir, const std::string &file_name)
virtual void PostUpload()
Definition: sync_union.h:81
void LegacySocketHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
Definition: sync_union.cc:30
void RemoveFileCallback(const std::string &parent_dir, const std::string &file_name)
void PublishFilesCallback(const upload::SpoolerResult &result)
bool IsTransitionPoint(const std::string &mountpoint)
void Clone(const std::string from, const std::string to)
void TouchDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &directory_path)
void AddFile(SharedPtr< SyncItem > entry)
std::string dir_temp
void InsertLegacyHardlink(SharedPtr< SyncItem > entry)
void AddUnmaterializedDirectory(SharedPtr< SyncItem > entry)
std::string local_path
the local_path previously given as input
unsigned int processing_dot_interval_
Definition: sync_mediator.h:86
std::string union_path() const
Definition: sync_union.h:97
VoidCallback fn_new_file
Definition: fs_traversal.h:47
zlib::Algorithms compression_alg
SyncUnion * union_engine_
static JsonDocument * Create(const std::string &text)
void ShrinkHardlinkGroup(const std::string &remove_path)
SyncItemList file_queue_
const SyncParameters * params_
VoidCallback fn_enter_dir
Definition: fs_traversal.h:45
virtual bool IgnoreFilePredicate(const std::string &parent_dir, const std::string &filename)
Definition: sync_union.cc:57
void RemoveFifoCallback(const std::string &parent_dir, const std::string &link_name)
virtual void OnStats(const catalog::DeltaCounters &delta)
upload::Spooler * spooler
void AddImpl(const std::string &path)
void AddSocketCallback(const std::string &parent_dir, const std::string &file_name)
static const int kActionNone
void Touch(SharedPtr< SyncItem > entry)
void AddFifoCallback(const std::string &parent_dir, const std::string &file_name)
void AddSymlinkCallback(const std::string &parent_dir, const std::string &link_name)
void EnterAddedDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
void PrintWarning(const string &message)
Definition: logging.cc:536
void Add(SharedPtr< SyncItem > entry)
void Inc(class Counter *counter)
Definition: statistics.h:50
bool AddDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:265
void Clone(const std::string from, const std::string to)
bool IgnoreFileCallback(const std::string &parent_dir, const std::string &file_name)
void RemoveFile(const std::string &file_path)
bool stop_for_catalog_tweaks
bool SafeReadToString(int fd, std::string *final_result)
Definition: posix.cc:2011
void RemoveNestedCatalog(const std::string &mountpoint, const bool merge=true)
uint64_t manual_revision
void LegacyFifoHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void AddCharacterDeviceCallback(const std::string &parent_dir, const std::string &file_name)
void AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks)
pthread_mutex_t lock_file_queue_
void InsertHardlink(SharedPtr< SyncItem > entry)
catalog::WritableCatalogManager * catalog_manager_
void LeaveDirectory(SharedPtr< SyncItem > entry)
virtual void OnRemove(const std::string &path, const catalog::DirectoryEntry &entry)
SyncItemType
Definition: sync_item.h:29
Definition: mutex.h:42
void SetContentHash(const shash::Any &hash)
Definition: sync_item.h:122
void RemoveSymlinkCallback(const std::string &parent_dir, const std::string &link_name)
std::vector< DirectoryEntryBase > DirectoryEntryBaseList
void RemoveBlockDeviceCallback(const std::string &parent_dir, const std::string &link_name)
static XattrList * CreateFromFile(const std::string &path)
Definition: xattr.cc:30
const int kLogVerboseMsg
void LegacyBlockDeviceHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void RemoveDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
void TouchDirectory(SharedPtr< SyncItem > entry)
static const char * kVirtualPath
bool ignore_xdir_hardlinks
void AddFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
void EnsureAllowed(SharedPtr< SyncItem > entry)
void LegacyRegularHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void RegisterUnionEngine(SyncUnion *engine)
void AddHardlinkGroup(const HardlinkGroup &group)
std::string rdonly_path() const
Definition: sync_union.h:96
void RemoveDirectory(SharedPtr< SyncItem > entry)
std::map< uint64_t, HardlinkGroup > HardlinkGroupMap
void CreateNestedCatalog(const std::string &mountpoint)
size_t size() const
Definition: bigvector.h:100
virtual void OnAdd(const std::string &path, const catalog::DirectoryEntry &entry)
void Remove(SharedPtr< SyncItem > entry)
void LeaveAddedDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)