CernVM-FS  2.11.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sync_union_tarball.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "sync_union_tarball.h"
8 
9 #include <pthread.h>
10 #include <unistd.h>
11 
12 #include <cassert>
13 #include <cstdio>
14 #include <list>
15 #include <set>
16 #include <string>
17 #include <vector>
18 
19 #include "duplex_libarchive.h"
20 #include "sync_item.h"
21 #include "sync_item_dummy.h"
22 #include "sync_item_tar.h"
23 #include "sync_mediator.h"
24 #include "sync_union.h"
25 #include "util/concurrency.h"
26 #include "util/exception.h"
27 #include "util/fs_traversal.h"
28 #include "util/posix.h"
29 #include "util/smalloc.h"
30 
31 namespace publish {
32 
34  const std::string &rdonly_path,
35  const std::string &tarball_path,
36  const std::string &base_directory,
37  const std::string &to_delete,
38  const bool create_catalog_on_root)
39  : SyncUnion(mediator, rdonly_path, "", ""),
40  src(NULL),
41  tarball_path_(tarball_path),
42  base_directory_(base_directory),
43  to_delete_(to_delete),
44  create_catalog_on_root_(create_catalog_on_root),
45  read_archive_signal_(new Signal) {}
46 
48 
50  bool result;
51 
52  // We are just deleting entities from the repo
53  if (tarball_path_ == "") {
54  assert(NULL == src);
55  return SyncUnion::Initialize();
56  }
57 
58  src = archive_read_new();
59  assert(ARCHIVE_OK == archive_read_support_format_tar(src));
60  assert(ARCHIVE_OK == archive_read_support_format_empty(src));
61 
62  if (tarball_path_ == "-") {
63  result = archive_read_open_filename(src, NULL, kBlockSize);
64  } else {
65  std::string tarball_absolute_path = GetAbsolutePath(tarball_path_);
66  result = archive_read_open_filename(src, tarball_absolute_path.c_str(),
67  kBlockSize);
68  }
69 
70  if (result != ARCHIVE_OK) {
71  LogCvmfs(kLogUnionFs, kLogStderr, "Impossible to open the archive.");
72  return false;
73  }
74 
75  return SyncUnion::Initialize();
76 }
77 
78 /*
79  * Libarchive is not thread aware, so we need to make sure that, before
80  * reading/"opening" the next header in the archive, the content of the
81  * present header has been consumed completely.
82  *
83  * The header is read/"opened" by a different thread than the one that consumes
84  * it, so we opted for a Signal that is backed by a condition variable.
85  * We wait for the signal just before reading the header.
86  * Then, when we are done with the header, the Signal is fired.
87  * The Signal can be fired inside the main loop if we don't need to read
88  * data, or when the IngestionSource gets closed, which means that we are
89  * not reading data from there anymore.
90  * This whole process is not necessary for directories since we don't
91  * actually need to read data from them.
92  *
93  * It may be necessary to add a catalog at the root of the archive.
94  * A possible way to do it is by creating a virtual `.cvmfscatalog` file and
95  * pushing it into the usual pipeline.
96  * This operation must be done only once, and it seems like a good idea to do
97  * it at the first iteration of the loop, hence this logic is managed by the
98  * `first_iteration` boolean flag.
99  */
102  assert(this->IsInitialized());
103 
104  /*
105  * As first step we eliminate the requested directories.
106  */
107  if (to_delete_ != "") {
108  vector<std::string> to_eliminate_vec = SplitString(to_delete_, ':');
109 
110  for (vector<string>::iterator s = to_eliminate_vec.begin();
111  s != to_eliminate_vec.end(); ++s) {
112  std::string parent_path;
113  std::string filename;
114  SplitPath(*s, &parent_path, &filename);
115  if (parent_path == ".") parent_path = "";
116  SharedPtr<SyncItem> sync_entry =
117  CreateSyncItem(parent_path, filename, kItemDir);
118  mediator_->Remove(sync_entry);
119  }
120  }
121 
122  // we are simply deleting entities from the repo
123  if (NULL == src) return;
124 
125  struct archive_entry *entry = archive_entry_new();
126  while (true) {
127  // Get the lock, wait if lock is not available yet
129 
130  int result = archive_read_next_header2(src, entry);
131 
132  switch (result) {
133  case ARCHIVE_FATAL: {
134  PANIC(kLogStderr, "Fatal error in reading the archive.\n%s\n",
135  archive_error_string(src));
136  break; // Only exit point with error
137  }
138 
139  case ARCHIVE_RETRY: {
141  "Error in reading the header, retrying.\n%s\n",
142  archive_error_string(src));
143  continue;
144  break;
145  }
146 
147  case ARCHIVE_EOF: {
148  if (create_catalog_on_root_ && (base_directory_ != "/")) {
149  CreateDirectories(base_directory_); // necessary for empty archives
152  ProcessFile(catalog);
154  }
155  for (set<string>::iterator dir = to_create_catalog_dirs_.begin();
156  dir != to_create_catalog_dirs_.end(); ++dir) {
157  assert(dirs_.find(*dir) != dirs_.end());
158  SharedPtr<SyncItem> to_mark = dirs_[*dir];
159  assert(to_mark->IsDirectory());
160  to_mark->SetCatalogMarker();
161  to_mark->MakePlaceholderDirectory();
162  ProcessDirectory(to_mark);
163  }
164  return; // Only successful exit point
165  break;
166  }
167 
168  case ARCHIVE_WARN: {
170  "Warning in uncompression reading, going on.\n %s",
171  archive_error_string(src));
172  // We actually want this to enter the ARCHIVE_OK case
173  }
174 
175  case ARCHIVE_OK: {
176  ProcessArchiveEntry(entry);
177  break;
178  }
179 
180  default: {
181  // We should never enter this branch, but to be safe we prefer
182  // to abort in case we hit a case we don't know how to handle.
183  PANIC(kLogStderr, "Enter in unknow state. Aborting.\nError: %s\n",
184  result, archive_error_string(src));
185  }
186  }
187  }
188 }
189 
190 void SyncUnionTarball::ProcessArchiveEntry(struct archive_entry *entry) {
191  std::string archive_file_path(archive_entry_pathname(entry));
192  archive_file_path = SanitizePath(archive_file_path);
193 
194  std::string complete_path =
195  base_directory_ != "/"
196  ? MakeCanonicalPath(base_directory_ + "/" + archive_file_path)
197  : MakeCanonicalPath(archive_file_path);
198 
199  std::string parent_path;
200  std::string filename;
201  SplitPath(complete_path, &parent_path, &filename);
202  if (parent_path == ".") parent_path.clear();
203 
204  CreateDirectories(parent_path);
205 
207  parent_path, filename, src, entry, read_archive_signal_, this));
208 
209  if (NULL != archive_entry_hardlink(entry)) {
210  const std::string hardlink_name(
211  SanitizePath(archive_entry_hardlink(entry)));
212  const std::string hardlink = base_directory_ != "/"
213  ? base_directory_ + "/" + hardlink_name
214  : hardlink_name;
215 
216  if (hardlinks_.find(hardlink) != hardlinks_.end()) {
217  hardlinks_.find(hardlink)->second.push_back(complete_path);
218  } else {
219  std::list<std::string> to_hardlink;
220  to_hardlink.push_back(complete_path);
221  hardlinks_[hardlink] = to_hardlink;
222  }
223  if (filename == ".cvmfscatalog") {
224  // the file is created in the PostUpload phase
225  to_create_catalog_dirs_.insert(parent_path);
226  }
228  return;
229  }
230 
231  if (sync_entry->IsDirectory()) {
232  if (know_directories_.find(complete_path) != know_directories_.end()) {
233  sync_entry->MakePlaceholderDirectory();
234  }
235  ProcessUnmaterializedDirectory(sync_entry);
236  dirs_[complete_path] = sync_entry;
237  know_directories_.insert(complete_path);
238 
239  read_archive_signal_->Wakeup(); // We don't need to read data and we
240  // can read the next header
241 
242  } else if (sync_entry->IsRegularFile()) {
243  // inside the process pipeline we will wake up the signal
244  ProcessFile(sync_entry);
245  if (filename == ".cvmfscatalog") {
246  to_create_catalog_dirs_.insert(parent_path);
247  }
248 
249  } else if (sync_entry->IsSymlink() || sync_entry->IsFifo() ||
250  sync_entry->IsSocket() || sync_entry->IsCharacterDevice() ||
251  sync_entry->IsBlockDevice()) {
252  // we avoid adding an entity named like a catalog marker if it is not a
253  // regular file
254  if (filename != ".cvmfscatalog") {
255  ProcessFile(sync_entry);
256  } else {
258  "Found entity called as a catalog marker '%s' that however is "
259  "not a regular file, abort",
260  complete_path.c_str());
261  }
262 
263  // here we don't need to read data from the tar file, so we can wake up
264  // the signal immediately
266 
267  } else {
268  PANIC(kLogStderr, "Fatal error found unexpected file: \n%s\n",
269  filename.c_str());
270  // if for any reason this code path changes and we don't abort anymore,
271  // remember to wake up the signal, otherwise we will be stuck in a deadlock
272  //
273  // read_archive_signal_->Wakeup();
274  }
275 }
276 
277 std::string SyncUnionTarball::SanitizePath(const std::string &path) {
278  if (path.length() >= 2) {
279  if (path[0] == '.' && path[1] == '/') {
280  return path.substr(2);
281  }
282  }
283  if (path.length() >= 1) {
284  if (path[0] == '/') {
285  return path.substr(1);
286  }
287  }
288  return path;
289 }
290 
292  std::map<const std::string, std::list<std::string> >::iterator hardlink;
293  for (hardlink = hardlinks_.begin(); hardlink != hardlinks_.end();
294  ++hardlink) {
295  std::list<std::string>::iterator entry;
296  for (entry = hardlink->second.begin(); entry != hardlink->second.end();
297  ++entry) {
298  mediator_->Clone(*entry, hardlink->first);
299  }
300  }
301 }
302 
304  SharedPtr<SyncItem> entry) const {
305  return entry->filename();
306 }
307 
309  return false;
310 }
311 
313  return false;
314 }
315 
316 /* Tar files are not necessarily traversed in order from root to leaves.
317  * So it may happen that we are expanding the file `/a/b/c.txt` without
318  * having yet created the directory `/a/b/`.
319  * In order to overcome this limitation, the following function creates dummy
320  * directories that can be used as placeholders and that will be overwritten
321  * as soon as the real directory is found in the tarball.
322  */
323 void SyncUnionTarball::CreateDirectories(const std::string &target) {
324  if (know_directories_.find(target) != know_directories_.end()) return;
325  if (target == ".") return;
326 
327  std::string dirname = "";
328  std::string filename = "";
329  SplitPath(target, &dirname, &filename);
330  CreateDirectories(dirname);
331 
332  if (dirname == ".") dirname = "";
334  new SyncItemDummyDir(dirname, filename, this, kItemDir));
335 
337  dirs_[target] = dummy;
338  know_directories_.insert(target);
339 }
340 
341 } // namespace publish
#define LogCvmfs(source, mask,...)
Definition: logging.h:22
std::map< const std::string, std::list< std::string > > hardlinks_
virtual bool ProcessUnmaterializedDirectory(SharedPtr< SyncItem > entry)
Definition: sync_union.cc:92
bool IsOpaqueDirectory(SharedPtr< SyncItem > directory) const
std::map< std::string, SharedPtr< SyncItem > > dirs_
const std::string base_directory_
#define PANIC(...)
Definition: exception.h:27
virtual bool Initialize()
Definition: sync_union.cc:24
void Wakeup()
Definition: concurrency.cc:59
assert((mem||(size==0))&&"Out Of Memory")
std::string UnwindWhiteoutFilename(SharedPtr< SyncItem > entry) const
void CreateDirectories(const std::string &target)
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
Definition: sync_union.cc:30
const std::string tarball_path_
std::set< std::string > to_create_catalog_dirs_
std::string GetAbsolutePath(const std::string &path)
Definition: posix.cc:158
void SplitPath(const std::string &path, std::string *dirname, std::string *filename)
Definition: posix.cc:111
AbstractSyncMediator * mediator_
Definition: sync_union.h:147
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
bool IsInitialized() const
Definition: sync_union.h:139
std::string SanitizePath(const std::string &path)
const std::string to_delete_
entities to delete before extracting the tar
void ProcessArchiveEntry(struct archive_entry *entry)
void ProcessFile(SharedPtr< SyncItem > entry)
Definition: sync_union.cc:116
void Wait()
Definition: concurrency.cc:49
virtual void Clone(const std::string from, const std::string to)=0
virtual void Remove(SharedPtr< SyncItem > entry)=0
virtual bool ProcessDirectory(const std::string &parent_dir, const std::string &dir_name)
static const size_t kBlockSize
bool IsWhiteoutEntry(SharedPtr< SyncItem > entry) const
std::set< std::string > know_directories_
directories that we know already exist
std::string MakeCanonicalPath(const std::string &path)
Definition: posix.cc:96
SyncUnionTarball(AbstractSyncMediator *mediator, const std::string &rdonly_path, const std::string &tarball_path, const std::string &base_directory, const std::string &to_delete, const bool create_catalog_on_root)