CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sync_mediator.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "sync_mediator.h"
8 
9 #include <fcntl.h>
10 #include <inttypes.h>
11 #include <unistd.h>
12 
13 #include <cassert>
14 #include <cstdio>
15 #include <cstdlib>
16 
17 #include "catalog_virtual.h"
19 #include "crypto/hash.h"
20 #include "directory_entry.h"
21 #include "json_document.h"
22 #include "publish/repository.h"
23 #include "sync_union.h"
24 #include "upload.h"
25 #include "util/concurrency.h"
26 #include "util/exception.h"
27 #include "util/fs_traversal.h"
28 #include "util/posix.h"
29 #include "util/smalloc.h"
30 #include "util/string.h"
31 
32 using namespace std; // NOLINT
33 
34 namespace publish {
35 
36 AbstractSyncMediator::~AbstractSyncMediator() { }
37 
38 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
39  const SyncParameters *params,
40  perf::StatisticsTemplate statistics)
41  : catalog_manager_(catalog_manager)
42  , union_engine_(NULL)
43  , handle_hardlinks_(false)
44  , params_(params)
45  , reporter_(new SyncDiffReporter(params_->print_changeset
46  ? SyncDiffReporter::kPrintChanges
47  : SyncDiffReporter::kPrintDots)) {
48  int retval = pthread_mutex_init(&lock_file_queue_, NULL);
49  assert(retval == 0);
50 
51  params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
52 
53  counters_ = new perf::FsCounters(statistics);
54 }
55 
56 SyncMediator::~SyncMediator() { pthread_mutex_destroy(&lock_file_queue_); }
57 
58 
60  union_engine_ = engine;
62 }
63 
64 
71  const bool ignore_case_setting = false;
72  string relative_path = entry->GetRelativePath();
73  if ((relative_path == string(catalog::VirtualCatalog::kVirtualPath))
74  || (HasPrefix(relative_path,
76  ignore_case_setting))) {
77  PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
78  relative_path.c_str());
79  }
80 }
81 
82 
88  EnsureAllowed(entry);
89 
90  if (entry->IsDirectory()) {
92  return;
93  }
94 
95  // .cvmfsbundles file type
96  if (entry->IsBundleSpec()) {
97  PrintWarning(".cvmfsbundles file encountered. "
98  "Bundles is currently an experimental feature.");
99 
100  if (!entry->IsRegularFile()) {
101  PANIC(kLogStderr, "Error: .cvmfsbundles file must be a regular file");
102  }
103  if (entry->HasHardlinks()) {
104  PANIC(kLogStderr, "Error: .cvmfsbundles file must not be a hard link");
105  }
106 
107  std::string parent_path = GetParentPath(entry->GetUnionPath());
108  if (parent_path != union_engine_->union_path()) {
110  "Error: .cvmfsbundles file must be in the root"
111  " directory of the repository. Found in %s",
112  parent_path.c_str());
113  }
114 
115  std::string json_string;
116 
117  int fd = open(entry->GetUnionPath().c_str(), O_RDONLY);
118  if (fd < 0) {
119  PANIC(kLogStderr, "Could not open file: %s",
120  entry->GetUnionPath().c_str());
121  }
122  if (!SafeReadToString(fd, &json_string)) {
123  PANIC(kLogStderr, "Could not read contents of file: %s",
124  entry->GetUnionPath().c_str());
125  }
127 
128  AddFile(entry);
129  return;
130  }
131 
132  if (entry->IsRegularFile() || entry->IsSymlink()) {
133  // A file is a hard link if the link count is greater than 1
134  if (entry->HasHardlinks() && handle_hardlinks_)
135  InsertHardlink(entry);
136  else
137  AddFile(entry);
138  return;
139  } else if (entry->IsGraftMarker()) {
140  LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
141  return; // Ignore markers.
142  }
143 
144  // In OverlayFS whiteouts can be represented as character devices with major
145  // and minor numbers equal to 0. Special files will be ignored except if they
146  // are whiteout files.
147  if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
149  PrintWarning("'" + entry->GetRelativePath()
150  + "' "
151  "is a special file, ignoring.");
152  } else {
153  if (entry->HasHardlinks() && handle_hardlinks_)
154  InsertHardlink(entry);
155  else
156  AddFile(entry);
157  }
158  return;
159  }
160 
161  PrintWarning("'" + entry->GetRelativePath()
162  + "' cannot be added. Unrecognized file type.");
163 }
164 
165 
170  EnsureAllowed(entry);
171 
172  if (entry->IsGraftMarker()) {
173  return;
174  }
175  if (entry->IsDirectory()) {
176  TouchDirectory(entry);
177  perf::Inc(counters_->n_directories_changed);
178  return;
179  }
180 
181  if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
182  Replace(entry); // This way, hardlink processing is correct
183  // Replace calls Remove; cancel Remove's actions:
184  perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
185 
186  // Count only the difference between the old and new file
187  // Symlinks do not count into added or removed bytes
188  int64_t dif = 0;
189 
190  // Need to handle 4 cases (symlink->symlink, symlink->regular,
191  // regular->symlink, regular->regular)
192  if (entry->WasSymlink()) {
193  // Replace calls Remove; cancel Remove's actions:
194  perf::Dec(counters_->n_symlinks_removed);
195 
196  if (entry->IsSymlink()) {
197  perf::Inc(counters_->n_symlinks_changed);
198  } else {
199  perf::Inc(counters_->n_symlinks_removed);
200  perf::Inc(counters_->n_files_added);
201  dif += entry->GetScratchSize();
202  }
203  } else {
204  // Replace calls Remove; cancel Remove's actions:
205  perf::Dec(counters_->n_files_removed);
206  dif -= entry->GetRdOnlySize();
207  if (entry->IsSymlink()) {
208  perf::Inc(counters_->n_files_removed);
209  perf::Inc(counters_->n_symlinks_added);
210  } else {
211  perf::Inc(counters_->n_files_changed);
212  dif += entry->GetScratchSize();
213  }
214  }
215 
216  if (dif > 0) { // added bytes
217  perf::Xadd(counters_->sz_added_bytes, dif);
218  } else { // removed bytes
219  perf::Xadd(counters_->sz_removed_bytes, -dif);
220  }
221  return;
222  }
223 
224  PrintWarning("'" + entry->GetRelativePath()
225  + "' cannot be touched. Unrecognized file type.");
226 }
227 
228 
233  EnsureAllowed(entry);
234 
235  if (entry->WasDirectory()) {
237  return;
238  }
239 
240  if (entry->WasBundleSpec()) {
241  // for now remove using RemoveFile()
242  RemoveFile(entry);
243  return;
244  }
245 
246  if (entry->WasRegularFile() || entry->WasSymlink()
247  || entry->WasSpecialFile()) {
248  RemoveFile(entry);
249  return;
250  }
251 
252  PrintWarning("'" + entry->GetRelativePath()
253  + "' cannot be deleted. Unrecognized file type.");
254 }
255 
256 
261  // EnsureAllowed(entry); <-- Done by Remove() and Add()
262  Remove(entry);
263  Add(entry);
264 }
265 
266 void SyncMediator::Clone(const std::string from, const std::string to) {
267  catalog_manager_->Clone(from, to);
268 }
269 
271  if (!handle_hardlinks_) {
272  return;
273  }
274 
275  HardlinkGroupMap new_map;
276  hardlink_stack_.push(new_map);
277 }
278 
279 
281  if (!handle_hardlinks_) {
282  return;
283  }
284 
285  CompleteHardlinks(entry);
287  hardlink_stack_.pop();
288 }
289 
290 
296  reporter_->CommitReport();
297 
298  if (!params_->dry_run) {
300  "Waiting for upload of files before committing...");
301  params_->spooler->WaitForUpload();
302  }
303 
304  if (!hardlink_queue_.empty()) {
306 
307  LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
308  params_->spooler->UnregisterListeners();
310  this);
311 
312  // TODO(rmeusel): Revise that for Thread Safety!
313  // This loop will spool hardlinks into the spooler, which will then
314  // process them.
315  // On completion of every hardlink the spooler will asynchronously
316  // emit callbacks (SyncMediator::PublishHardlinksCallback) which
317  // might happen while this for-loop goes through the hardlink_queue_
318  //
319  // For the moment this seems not to be a problem, but it's an accident
320  // just waiting to happen.
321  //
322  // Note: Just wrapping this loop in a mutex might produce a dead lock
323 // since the spooler does not fill its processing queue to an
324  // unlimited size. Meaning that it might be flooded with hard-
325  // links and waiting for the queue to be processed while proces-
326  // sing is stalled because the callback is waiting for this
327  // mutex.
328  for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
329  iEnd = hardlink_queue_.end();
330  i != iEnd;
331  ++i) {
332  LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
333  i->master->GetUnionPath().c_str());
335  i->master->GetUnionPath());
336  params_->spooler->Process(source);
337  }
338 
339  params_->spooler->WaitForUpload();
340 
341  for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
342  iEnd = hardlink_queue_.end();
343  i != iEnd;
344  ++i) {
345  LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
346  i->master->GetUnionPath().c_str());
347  AddHardlinkGroup(*i);
348  }
349  }
350 
351  if (union_engine_)
353 
354  params_->spooler->UnregisterListeners();
355 
356  if (params_->dry_run) {
357  manifest = NULL;
358  return true;
359  }
360 
361  LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
362  if (params_->spooler->GetNumberOfErrors() > 0) {
363  LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
364  return false;
365  }
366 
372  // Commit empty string to ensure that the "content" of the auto catalog
373  // markers is present in the repository.
374  string empty_file = CreateTempPath(params_->dir_temp + "/empty", 0600);
375  IngestionSource *source = new FileIngestionSource(empty_file);
376  params_->spooler->Process(source);
377  params_->spooler->WaitForUpload();
378  unlink(empty_file.c_str());
379  if (params_->spooler->GetNumberOfErrors() > 0) {
380  LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
381  return false;
382  }
383  }
385  return catalog_manager_->Commit(
387 }
388 
389 
392 
393  uint64_t inode = entry->GetUnionInode();
394  LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
395  inode, entry->GetUnionPath().c_str());
396 
397  // Find the hard link group in the lists
398  HardlinkGroupMap::iterator hardlink_group = GetHardlinkMap().find(inode);
399 
400  if (hardlink_group == GetHardlinkMap().end()) {
401  // Create a new hardlink group
402  GetHardlinkMap().insert(
403  HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
404  } else {
405  // Append the file to the appropriate hardlink group
406  hardlink_group->second.AddHardlink(entry);
407  }
408 
409  // publish statistics counting for new file
410  if (entry->IsNew()) {
411  perf::Inc(counters_->n_files_added);
412  perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
413  }
414 }
415 
416 
418  // Check if found file has hardlinks (nlink > 1)
419  // As we are looking through all files in one directory here, there might be
420  // completely untouched hardlink groups, which we can safely skip.
421  // Finally we have to see if the hardlink is already part of this group
422 
424 
425  if (entry->GetUnionLinkcount() < 2)
426  return;
427 
428  uint64_t inode = entry->GetUnionInode();
429  HardlinkGroupMap::iterator hl_group;
430  hl_group = GetHardlinkMap().find(inode);
431 
432  if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
433  bool found = false;
434 
435  // search for the entry in this group
436  for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
437  iEnd = hl_group->second.hardlinks.end();
438  i != iEnd;
439  ++i) {
440  if (*(i->second) == *entry) {
441  found = true;
442  break;
443  }
444  }
445 
446  if (!found) {
447  // Hardlink already in the group?
448  // If one element of a hardlink group is edited, all elements must be
449  // replaced. Here, we remove an untouched hardlink and add it to its
450  // hardlink group for re-adding later
451  LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
452  entry->GetUnionPath().c_str());
453  Remove(entry);
454  hl_group->second.AddHardlink(entry);
455  }
456  }
457 }
458 
459 
468 
469  // If no hardlink in this directory was changed, we can skip this
470  if (GetHardlinkMap().empty())
471  return;
472 
473  LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
474  entry->GetUnionPath().c_str());
475 
476  // Look for legacy hardlinks
478  false);
480  traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
481  traversal.fn_new_character_dev = &SyncMediator::
483  traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
484  traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
485  traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
486  traversal.Recurse(entry->GetUnionPath());
487 }
488 
489 
490 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
491  const string &file_name) {
492  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
493  InsertLegacyHardlink(entry);
494 }
495 
496 
497 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
498  const string &file_name) {
499  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
500  kItemSymlink);
501  InsertLegacyHardlink(entry);
502 }
503 
505  const string &parent_dir, const string &file_name) {
506  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
508  InsertLegacyHardlink(entry);
509 }
510 
512  const string &file_name) {
513  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
515  InsertLegacyHardlink(entry);
516 }
517 
518 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
519  const string &file_name) {
520  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
521  InsertLegacyHardlink(entry);
522 }
523 
524 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
525  const string &file_name) {
526  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
527  kItemSocket);
528  InsertLegacyHardlink(entry);
529 }
530 
531 
533  AddDirectory(entry);
534 
535  // Create a recursion engine, which recursively adds all entries in a newly
536  // created directory
538  this, union_engine_->scratch_path(), true);
540  traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
541  traversal.fn_new_file = &SyncMediator::AddFileCallback;
542  traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
543  traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
544  traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
545  traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
546  traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
547  traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
548  traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
549  traversal.Recurse(entry->GetScratchPath());
550 }
551 
552 
553 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
554  const std::string &dir_name) {
555  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
556  AddDirectory(entry);
557  return true; // The recursion engine should recurse deeper here
558 }
559 
560 
561 void SyncMediator::AddFileCallback(const std::string &parent_dir,
562  const std::string &file_name) {
563  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
564  Add(entry);
565 }
566 
567 
568 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
569  const std::string &file_name) {
570  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
572  Add(entry);
573 }
574 
575 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
576  const std::string &file_name) {
577  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
579  Add(entry);
580 }
581 
582 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
583  const std::string &file_name) {
584  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
585  Add(entry);
586 }
587 
588 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
589  const std::string &file_name) {
590  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
591  kItemSocket);
592  Add(entry);
593 }
594 
595 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
596  const std::string &link_name) {
597  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
598  kItemSymlink);
599  Add(entry);
600 }
601 
602 
603 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
604  const std::string &dir_name) {
605  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
606  EnterDirectory(entry);
607 }
608 
609 
610 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
611  const std::string &dir_name) {
612  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
613  LeaveDirectory(entry);
614 }
615 
616 
618  // Delete a directory AFTER it was emptied here,
619  // because it would start up another recursion
620 
621  const bool recurse = false;
623  this, union_engine_->rdonly_path(), recurse);
625  traversal.fn_new_dir_postfix = &SyncMediator::RemoveDirectoryCallback;
626  traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
627  traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
628  traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
629  traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
630  traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
631  traversal.Recurse(entry->GetRdOnlyPath());
632 
633  // The given directory was emptied recursively and can now itself be deleted
634  RemoveDirectory(entry);
635 }
636 
637 
638 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
639  const std::string &file_name) {
640  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
641  Remove(entry);
642 }
643 
644 
645 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
646  const std::string &link_name) {
647  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
648  kItemSymlink);
649  Remove(entry);
650 }
651 
652 void SyncMediator::RemoveCharacterDeviceCallback(const std::string &parent_dir,
653  const std::string &link_name) {
654  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
656  Remove(entry);
657 }
658 
659 void SyncMediator::RemoveBlockDeviceCallback(const std::string &parent_dir,
660  const std::string &link_name) {
661  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
663  Remove(entry);
664 }
665 
666 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
667  const std::string &link_name) {
668  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name, kItemFifo);
669  Remove(entry);
670 }
671 
672 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
673  const std::string &link_name) {
674  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name,
675  kItemSocket);
676  Remove(entry);
677 }
678 
679 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
680  const std::string &dir_name) {
681  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
683 }
684 
685 
686 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
687  const std::string &file_name) {
688  if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
689  return true;
690  }
691 
692  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name,
693  kItemUnknown);
694  return entry->IsWhiteout();
695 }
696 
698  const std::string &relative_parent_path, const std::string &filename,
699  const SyncItemType entry_type) const {
700  return union_engine_->CreateSyncItem(relative_parent_path, filename,
701  entry_type);
702 }
703 
706  "Spooler callback for %s, digest %s, produced %lu chunks, retval %d",
707  result.local_path.c_str(), result.content_hash.ToString().c_str(),
708  result.file_chunks.size(), result.return_code);
709  if (result.return_code != 0) {
710  PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
711  result.return_code);
712  }
713 
714  SyncItemList::iterator itr;
715  {
717  itr = file_queue_.find(result.local_path);
718  }
719 
720  assert(itr != file_queue_.end());
721 
722  SyncItem &item = *itr->second;
723  item.SetContentHash(result.content_hash);
724  item.SetCompressionAlgorithm(result.compression_alg);
725 
726  XattrList *xattrs = &default_xattrs_;
727  if (params_->include_xattrs) {
728  xattrs = XattrList::CreateFromFile(result.local_path);
729  assert(xattrs != NULL);
730  }
731 
732  if (result.IsChunked()) {
734  item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
735  *xattrs,
736  item.relative_parent_path(),
737  result.file_chunks);
738  } else {
740  item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
741  *xattrs,
742  item.relative_parent_path());
743  }
744 
745  if (xattrs != &default_xattrs_)
746  free(xattrs);
747 }
748 
749 
751  const upload::SpoolerResult &result) {
753  "Spooler callback for hardlink %s, digest %s, retval %d",
754  result.local_path.c_str(), result.content_hash.ToString().c_str(),
755  result.return_code);
756  if (result.return_code != 0) {
757  PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
758  result.return_code);
759  }
760 
761  bool found = false;
762  for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
763  if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
764  found = true;
765  hardlink_queue_[i].master->SetContentHash(result.content_hash);
766  SyncItemList::iterator j, jend;
767  for (j = hardlink_queue_[i].hardlinks.begin(),
768  jend = hardlink_queue_[i].hardlinks.end();
769  j != jend;
770  ++j) {
771  j->second->SetContentHash(result.content_hash);
772  j->second->SetCompressionAlgorithm(result.compression_alg);
773  }
774  if (result.IsChunked())
775  hardlink_queue_[i].file_chunks = result.file_chunks;
776 
777  break;
778  }
779  }
780 
781  assert(found);
782 }
783 
784 
786  const std::string notice = "Nested catalog at " + directory->GetUnionPath();
787  reporter_->OnAdd(notice, catalog::DirectoryEntry());
788 
789  if (!params_->dry_run) {
790  catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
791  }
792 }
793 
794 
796  const std::string notice = "Nested catalog at " + directory->GetUnionPath();
797  reporter_->OnRemove(notice, catalog::DirectoryEntry());
798 
799  if (!params_->dry_run) {
800  catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
801  }
802 }
803 
805  const history::History::Tag & /*to_tag*/) { }
806 
808 
809 void SyncDiffReporter::OnAdd(const std::string &path,
810  const catalog::DirectoryEntry & /*entry*/) {
811  changed_items_++;
812  AddImpl(path);
813 }
814 void SyncDiffReporter::OnRemove(const std::string &path,
815  const catalog::DirectoryEntry & /*entry*/) {
816  changed_items_++;
817  RemoveImpl(path);
818 }
819 void SyncDiffReporter::OnModify(const std::string &path,
820  const catalog::DirectoryEntry & /*entry_from*/,
821  const catalog::DirectoryEntry & /*entry_to*/) {
822  changed_items_++;
823  ModifyImpl(path);
824 }
825 
827  if (print_action_ == kPrintDots) {
830  }
831  }
832 }
833 
837  }
838 }
839 
840 void SyncDiffReporter::AddImpl(const std::string &path) {
841  const char *action_label;
842 
843  switch (print_action_) {
844  case kPrintChanges:
845  if (path.at(0) != '/') {
846  action_label = "[x-catalog-add]";
847  } else {
848  action_label = "[add]";
849  }
850  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
851  break;
852 
853  case kPrintDots:
854  PrintDots();
855  break;
856  default:
857  assert("Invalid print action.");
858  }
859 }
860 
861 void SyncDiffReporter::RemoveImpl(const std::string &path) {
862  const char *action_label;
863 
864  switch (print_action_) {
865  case kPrintChanges:
866  if (path.at(0) != '/') {
867  action_label = "[x-catalog-rem]";
868  } else {
869  action_label = "[rem]";
870  }
871 
872  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
873  break;
874 
875  case kPrintDots:
876  PrintDots();
877  break;
878  default:
879  assert("Invalid print action.");
880  }
881 }
882 
883 void SyncDiffReporter::ModifyImpl(const std::string &path) {
884  const char *action_label;
885 
886  switch (print_action_) {
887  case kPrintChanges:
888  action_label = "[mod]";
889  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
890  break;
891 
892  case kPrintDots:
893  PrintDots();
894  break;
895  default:
896  assert("Invalid print action.");
897  }
898 }
899 
901  reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
902 
903  if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
904  assert(!entry->HasGraftMarker());
905  // Symlinks and special files are completely stored in the catalog
906  XattrList *xattrs = &default_xattrs_;
907  if (params_->include_xattrs) {
908  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
909  assert(xattrs);
910  }
912  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
913  *xattrs,
914  entry->relative_parent_path());
915  if (xattrs != &default_xattrs_)
916  free(xattrs);
917  } else if (entry->HasGraftMarker() && !params_->dry_run) {
918  if (entry->IsValidGraft()) {
919  // Graft files are added to catalog immediately.
920  if (entry->IsChunkedGraft()) {
922  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
924  entry->relative_parent_path(),
925  *(entry->GetGraftChunks()));
926  } else {
928  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
929  default_xattrs_, // TODO(bbockelm): For now, use default xattrs
930  // on grafted files.
931  entry->relative_parent_path());
932  }
933  } else {
934  // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
935  // the graft file is missing information. It's not clear that continuing
936  // forward with the publish is the correct thing to do; abort for now.
938  "Encountered a grafted file (%s) with "
939  "invalid grafting information; check contents of .cvmfsgraft-*"
940  " file. Aborting publish.",
941  entry->GetRelativePath().c_str());
942  }
943  } else if (entry->relative_parent_path().empty()
944  && entry->IsCatalogMarker()) {
945  PANIC(kLogStderr, "Error: nested catalog marker in root directory");
946  } else if (!params_->dry_run) {
947  {
948  // Push the file to the spooler, remember the entry for the path
950  file_queue_[entry->GetUnionPath()] = entry;
951  }
952  // Spool the file
953  params_->spooler->Process(entry->CreateIngestionSource());
954  }
955 
956  // publish statistics counting for new file
957  if (entry->IsNew()) {
958  if (entry->IsSymlink()) {
959  perf::Inc(counters_->n_symlinks_added);
960  } else {
961  perf::Inc(counters_->n_files_added);
962  perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
963  }
964  }
965 }
966 
968  reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
969 
970  if (!params_->dry_run) {
971  if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
972  LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
973  entry->GetUnionPath().c_str());
974  catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
975  }
976  catalog_manager_->RemoveFile(entry->GetRelativePath());
977  }
978 
979  // Counting nr of removed files and removed bytes
980  if (entry->WasSymlink()) {
981  perf::Inc(counters_->n_symlinks_removed);
982  } else {
983  perf::Inc(counters_->n_files_removed);
984  }
985  perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
986 }
987 
989  AddDirectory(entry);
990 }
991 
993  if (entry->IsBundleSpec()) {
995  "Illegal directory name: .cvmfsbundles (%s). "
996  ".cvmfsbundles is reserved for bundles specification files",
997  entry->GetUnionPath().c_str());
998  }
999 
1000  reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1001 
1002  perf::Inc(counters_->n_directories_added);
1003  assert(!entry->HasGraftMarker());
1004  if (!params_->dry_run) {
1005  XattrList *xattrs = &default_xattrs_;
1006  if (params_->include_xattrs) {
1007  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1008  assert(xattrs);
1009  }
1011  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1012  entry->relative_parent_path());
1013  if (xattrs != &default_xattrs_)
1014  free(xattrs);
1015  }
1016 
1017  if (entry->HasCatalogMarker()
1018  && !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1019  CreateNestedCatalog(entry);
1020  }
1021 }
1022 
1023 
1030  const std::string directory_path = entry->GetRelativePath();
1031 
1032  if (catalog_manager_->IsTransitionPoint(directory_path)) {
1033  RemoveNestedCatalog(entry);
1034  }
1035 
1036  reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1037  if (!params_->dry_run) {
1038  catalog_manager_->RemoveDirectory(directory_path);
1039  }
1040 
1041  perf::Inc(counters_->n_directories_removed);
1042 }
1043 
1045  reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1047 
1048  const std::string directory_path = entry->GetRelativePath();
1049 
1050  if (!params_->dry_run) {
1051  XattrList *xattrs = &default_xattrs_;
1052  if (params_->include_xattrs) {
1053  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1054  assert(xattrs);
1055  }
1057  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1058  directory_path);
1059  if (xattrs != &default_xattrs_)
1060  free(xattrs);
1061  }
1062 
1063  if (entry->HasCatalogMarker()
1064  && !catalog_manager_->IsTransitionPoint(directory_path)) {
1065  CreateNestedCatalog(entry);
1066  } else if (!entry->HasCatalogMarker()
1067  && catalog_manager_->IsTransitionPoint(directory_path)) {
1068  RemoveNestedCatalog(entry);
1069  }
1070 }
1071 
1078 
1079  for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1080  iEnd = hardlinks.end();
1081  i != iEnd;
1082  ++i) {
1083  if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount()
1085  PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1086  i->second.master->GetUnionPath().c_str());
1087  }
1088 
1089  if (params_->print_changeset) {
1090  for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1091  jEnd = i->second.hardlinks.end();
1092  j != jEnd;
1093  ++j) {
1094  std::string changeset_notice = GetParentPath(
1095  i->second.master->GetUnionPath())
1096  + "/" + j->second->filename();
1097  reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1098  }
1099  }
1100 
1101  if (params_->dry_run)
1102  continue;
1103 
1104  if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1105  AddHardlinkGroup(i->second);
1106  else
1107  hardlink_queue_.push_back(i->second);
1108  }
1109 }
1110 
1111 
1114 
1115  // Create a DirectoryEntry list out of the hardlinks
1117  for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1118  iEnd = group.hardlinks.end();
1119  i != iEnd;
1120  ++i) {
1121  hardlinks.push_back(
1122  i->second->CreateBasicCatalogDirent(params_->enable_mtime_ns));
1123  }
1124  XattrList *xattrs = &default_xattrs_;
1125  if (params_->include_xattrs) {
1126  xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1127  assert(xattrs);
1128  }
1130  *xattrs,
1131  group.master->relative_parent_path(),
1132  group.file_chunks);
1133  if (xattrs != &default_xattrs_)
1134  free(xattrs);
1135 }
1136 
1137 } // namespace publish
bool Commit(manifest::Manifest *manifest)
int return_code
the return value of the spooler operation
void RemoveSocketCallback(const std::string &parent_dir, const std::string &link_name)
void ModifyImpl(const std::string &path)
void Dec(class Counter *counter)
Definition: statistics.h:49
void AddChunkedFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
const manifest::Manifest * manifest() const
Definition: repository.h:125
void AddDirectoryRecursively(SharedPtr< SyncItem > entry)
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
void AddHardlinkGroup(const DirectoryEntryBaseList &entries, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
HardlinkGroupMapStack hardlink_stack_
void AddBlockDeviceCallback(const std::string &parent_dir, const std::string &file_name)
FileChunkList file_chunks
the file chunks generated during processing
void AddDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
virtual bool SupportsHardlinks() const
Definition: sync_union.h:140
void RemoveFile(SharedPtr< SyncItem > entry)
void CompleteHardlinks(SharedPtr< SyncItem > entry)
void RemoveNestedCatalog(SharedPtr< SyncItem > directory)
void RemoveDirectory(const std::string &directory_path)
#define PANIC(...)
Definition: exception.h:29
void CreateNestedCatalog(SharedPtr< SyncItem > directory)
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
UniquePtr< SyncDiffReporter > reporter_
virtual void OnInit(const history::History::Tag &from_tag, const history::History::Tag &to_tag)
virtual void OnModify(const std::string &path, const catalog::DirectoryEntry &entry_from, const catalog::DirectoryEntry &entry_to)
void RemoveDirectoryRecursively(SharedPtr< SyncItem > entry)
HardlinkGroupMap & GetHardlinkMap()
void LegacyCharacterDeviceHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void Replace(SharedPtr< SyncItem > entry)
std::string CreateTempPath(const std::string &path_prefix, const int mode)
Definition: posix.cc:1041
std::string scratch_path() const
Definition: sync_union.h:98
void PublishHardlinksCallback(const upload::SpoolerResult &result)
void EnterDirectory(SharedPtr< SyncItem > entry)
A simple recursion engine to abstract the recursion of directories. It provides several callback hook...
Definition: fs_traversal.h:36
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
UniquePtr< perf::FsCounters > counters_
assert((mem||(size==0))&&"Out Of Memory")
bool Commit(const bool stop_for_tweaks, const uint64_t manual_revision, manifest::Manifest *manifest)
void LegacySymlinkHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void AddDirectory(SharedPtr< SyncItem > entry)
void RemoveImpl(const std::string &path)
HardlinkGroupList hardlink_queue_
void RemoveCharacterDeviceCallback(const std::string &parent_dir, const std::string &link_name)
void AddFileCallback(const std::string &parent_dir, const std::string &file_name)
virtual void PostUpload()
Definition: sync_union.h:81
void LegacySocketHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
Definition: sync_union.cc:30
void RemoveFileCallback(const std::string &parent_dir, const std::string &file_name)
void PublishFilesCallback(const upload::SpoolerResult &result)
bool IsTransitionPoint(const std::string &mountpoint)
void Clone(const std::string from, const std::string to)
void TouchDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &directory_path)
void AddFile(SharedPtr< SyncItem > entry)
std::string dir_temp
void InsertLegacyHardlink(SharedPtr< SyncItem > entry)
void AddUnmaterializedDirectory(SharedPtr< SyncItem > entry)
std::string local_path
the local_path previously given as input
unsigned int processing_dot_interval_
Definition: sync_mediator.h:86
std::string union_path() const
Definition: sync_union.h:97
VoidCallback fn_new_file
Definition: fs_traversal.h:46
zlib::Algorithms compression_alg
SyncUnion * union_engine_
static JsonDocument * Create(const std::string &text)
void ShrinkHardlinkGroup(const std::string &remove_path)
SyncItemList file_queue_
const SyncParameters * params_
VoidCallback fn_enter_dir
Definition: fs_traversal.h:44
virtual bool IgnoreFilePredicate(const std::string &parent_dir, const std::string &filename)
Definition: sync_union.cc:57
void RemoveFifoCallback(const std::string &parent_dir, const std::string &link_name)
virtual void OnStats(const catalog::DeltaCounters &delta)
upload::Spooler * spooler
void AddImpl(const std::string &path)
void AddSocketCallback(const std::string &parent_dir, const std::string &file_name)
static const int kActionNone
void Touch(SharedPtr< SyncItem > entry)
void AddFifoCallback(const std::string &parent_dir, const std::string &file_name)
void AddSymlinkCallback(const std::string &parent_dir, const std::string &link_name)
void EnterAddedDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
void PrintWarning(const string &message)
Definition: logging.cc:560
void Add(SharedPtr< SyncItem > entry)
void Inc(class Counter *counter)
Definition: statistics.h:50
bool AddDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:279
void Clone(const std::string from, const std::string to)
bool IgnoreFileCallback(const std::string &parent_dir, const std::string &file_name)
void RemoveFile(const std::string &file_path)
bool stop_for_catalog_tweaks
bool SafeReadToString(int fd, std::string *final_result)
Definition: posix.cc:2116
void RemoveNestedCatalog(const std::string &mountpoint, const bool merge=true)
uint64_t manual_revision
void LegacyFifoHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void AddCharacterDeviceCallback(const std::string &parent_dir, const std::string &file_name)
void AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks)
pthread_mutex_t lock_file_queue_
void InsertHardlink(SharedPtr< SyncItem > entry)
catalog::WritableCatalogManager * catalog_manager_
void LeaveDirectory(SharedPtr< SyncItem > entry)
virtual void OnRemove(const std::string &path, const catalog::DirectoryEntry &entry)
SyncItemType
Definition: sync_item.h:29
Definition: mutex.h:42
void SetContentHash(const shash::Any &hash)
Definition: sync_item.h:120
PathString GetParentPath(const PathString &path)
Definition: shortstring.cc:14
void RemoveSymlinkCallback(const std::string &parent_dir, const std::string &link_name)
std::vector< DirectoryEntryBase > DirectoryEntryBaseList
void RemoveBlockDeviceCallback(const std::string &parent_dir, const std::string &link_name)
static XattrList * CreateFromFile(const std::string &path)
Definition: xattr.cc:32
const int kLogVerboseMsg
void LegacyBlockDeviceHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void RemoveDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
void TouchDirectory(SharedPtr< SyncItem > entry)
static const char * kVirtualPath
bool ignore_xdir_hardlinks
void AddFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
void EnsureAllowed(SharedPtr< SyncItem > entry)
void LegacyRegularHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void RegisterUnionEngine(SyncUnion *engine)
void AddHardlinkGroup(const HardlinkGroup &group)
std::string rdonly_path() const
Definition: sync_union.h:96
void RemoveDirectory(SharedPtr< SyncItem > entry)
std::map< uint64_t, HardlinkGroup > HardlinkGroupMap
void CreateNestedCatalog(const std::string &mountpoint)
size_t size() const
Definition: bigvector.h:117
virtual void OnAdd(const std::string &path, const catalog::DirectoryEntry &entry)
void Remove(SharedPtr< SyncItem > entry)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545
void LeaveAddedDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)