CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sync_mediator.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "sync_mediator.h"
8 
9 #include <fcntl.h>
10 #include <inttypes.h>
11 #include <unistd.h>
12 
13 #include <cassert>
14 #include <cstdio>
15 #include <cstdlib>
16 
17 #include "catalog_virtual.h"
19 #include "crypto/hash.h"
20 #include "directory_entry.h"
21 #include "json_document.h"
22 #include "publish/repository.h"
23 #include "sync_union.h"
24 #include "upload.h"
25 #include "util/concurrency.h"
26 #include "util/exception.h"
27 #include "util/fs_traversal.h"
28 #include "util/posix.h"
29 #include "util/smalloc.h"
30 #include "util/string.h"
31 
32 using namespace std; // NOLINT
33 
34 namespace publish {
35 
36 AbstractSyncMediator::~AbstractSyncMediator() { }
37 
38 SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
39  const SyncParameters *params,
40  perf::StatisticsTemplate statistics)
41  : catalog_manager_(catalog_manager)
42  , union_engine_(NULL)
43  , handle_hardlinks_(false)
44  , params_(params)
45  , reporter_(new SyncDiffReporter(params_->print_changeset
46  ? SyncDiffReporter::kPrintChanges
47  : SyncDiffReporter::kPrintDots)) {
48  const int retval = pthread_mutex_init(&lock_file_queue_, NULL);
49  assert(retval == 0);
50 
51  params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);
52 
53  counters_ = new perf::FsCounters(statistics);
54 }
55 
56 SyncMediator::~SyncMediator() { pthread_mutex_destroy(&lock_file_queue_); }
57 
58 
60  union_engine_ = engine;
62 }
63 
64 
71  const bool ignore_case_setting = false;
72  const string relative_path = entry->GetRelativePath();
73  if ((relative_path == string(catalog::VirtualCatalog::kVirtualPath))
74  || (HasPrefix(relative_path,
76  ignore_case_setting))) {
77  PANIC(kLogStderr, "[ERROR] invalid attempt to modify %s",
78  relative_path.c_str());
79  }
80 }
81 
82 
88  EnsureAllowed(entry);
89 
90  if (entry->IsDirectory()) {
92  return;
93  }
94 
95  // .cvmfsbundles file type
96  if (entry->IsBundleSpec()) {
97  PrintWarning(".cvmfsbundles file encountered. "
98  "Bundles is currently an experimental feature.");
99 
100  if (!entry->IsRegularFile()) {
101  PANIC(kLogStderr, "Error: .cvmfsbundles file must be a regular file");
102  }
103  if (entry->HasHardlinks()) {
104  PANIC(kLogStderr, "Error: .cvmfsbundles file must not be a hard link");
105  }
106 
107  const std::string parent_path = GetParentPath(entry->GetUnionPath());
108  if (parent_path != union_engine_->union_path()) {
110  "Error: .cvmfsbundles file must be in the root"
111  " directory of the repository. Found in %s",
112  parent_path.c_str());
113  }
114 
115  std::string json_string;
116 
117  const int fd = open(entry->GetUnionPath().c_str(), O_RDONLY);
118  if (fd < 0) {
119  PANIC(kLogStderr, "Could not open file: %s",
120  entry->GetUnionPath().c_str());
121  }
122  if (!SafeReadToString(fd, &json_string)) {
123  PANIC(kLogStderr, "Could not read contents of file: %s",
124  entry->GetUnionPath().c_str());
125  }
126  const UniquePtr<JsonDocument> json(JsonDocument::Create(json_string));
127 
128  AddFile(entry);
129  return;
130  }
131 
132  if (entry->IsRegularFile() || entry->IsSymlink()) {
133  // A file is a hard link if the link count is greater than 1
134  if (entry->HasHardlinks() && handle_hardlinks_)
135  InsertHardlink(entry);
136  else
137  AddFile(entry);
138  return;
139  } else if (entry->IsGraftMarker()) {
140  LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
141  return; // Ignore markers.
142  }
143 
144  // In OverlayFS whiteouts can be represented as character devices with major
145  // and minor numbers equal to 0. Special files will be ignored except if they
146  // are whiteout files.
147  if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
149  PrintWarning("'" + entry->GetRelativePath()
150  + "' "
151  "is a special file, ignoring.");
152  } else {
153  if (entry->HasHardlinks() && handle_hardlinks_)
154  InsertHardlink(entry);
155  else
156  AddFile(entry);
157  }
158  return;
159  }
160 
161  PrintWarning("'" + entry->GetRelativePath()
162  + "' cannot be added. Unrecognized file type.");
163 }
164 
165 
170  EnsureAllowed(entry);
171 
172  if (entry->IsGraftMarker()) {
173  return;
174  }
175  if (entry->IsDirectory()) {
176  TouchDirectory(entry);
177  perf::Inc(counters_->n_directories_changed);
178  return;
179  }
180 
181  if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
182  Replace(entry); // This way, hardlink processing is correct
183  // Replace calls Remove; cancel Remove's actions:
184  perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
185 
186  // Count only the difference between the old and new file
187  // Symlinks do not count into added or removed bytes
188  int64_t dif = 0;
189 
190  // Need to handle 4 cases (symlink->symlink, symlink->regular,
191  // regular->symlink, regular->regular)
192  if (entry->WasSymlink()) {
193  // Replace calls Remove; cancel Remove's actions:
194  perf::Dec(counters_->n_symlinks_removed);
195 
196  if (entry->IsSymlink()) {
197  perf::Inc(counters_->n_symlinks_changed);
198  } else {
199  perf::Inc(counters_->n_symlinks_removed);
200  perf::Inc(counters_->n_files_added);
201  dif += entry->GetScratchSize();
202  }
203  } else {
204  // Replace calls Remove; cancel Remove's actions:
205  perf::Dec(counters_->n_files_removed);
206  dif -= entry->GetRdOnlySize();
207  if (entry->IsSymlink()) {
208  perf::Inc(counters_->n_files_removed);
209  perf::Inc(counters_->n_symlinks_added);
210  } else {
211  perf::Inc(counters_->n_files_changed);
212  dif += entry->GetScratchSize();
213  }
214  }
215 
216  if (dif > 0) { // added bytes
217  perf::Xadd(counters_->sz_added_bytes, dif);
218  } else { // removed bytes
219  perf::Xadd(counters_->sz_removed_bytes, -dif);
220  }
221  return;
222  }
223 
224  PrintWarning("'" + entry->GetRelativePath()
225  + "' cannot be touched. Unrecognized file type.");
226 }
227 
228 
233  EnsureAllowed(entry);
234 
235  if (entry->WasDirectory()) {
237  return;
238  }
239 
240  if (entry->WasBundleSpec()) {
241  // for now remove using RemoveFile()
242  RemoveFile(entry);
243  return;
244  }
245 
246  if (entry->WasRegularFile() || entry->WasSymlink()
247  || entry->WasSpecialFile()) {
248  RemoveFile(entry);
249  return;
250  }
251 
252  PrintWarning("'" + entry->GetRelativePath()
253  + "' cannot be deleted. Unrecognized file type.");
254 }
255 
256 
261  // EnsureAllowed(entry); <-- Done by Remove() and Add()
262  Remove(entry);
263  Add(entry);
264 }
265 
266 void SyncMediator::Clone(const std::string from, const std::string to) {
267  catalog_manager_->Clone(from, to);
268 }
269 
271  if (!handle_hardlinks_) {
272  return;
273  }
274 
275  const HardlinkGroupMap new_map;
276  hardlink_stack_.push(new_map);
277 }
278 
279 
281  if (!handle_hardlinks_) {
282  return;
283  }
284 
285  CompleteHardlinks(entry);
287  hardlink_stack_.pop();
288 }
289 
290 
296  reporter_->CommitReport();
297 
298  if (!params_->dry_run) {
300  "Waiting for upload of files before committing...");
301  params_->spooler->WaitForUpload();
302  }
303 
304  if (!hardlink_queue_.empty()) {
306 
307  LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
308  params_->spooler->UnregisterListeners();
310  this);
311 
312  // TODO(rmeusel): Revise that for Thread Safety!
313  // This loop will spool hardlinks into the spooler, which will then
314  // process them.
315  // On completion of every hardlink the spooler will asynchronously
316  // emit callbacks (SyncMediator::PublishHardlinksCallback) which
317  // might happen while this for-loop goes through the hardlink_queue_
318  //
319  // For the moment this seems not to be a problem, but it's an accident
320  // just waiting to happen.
321  //
322  // Note: Just wrapping this loop in a mutex might produce a deadlock,
323  // since the spooler does not fill its processing queue to an
324  // unlimited size. It might therefore be flooded with hardlinks
325  // and wait for the queue to be processed while processing
326  // is stalled because the callback is waiting for this
327  // mutex.
328  for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
329  iEnd = hardlink_queue_.end();
330  i != iEnd;
331  ++i) {
332  LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
333  i->master->GetUnionPath().c_str());
335  i->master->GetUnionPath());
336  params_->spooler->Process(source);
337  }
338 
339  params_->spooler->WaitForUpload();
340 
341  for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
342  iEnd = hardlink_queue_.end();
343  i != iEnd;
344  ++i) {
345  LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
346  i->master->GetUnionPath().c_str());
347  AddHardlinkGroup(*i);
348  }
349  }
350 
351  if (union_engine_)
353 
354  params_->spooler->UnregisterListeners();
355 
356  if (params_->dry_run) {
357  manifest = NULL;
358  return true;
359  }
360 
361  LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
362  if (params_->spooler->GetNumberOfErrors() > 0) {
363  LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
364  return false;
365  }
366 
372  // Commit empty string to ensure that the "content" of the auto catalog
373  // markers is present in the repository.
374  const string empty_file =
375  CreateTempPath(params_->dir_temp + "/empty", 0600);
376  IngestionSource *source = new FileIngestionSource(empty_file);
377  params_->spooler->Process(source);
378  params_->spooler->WaitForUpload();
379  unlink(empty_file.c_str());
380  if (params_->spooler->GetNumberOfErrors() > 0) {
381  LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
382  return false;
383  }
384  }
386  return catalog_manager_->Commit(
388 }
389 
390 
393 
394  const uint64_t inode = entry->GetUnionInode();
395  LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
396  inode, entry->GetUnionPath().c_str());
397 
398  // Find the hard link group in the lists
399  const HardlinkGroupMap::iterator hardlink_group =
400  GetHardlinkMap().find(inode);
401 
402  if (hardlink_group == GetHardlinkMap().end()) {
403  // Create a new hardlink group
404  GetHardlinkMap().insert(
405  HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
406  } else {
407  // Append the file to the appropriate hardlink group
408  hardlink_group->second.AddHardlink(entry);
409  }
410 
411  // publish statistics counting for new file
412  if (entry->IsNew()) {
413  perf::Inc(counters_->n_files_added);
414  perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
415  }
416 }
417 
418 
420  // Check if found file has hardlinks (nlink > 1)
421  // As we are looking through all files in one directory here, there might be
422  // completely untouched hardlink groups, which we can safely skip.
423  // Finally we have to see if the hardlink is already part of this group
424 
426 
427  if (entry->GetUnionLinkcount() < 2)
428  return;
429 
430  const uint64_t inode = entry->GetUnionInode();
431  HardlinkGroupMap::iterator hl_group;
432  hl_group = GetHardlinkMap().find(inode);
433 
434  if (hl_group != GetHardlinkMap().end()) { // touched hardlinks in this group?
435  bool found = false;
436 
437  // search for the entry in this group
438  for (SyncItemList::const_iterator i = hl_group->second.hardlinks.begin(),
439  iEnd = hl_group->second.hardlinks.end();
440  i != iEnd;
441  ++i) {
442  if (*(i->second) == *entry) {
443  found = true;
444  break;
445  }
446  }
447 
448  if (!found) {
449  // Hardlink already in the group?
450  // If one element of a hardlink group is edited, all elements must be
451  // replaced. Here, we remove an untouched hardlink and add it to its
452  // hardlink group for re-adding later
453  LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
454  entry->GetUnionPath().c_str());
455  Remove(entry);
456  hl_group->second.AddHardlink(entry);
457  }
458  }
459 }
460 
461 
470 
471  // If no hardlink in this directory was changed, we can skip this
472  if (GetHardlinkMap().empty())
473  return;
474 
475  LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
476  entry->GetUnionPath().c_str());
477 
478  // Look for legacy hardlinks
480  false);
482  traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
483  traversal.fn_new_character_dev = &SyncMediator::
485  traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
486  traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
487  traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
488  traversal.Recurse(entry->GetUnionPath());
489 }
490 
491 
492 void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
493  const string &file_name) {
494  const SharedPtr<SyncItem> entry =
495  CreateSyncItem(parent_dir, file_name, kItemFile);
496  InsertLegacyHardlink(entry);
497 }
498 
499 
500 void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
501  const string &file_name) {
502  const SharedPtr<SyncItem> entry =
503  CreateSyncItem(parent_dir, file_name, kItemSymlink);
504  InsertLegacyHardlink(entry);
505 }
506 
508  const string &parent_dir, const string &file_name) {
509  const SharedPtr<SyncItem> entry =
510  CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
511  InsertLegacyHardlink(entry);
512 }
513 
515  const string &file_name) {
516  const SharedPtr<SyncItem> entry =
517  CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
518  InsertLegacyHardlink(entry);
519 }
520 
521 void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
522  const string &file_name) {
523  const SharedPtr<SyncItem> entry =
524  CreateSyncItem(parent_dir, file_name, kItemFifo);
525  InsertLegacyHardlink(entry);
526 }
527 
528 void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
529  const string &file_name) {
530  const SharedPtr<SyncItem> entry =
531  CreateSyncItem(parent_dir, file_name, kItemSocket);
532  InsertLegacyHardlink(entry);
533 }
534 
535 
537  AddDirectory(entry);
538 
539  // Create a recursion engine, which recursively adds all entries in a newly
540  // created directory
542  this, union_engine_->scratch_path(), true);
544  traversal.fn_leave_dir = &SyncMediator::LeaveAddedDirectoryCallback;
545  traversal.fn_new_file = &SyncMediator::AddFileCallback;
546  traversal.fn_new_symlink = &SyncMediator::AddSymlinkCallback;
547  traversal.fn_new_dir_prefix = &SyncMediator::AddDirectoryCallback;
548  traversal.fn_ignore_file = &SyncMediator::IgnoreFileCallback;
549  traversal.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
550  traversal.fn_new_block_dev = &SyncMediator::AddBlockDeviceCallback;
551  traversal.fn_new_fifo = &SyncMediator::AddFifoCallback;
552  traversal.fn_new_socket = &SyncMediator::AddSocketCallback;
553  traversal.Recurse(entry->GetScratchPath());
554 }
555 
556 
557 bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
558  const std::string &dir_name) {
559  const SharedPtr<SyncItem> entry =
560  CreateSyncItem(parent_dir, dir_name, kItemDir);
561  AddDirectory(entry);
562  return true; // The recursion engine should recurse deeper here
563 }
564 
565 
566 void SyncMediator::AddFileCallback(const std::string &parent_dir,
567  const std::string &file_name) {
568  const SharedPtr<SyncItem> entry =
569  CreateSyncItem(parent_dir, file_name, kItemFile);
570  Add(entry);
571 }
572 
573 
574 void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
575  const std::string &file_name) {
576  const SharedPtr<SyncItem> entry =
577  CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
578  Add(entry);
579 }
580 
581 void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
582  const std::string &file_name) {
583  const SharedPtr<SyncItem> entry =
584  CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
585  Add(entry);
586 }
587 
588 void SyncMediator::AddFifoCallback(const std::string &parent_dir,
589  const std::string &file_name) {
590  const SharedPtr<SyncItem> entry =
591  CreateSyncItem(parent_dir, file_name, kItemFifo);
592  Add(entry);
593 }
594 
595 void SyncMediator::AddSocketCallback(const std::string &parent_dir,
596  const std::string &file_name) {
597  const SharedPtr<SyncItem> entry =
598  CreateSyncItem(parent_dir, file_name, kItemSocket);
599  Add(entry);
600 }
601 
602 void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
603  const std::string &link_name) {
604  const SharedPtr<SyncItem> entry =
605  CreateSyncItem(parent_dir, link_name, kItemSymlink);
606  Add(entry);
607 }
608 
609 
610 void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
611  const std::string &dir_name) {
612  const SharedPtr<SyncItem> entry =
613  CreateSyncItem(parent_dir, dir_name, kItemDir);
614  EnterDirectory(entry);
615 }
616 
617 
618 void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
619  const std::string &dir_name) {
620  const SharedPtr<SyncItem> entry =
621  CreateSyncItem(parent_dir, dir_name, kItemDir);
622  LeaveDirectory(entry);
623 }
624 
625 
627  // Delete a directory AFTER it was emptied here,
628  // because it would start up another recursion
629 
630  const bool recurse = false;
632  this, union_engine_->rdonly_path(), recurse);
634  traversal.fn_new_dir_postfix = &SyncMediator::RemoveDirectoryCallback;
635  traversal.fn_new_symlink = &SyncMediator::RemoveSymlinkCallback;
636  traversal.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
637  traversal.fn_new_block_dev = &SyncMediator::RemoveBlockDeviceCallback;
638  traversal.fn_new_fifo = &SyncMediator::RemoveFifoCallback;
639  traversal.fn_new_socket = &SyncMediator::RemoveSocketCallback;
640  traversal.Recurse(entry->GetRdOnlyPath());
641 
642  // The given directory was emptied recursively and can now itself be deleted
643  RemoveDirectory(entry);
644 }
645 
646 
647 void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
648  const std::string &file_name) {
649  const SharedPtr<SyncItem> entry =
650  CreateSyncItem(parent_dir, file_name, kItemFile);
651  Remove(entry);
652 }
653 
654 
655 void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
656  const std::string &link_name) {
657  const SharedPtr<SyncItem> entry =
658  CreateSyncItem(parent_dir, link_name, kItemSymlink);
659  Remove(entry);
660 }
661 
662 void SyncMediator::RemoveCharacterDeviceCallback(const std::string &parent_dir,
663  const std::string &link_name) {
664  const SharedPtr<SyncItem> entry =
665  CreateSyncItem(parent_dir, link_name, kItemCharacterDevice);
666  Remove(entry);
667 }
668 
669 void SyncMediator::RemoveBlockDeviceCallback(const std::string &parent_dir,
670  const std::string &link_name) {
671  const SharedPtr<SyncItem> entry =
672  CreateSyncItem(parent_dir, link_name, kItemBlockDevice);
673  Remove(entry);
674 }
675 
676 void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
677  const std::string &link_name) {
678  const SharedPtr<SyncItem> entry =
679  CreateSyncItem(parent_dir, link_name, kItemFifo);
680  Remove(entry);
681 }
682 
683 void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
684  const std::string &link_name) {
685  const SharedPtr<SyncItem> entry =
686  CreateSyncItem(parent_dir, link_name, kItemSocket);
687  Remove(entry);
688 }
689 
690 void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
691  const std::string &dir_name) {
692  const SharedPtr<SyncItem> entry =
693  CreateSyncItem(parent_dir, dir_name, kItemDir);
695 }
696 
697 
698 bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
699  const std::string &file_name) {
700  if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
701  return true;
702  }
703 
704  const SharedPtr<SyncItem> entry =
705  CreateSyncItem(parent_dir, file_name, kItemUnknown);
706  return entry->IsWhiteout();
707 }
708 
710  const std::string &relative_parent_path, const std::string &filename,
711  const SyncItemType entry_type) const {
712  return union_engine_->CreateSyncItem(relative_parent_path, filename,
713  entry_type);
714 }
715 
718  "Spooler callback for %s, digest %s, produced %lu chunks, retval %d",
719  result.local_path.c_str(), result.content_hash.ToString().c_str(),
720  result.file_chunks.size(), result.return_code);
721  if (result.return_code != 0) {
722  PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
723  result.return_code);
724  }
725 
726  SyncItemList::iterator itr;
727  {
728  const MutexLockGuard guard(lock_file_queue_);
729  itr = file_queue_.find(result.local_path);
730  }
731 
732  assert(itr != file_queue_.end());
733 
734  SyncItem &item = *itr->second;
735  item.SetContentHash(result.content_hash);
736  item.SetCompressionAlgorithm(result.compression_alg);
737 
738  XattrList *xattrs = &default_xattrs_;
739  if (params_->include_xattrs) {
740  xattrs = XattrList::CreateFromFile(result.local_path);
741  assert(xattrs != NULL);
742  }
743 
744  if (result.IsChunked()) {
746  item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
747  *xattrs,
748  item.relative_parent_path(),
749  result.file_chunks);
750  } else {
752  item.CreateBasicCatalogDirent(params_->enable_mtime_ns),
753  *xattrs,
754  item.relative_parent_path());
755  }
756 
757  if (xattrs != &default_xattrs_)
758  free(xattrs);
759 }
760 
761 
763  const upload::SpoolerResult &result) {
765  "Spooler callback for hardlink %s, digest %s, retval %d",
766  result.local_path.c_str(), result.content_hash.ToString().c_str(),
767  result.return_code);
768  if (result.return_code != 0) {
769  PANIC(kLogStderr, "Spool failure for %s (%d)", result.local_path.c_str(),
770  result.return_code);
771  }
772 
773  bool found = false;
774  for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
775  if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
776  found = true;
777  hardlink_queue_[i].master->SetContentHash(result.content_hash);
778  SyncItemList::iterator j, jend;
779  for (j = hardlink_queue_[i].hardlinks.begin(),
780  jend = hardlink_queue_[i].hardlinks.end();
781  j != jend;
782  ++j) {
783  j->second->SetContentHash(result.content_hash);
784  j->second->SetCompressionAlgorithm(result.compression_alg);
785  }
786  if (result.IsChunked())
787  hardlink_queue_[i].file_chunks = result.file_chunks;
788 
789  break;
790  }
791  }
792 
793  assert(found);
794 }
795 
796 
798  const std::string notice = "Nested catalog at " + directory->GetUnionPath();
799  reporter_->OnAdd(notice, catalog::DirectoryEntry());
800 
801  if (!params_->dry_run) {
802  catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
803  }
804 }
805 
806 
808  const std::string notice = "Nested catalog at " + directory->GetUnionPath();
809  reporter_->OnRemove(notice, catalog::DirectoryEntry());
810 
811  if (!params_->dry_run) {
812  catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
813  }
814 }
815 
817  const history::History::Tag & /*to_tag*/) { }
818 
820 
821 void SyncDiffReporter::OnAdd(const std::string &path,
822  const catalog::DirectoryEntry & /*entry*/) {
823  changed_items_++;
824  AddImpl(path);
825 }
826 void SyncDiffReporter::OnRemove(const std::string &path,
827  const catalog::DirectoryEntry & /*entry*/) {
828  changed_items_++;
829  RemoveImpl(path);
830 }
831 void SyncDiffReporter::OnModify(const std::string &path,
832  const catalog::DirectoryEntry & /*entry_from*/,
833  const catalog::DirectoryEntry & /*entry_to*/) {
834  changed_items_++;
835  ModifyImpl(path);
836 }
837 
839  if (print_action_ == kPrintDots) {
842  }
843  }
844 }
845 
849  }
850 }
851 
852 void SyncDiffReporter::AddImpl(const std::string &path) {
853  const char *action_label;
854 
855  switch (print_action_) {
856  case kPrintChanges:
857  if (path.at(0) != '/') {
858  action_label = "[x-catalog-add]";
859  } else {
860  action_label = "[add]";
861  }
862  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
863  break;
864 
865  case kPrintDots:
866  PrintDots();
867  break;
868  default:
869  assert("Invalid print action.");
870  }
871 }
872 
873 void SyncDiffReporter::RemoveImpl(const std::string &path) {
874  const char *action_label;
875 
876  switch (print_action_) {
877  case kPrintChanges:
878  if (path.at(0) != '/') {
879  action_label = "[x-catalog-rem]";
880  } else {
881  action_label = "[rem]";
882  }
883 
884  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
885  break;
886 
887  case kPrintDots:
888  PrintDots();
889  break;
890  default:
891  assert("Invalid print action.");
892  }
893 }
894 
895 void SyncDiffReporter::ModifyImpl(const std::string &path) {
896  const char *action_label;
897 
898  switch (print_action_) {
899  case kPrintChanges:
900  action_label = "[mod]";
901  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, path.c_str());
902  break;
903 
904  case kPrintDots:
905  PrintDots();
906  break;
907  default:
908  assert("Invalid print action.");
909  }
910 }
911 
913  reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
914 
915  if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
916  assert(!entry->HasGraftMarker());
917  // Symlinks and special files are completely stored in the catalog
918  XattrList *xattrs = &default_xattrs_;
919  if (params_->include_xattrs) {
920  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
921  assert(xattrs);
922  }
924  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
925  *xattrs,
926  entry->relative_parent_path());
927  if (xattrs != &default_xattrs_)
928  free(xattrs);
929  } else if (entry->HasGraftMarker() && !params_->dry_run) {
930  if (entry->IsValidGraft()) {
931  // Graft files are added to catalog immediately.
932  if (entry->IsChunkedGraft()) {
934  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
936  entry->relative_parent_path(),
937  *(entry->GetGraftChunks()));
938  } else {
940  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns),
941  default_xattrs_, // TODO(bbockelm): For now, use default xattrs
942  // on grafted files.
943  entry->relative_parent_path());
944  }
945  } else {
946  // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
947  // the graft file is missing information. It's not clear that continuing
948  // forward with the publish is the correct thing to do; abort for now.
950  "Encountered a grafted file (%s) with "
951  "invalid grafting information; check contents of .cvmfsgraft-*"
952  " file. Aborting publish.",
953  entry->GetRelativePath().c_str());
954  }
955  } else if (entry->relative_parent_path().empty()
956  && entry->IsCatalogMarker()) {
957  PANIC(kLogStderr, "Error: nested catalog marker in root directory");
958  } else if (!params_->dry_run) {
959  {
960  // Push the file to the spooler, remember the entry for the path
962  file_queue_[entry->GetUnionPath()] = entry;
963  }
964  // Spool the file
965  params_->spooler->Process(entry->CreateIngestionSource());
966  }
967 
968  // publish statistics counting for new file
969  if (entry->IsNew()) {
970  if (entry->IsSymlink()) {
971  perf::Inc(counters_->n_symlinks_added);
972  } else {
973  perf::Inc(counters_->n_files_added);
974  perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
975  }
976  }
977 }
978 
980  reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
981 
982  if (!params_->dry_run) {
983  if (handle_hardlinks_ && entry->GetRdOnlyLinkcount() > 1) {
984  LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
985  entry->GetUnionPath().c_str());
986  catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
987  }
988  catalog_manager_->RemoveFile(entry->GetRelativePath());
989  }
990 
991  // Counting nr of removed files and removed bytes
992  if (entry->WasSymlink()) {
993  perf::Inc(counters_->n_symlinks_removed);
994  } else {
995  perf::Inc(counters_->n_files_removed);
996  }
997  perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
998 }
999 
1001  AddDirectory(entry);
1002 }
1003 
1005  if (entry->IsBundleSpec()) {
1006  PANIC(kLogStderr,
1007  "Illegal directory name: .cvmfsbundles (%s). "
1008  ".cvmfsbundles is reserved for bundles specification files",
1009  entry->GetUnionPath().c_str());
1010  }
1011 
1012  reporter_->OnAdd(entry->GetUnionPath(), catalog::DirectoryEntry());
1013 
1014  perf::Inc(counters_->n_directories_added);
1015  assert(!entry->HasGraftMarker());
1016  if (!params_->dry_run) {
1017  XattrList *xattrs = &default_xattrs_;
1018  if (params_->include_xattrs) {
1019  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1020  assert(xattrs);
1021  }
1023  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1024  entry->relative_parent_path());
1025  if (xattrs != &default_xattrs_)
1026  free(xattrs);
1027  }
1028 
1029  if (entry->HasCatalogMarker()
1030  && !catalog_manager_->IsTransitionPoint(entry->GetRelativePath())) {
1031  CreateNestedCatalog(entry);
1032  }
1033 }
1034 
1035 
1042  const std::string directory_path = entry->GetRelativePath();
1043 
1044  if (catalog_manager_->IsTransitionPoint(directory_path)) {
1045  RemoveNestedCatalog(entry);
1046  }
1047 
1048  reporter_->OnRemove(entry->GetUnionPath(), catalog::DirectoryEntry());
1049  if (!params_->dry_run) {
1050  catalog_manager_->RemoveDirectory(directory_path);
1051  }
1052 
1053  perf::Inc(counters_->n_directories_removed);
1054 }
1055 
1057  reporter_->OnModify(entry->GetUnionPath(), catalog::DirectoryEntry(),
1059 
1060  const std::string directory_path = entry->GetRelativePath();
1061 
1062  if (!params_->dry_run) {
1063  XattrList *xattrs = &default_xattrs_;
1064  if (params_->include_xattrs) {
1065  xattrs = XattrList::CreateFromFile(entry->GetUnionPath());
1066  assert(xattrs);
1067  }
1069  entry->CreateBasicCatalogDirent(params_->enable_mtime_ns), *xattrs,
1070  directory_path);
1071  if (xattrs != &default_xattrs_)
1072  free(xattrs);
1073  }
1074 
1075  if (entry->HasCatalogMarker()
1076  && !catalog_manager_->IsTransitionPoint(directory_path)) {
1077  CreateNestedCatalog(entry);
1078  } else if (!entry->HasCatalogMarker()
1079  && catalog_manager_->IsTransitionPoint(directory_path)) {
1080  RemoveNestedCatalog(entry);
1081  }
1082 }
1083 
1090 
1091  for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
1092  iEnd = hardlinks.end();
1093  i != iEnd;
1094  ++i) {
1095  if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount()
1097  PANIC(kLogSyslogErr | kLogDebug, "Hardlinks across directories (%s)",
1098  i->second.master->GetUnionPath().c_str());
1099  }
1100 
1101  if (params_->print_changeset) {
1102  for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
1103  jEnd = i->second.hardlinks.end();
1104  j != jEnd;
1105  ++j) {
1106  const std::string changeset_notice =
1107  GetParentPath(i->second.master->GetUnionPath()) + "/" +
1108  j->second->filename();
1109  reporter_->OnAdd(changeset_notice, catalog::DirectoryEntry());
1110  }
1111  }
1112 
1113  if (params_->dry_run)
1114  continue;
1115 
1116  if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
1117  AddHardlinkGroup(i->second);
1118  else
1119  hardlink_queue_.push_back(i->second);
1120  }
1121 }
1122 
1123 
1126 
1127  // Create a DirectoryEntry list out of the hardlinks
1129  for (SyncItemList::const_iterator i = group.hardlinks.begin(),
1130  iEnd = group.hardlinks.end();
1131  i != iEnd;
1132  ++i) {
1133  hardlinks.push_back(
1134  i->second->CreateBasicCatalogDirent(params_->enable_mtime_ns));
1135  }
1136  XattrList *xattrs = &default_xattrs_;
1137  if (params_->include_xattrs) {
1138  xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1139  assert(xattrs);
1140  }
1142  *xattrs,
1143  group.master->relative_parent_path(),
1144  group.file_chunks);
1145  if (xattrs != &default_xattrs_)
1146  free(xattrs);
1147 }
1148 
1149 } // namespace publish
bool Commit(manifest::Manifest *manifest)
int return_code
the return value of the spooler operation
void RemoveSocketCallback(const std::string &parent_dir, const std::string &link_name)
void ModifyImpl(const std::string &path)
void Dec(class Counter *counter)
Definition: statistics.h:49
void AddChunkedFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
const manifest::Manifest * manifest() const
Definition: repository.h:125
void AddDirectoryRecursively(SharedPtr< SyncItem > entry)
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
void AddHardlinkGroup(const DirectoryEntryBaseList &entries, const XattrList &xattrs, const std::string &parent_directory, const FileChunkList &file_chunks)
HardlinkGroupMapStack hardlink_stack_
void AddBlockDeviceCallback(const std::string &parent_dir, const std::string &file_name)
FileChunkList file_chunks
the file chunks generated during processing
void AddDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
virtual bool SupportsHardlinks() const
Definition: sync_union.h:140
void RemoveFile(SharedPtr< SyncItem > entry)
void CompleteHardlinks(SharedPtr< SyncItem > entry)
void RemoveNestedCatalog(SharedPtr< SyncItem > directory)
void RemoveDirectory(const std::string &directory_path)
#define PANIC(...)
Definition: exception.h:29
void CreateNestedCatalog(SharedPtr< SyncItem > directory)
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
UniquePtr< SyncDiffReporter > reporter_
virtual void OnInit(const history::History::Tag &from_tag, const history::History::Tag &to_tag)
virtual void OnModify(const std::string &path, const catalog::DirectoryEntry &entry_from, const catalog::DirectoryEntry &entry_to)
void RemoveDirectoryRecursively(SharedPtr< SyncItem > entry)
HardlinkGroupMap & GetHardlinkMap()
void LegacyCharacterDeviceHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void Replace(SharedPtr< SyncItem > entry)
std::string CreateTempPath(const std::string &path_prefix, const int mode)
Definition: posix.cc:1042
std::string scratch_path() const
Definition: sync_union.h:98
void PublishHardlinksCallback(const upload::SpoolerResult &result)
void EnterDirectory(SharedPtr< SyncItem > entry)
A simple recursion engine to abstract the recursion of directories. It provides several callback hook...
Definition: fs_traversal.h:36
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
UniquePtr< perf::FsCounters > counters_
assert((mem||(size==0))&&"Out Of Memory")
bool Commit(const bool stop_for_tweaks, const uint64_t manual_revision, manifest::Manifest *manifest)
void LegacySymlinkHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void AddDirectory(SharedPtr< SyncItem > entry)
void RemoveImpl(const std::string &path)
HardlinkGroupList hardlink_queue_
void RemoveCharacterDeviceCallback(const std::string &parent_dir, const std::string &link_name)
void AddFileCallback(const std::string &parent_dir, const std::string &file_name)
virtual void PostUpload()
Definition: sync_union.h:81
void LegacySocketHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
Definition: sync_union.cc:30
void RemoveFileCallback(const std::string &parent_dir, const std::string &file_name)
void PublishFilesCallback(const upload::SpoolerResult &result)
bool IsTransitionPoint(const std::string &mountpoint)
void Clone(const std::string from, const std::string to)
void TouchDirectory(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &directory_path)
void AddFile(SharedPtr< SyncItem > entry)
std::string dir_temp
void InsertLegacyHardlink(SharedPtr< SyncItem > entry)
void AddUnmaterializedDirectory(SharedPtr< SyncItem > entry)
std::string local_path
the local_path previously given as input
unsigned int processing_dot_interval_
Definition: sync_mediator.h:86
std::string union_path() const
Definition: sync_union.h:97
VoidCallback fn_new_file
Definition: fs_traversal.h:46
zlib::Algorithms compression_alg
SyncUnion * union_engine_
static JsonDocument * Create(const std::string &text)
void ShrinkHardlinkGroup(const std::string &remove_path)
SyncItemList file_queue_
const SyncParameters * params_
VoidCallback fn_enter_dir
Definition: fs_traversal.h:44
virtual bool IgnoreFilePredicate(const std::string &parent_dir, const std::string &filename)
Definition: sync_union.cc:57
void RemoveFifoCallback(const std::string &parent_dir, const std::string &link_name)
virtual void OnStats(const catalog::DeltaCounters &delta)
upload::Spooler * spooler
void AddImpl(const std::string &path)
void AddSocketCallback(const std::string &parent_dir, const std::string &file_name)
static const int kActionNone
void Touch(SharedPtr< SyncItem > entry)
void AddFifoCallback(const std::string &parent_dir, const std::string &file_name)
void AddSymlinkCallback(const std::string &parent_dir, const std::string &link_name)
void EnterAddedDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
void PrintWarning(const string &message)
Definition: logging.cc:560
void Add(SharedPtr< SyncItem > entry)
void Inc(class Counter *counter)
Definition: statistics.h:50
bool AddDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
bool HasPrefix(const string &str, const string &prefix, const bool ignore_case)
Definition: string.cc:279
void Clone(const std::string from, const std::string to)
bool IgnoreFileCallback(const std::string &parent_dir, const std::string &file_name)
void RemoveFile(const std::string &file_path)
bool stop_for_catalog_tweaks
bool SafeReadToString(int fd, std::string *final_result)
Definition: posix.cc:2117
void RemoveNestedCatalog(const std::string &mountpoint, const bool merge=true)
uint64_t manual_revision
void LegacyFifoHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void AddCharacterDeviceCallback(const std::string &parent_dir, const std::string &file_name)
void AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks)
pthread_mutex_t lock_file_queue_
void InsertHardlink(SharedPtr< SyncItem > entry)
catalog::WritableCatalogManager * catalog_manager_
void LeaveDirectory(SharedPtr< SyncItem > entry)
virtual void OnRemove(const std::string &path, const catalog::DirectoryEntry &entry)
SyncItemType
Definition: sync_item.h:29
Definition: mutex.h:42
void SetContentHash(const shash::Any &hash)
Definition: sync_item.h:120
PathString GetParentPath(const PathString &path)
Definition: shortstring.cc:14
void RemoveSymlinkCallback(const std::string &parent_dir, const std::string &link_name)
std::vector< DirectoryEntryBase > DirectoryEntryBaseList
void RemoveBlockDeviceCallback(const std::string &parent_dir, const std::string &link_name)
static XattrList * CreateFromFile(const std::string &path)
Definition: xattr.cc:32
const int kLogVerboseMsg
void LegacyBlockDeviceHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void RemoveDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)
void TouchDirectory(SharedPtr< SyncItem > entry)
static const char * kVirtualPath
bool ignore_xdir_hardlinks
void AddFile(const DirectoryEntryBase &entry, const XattrList &xattrs, const std::string &parent_directory)
void EnsureAllowed(SharedPtr< SyncItem > entry)
void LegacyRegularHardlinkCallback(const std::string &parent_dir, const std::string &file_name)
void RegisterUnionEngine(SyncUnion *engine)
void AddHardlinkGroup(const HardlinkGroup &group)
std::string rdonly_path() const
Definition: sync_union.h:96
void RemoveDirectory(SharedPtr< SyncItem > entry)
std::map< uint64_t, HardlinkGroup > HardlinkGroupMap
void CreateNestedCatalog(const std::string &mountpoint)
size_t size() const
Definition: bigvector.h:117
virtual void OnAdd(const std::string &path, const catalog::DirectoryEntry &entry)
void Remove(SharedPtr< SyncItem > entry)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545
void LeaveAddedDirectoryCallback(const std::string &parent_dir, const std::string &dir_name)