CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sync_union_tarball.cc
Go to the documentation of this file.
1 
5 #define __STDC_FORMAT_MACROS
6 
7 #include "sync_union_tarball.h"
8 
9 #include <pthread.h>
10 #include <unistd.h>
11 
12 #include <cassert>
13 #include <cstdio>
14 #include <list>
15 #include <set>
16 #include <string>
17 #include <vector>
18 
19 #include "duplex_libarchive.h"
20 #include "sync_item.h"
21 #include "sync_item_dummy.h"
22 #include "sync_item_tar.h"
23 #include "sync_mediator.h"
24 #include "sync_union.h"
25 #include "util/concurrency.h"
26 #include "util/exception.h"
27 #include "util/fs_traversal.h"
28 #include "util/posix.h"
29 #include "util/smalloc.h"
30 
31 namespace publish {
32 
// NOTE(review): the opening line of this definition (listing line 33) is
// missing from this extract; what follows is the remainder of a constructor
// parameter list followed by its member-initializer list.
//
// Visible behavior:
//  - the base class SyncUnion receives `mediator`, `rdonly_path` and two
//    empty strings;
//  - `src` starts out as NULL, i.e. no archive handle exists yet;
//  - every remaining parameter is copied verbatim into the member of the
//    same name (with a trailing underscore);
//  - a Signal object is allocated with `new` and stored in
//    `read_archive_signal_`. Where it is released is not visible in this
//    extract (listing line 53 is missing) -- TODO confirm.
34  const std::string &rdonly_path,
35  const std::string &tarball_path,
36  const std::string &base_directory,
37  const uid_t uid,
38  const gid_t gid,
39  const std::string &to_delete,
40  const bool create_catalog_on_root,
41  const std::string &path_delimiter)
42  : SyncUnion(mediator, rdonly_path, "", "")
43  , src(NULL)
44  , tarball_path_(tarball_path)
45  , base_directory_(base_directory)
46  , uid_(uid)
47  , gid_(gid)
48  , to_delete_(to_delete)
49  , create_catalog_on_root_(create_catalog_on_root)
50  , path_delimiter_(path_delimiter)
51  , read_archive_signal_(new Signal) { }
52 
54 
// NOTE(review): the signature line of this function (listing line 55) is
// missing from this extract. The body returns bool and ends by delegating to
// SyncUnion::Initialize().
//
// Opens the tarball named by tarball_path_ for reading:
//  - empty path: nothing is opened, `src` must still be NULL;
//  - "-":        the archive is opened with a NULL filename;
//  - otherwise:  the path is made absolute and opened.
// On an open failure the libarchive error string is logged and false is
// returned.
56  bool result;
57 
58  // We are just deleting entity from the repo
59  if (tarball_path_ == "") {
60  assert(NULL == src);
61  return SyncUnion::Initialize();
62  }
63 
64  src = archive_read_new();
// NOTE(review): the two format-registration calls below live inside
// assert(); in a build with NDEBUG defined the assert expression is not
// evaluated, so the calls would not happen at all -- confirm build flags.
65  assert(ARCHIVE_OK == archive_read_support_format_tar(src));
66  assert(ARCHIVE_OK == archive_read_support_format_empty(src));
67 
// NOTE(review): `result` is declared bool but is assigned the status of
// archive_read_open_filename and later compared against ARCHIVE_OK;
// presumably the status is an int code, so this relies on ARCHIVE_OK
// converting to false -- confirm.
68  if (tarball_path_ == "-") {
// presumably a NULL filename makes libarchive read from standard input --
// verify against the libarchive manual
69  result = archive_read_open_filename(src, NULL, kBlockSize);
70  } else {
71  std::string tarball_absolute_path = GetAbsolutePath(tarball_path_);
72  result = archive_read_open_filename(src, tarball_absolute_path.c_str(),
73  kBlockSize);
74  }
75 
76  if (result != ARCHIVE_OK) {
77  LogCvmfs(kLogUnionFs, kLogStderr, "Impossible to open the archive: %s",
78  archive_error_string(src));
79  return false;
80  }
81 
82  return SyncUnion::Initialize();
83 }
84 
85 /*
86  * Libarchive is not thread aware, so before reading ("opening") the next
87  * header in the archive we need to make sure that the content of the
88  * current header has been consumed completely.
89  *
90  * The thread that reads/"opens" a header differs from the one that consumes
91  * it, so we opted for a Signal that is backed by a condition variable.
92  * We wait for the signal just before reading the header.
93  * Once we are done with the header, the Signal is fired.
94  * The Signal can be fired inside the main loop if we don't need to read
95  * data, or when the IngestionSource gets closed, which means that we are
96  * no longer reading data from there.
97  * This whole process is not necessary for directories since we don't
98  * actually need to read data from them.
99  *
100  * It may be necessary to add a catalog at the root of the archive.
101  * A possible way to do it is by creating a virtual `.cvmfscatalog` file and
102  * pushing it into the usual pipeline.
103  * This operation must be done only once, and it seems like a good idea to do
104  * it at the first iteration of the loop, hence this logic is managed by the
105  * `first_iteration` boolean flag.
106  */
// NOTE(review): the signature of this function (listing lines 107-108) is
// missing from this extract, as are listing lines 116, 139, 151, 161-162,
// 164 and 181 inside the body. Statements that look truncated below are
// truncated in the extract, not in the comments added here.
//
// Visible flow:
//  1. If to_delete_ is non-empty, it is split into paths and each one is
//     handed to mediator_->Remove() as a directory item (kItemDir).
//  2. If no archive is open (src == NULL), the function returns.
//  3. Otherwise archive headers are read in a loop and dispatched on the
//     libarchive status code until ARCHIVE_EOF is reached.
109  assert(this->IsInitialized());
110 
111  /*
112  * As first step we eliminate the requested directories.
113  */
114  if (to_delete_ != "") {
115  vector<std::string> to_eliminate_vec = SplitStringMultiChar(
117 
118  for (vector<string>::iterator s = to_eliminate_vec.begin();
119  s != to_eliminate_vec.end();
120  ++s) {
121  std::string parent_path;
122  std::string filename;
123  SplitPath(*s, &parent_path, &filename);
// a parent of "." is mapped to the empty string before the item is built
124  if (parent_path == ".")
125  parent_path = "";
126  SharedPtr<SyncItem> sync_entry = CreateSyncItem(parent_path, filename,
127  kItemDir);
128  mediator_->Remove(sync_entry);
129  }
130  }
131 
132  // we are simply deleting entity from the repo
133  if (NULL == src)
134  return;
135 
// NOTE(review): no matching release of `entry` is visible on any exit path
// in this extract -- confirm whether it is freed elsewhere.
136  struct archive_entry *entry = archive_entry_new();
137  while (true) {
138  // Get the lock, wait if lock is not available yet
// NOTE(review): listing line 139 is missing here; presumably it is the wait
// on read_archive_signal_ that the comment above announces -- TODO confirm.
140 
141  int result = archive_read_next_header2(src, entry);
142 
143  switch (result) {
144  case ARCHIVE_FATAL: {
145  PANIC(kLogStderr, "Fatal error in reading the archive.\n%s\n",
146  archive_error_string(src));
147  break; // Only exit point with error
148  }
149 
150  case ARCHIVE_RETRY: {
// NOTE(review): listing line 151 (the start of this logging call) is missing.
152  "Error in reading the header, retrying.\n%s\n",
153  archive_error_string(src));
// `continue` restarts the while loop; the `break` after it is unreachable
154  continue;
155  break;
156  }
157 
158  case ARCHIVE_EOF: {
// End of archive: optionally add a catalog for base_directory_, then turn
// every directory recorded in to_create_catalog_dirs_ into a catalog root.
159  if (create_catalog_on_root_ && (base_directory_ != "/")) {
160  CreateDirectories(base_directory_); // necessary for empty archives
// NOTE(review): listing lines 161-162 and 164 are missing; `catalog` is
// defined on one of them.
163  ProcessFile(catalog);
165  }
166  for (set<string>::iterator dir = to_create_catalog_dirs_.begin();
167  dir != to_create_catalog_dirs_.end();
168  ++dir) {
// every directory queued for a catalog must already be present in dirs_
169  assert(dirs_.find(*dir) != dirs_.end());
170  SharedPtr<SyncItem> to_mark = dirs_[*dir];
171  assert(to_mark->IsDirectory());
172  to_mark->SetCatalogMarker();
173  to_mark->MakePlaceholderDirectory();
174  ProcessDirectory(to_mark);
175  }
176  return; // Only successful exit point
177  break;
178  }
179 
180  case ARCHIVE_WARN: {
// NOTE(review): listing line 181 (the start of this logging call) is missing.
182  "Warning in uncompression reading, going on.\n %s",
183  archive_error_string(src));
184  // We actually want this to enter the ARCHIVE_OK case
185  }
186 
187  case ARCHIVE_OK: {
188  ProcessArchiveEntry(entry);
189  break;
190  }
191 
192  default: {
193  // We should never enter in this branch, but just for safeness we prefer
194  // to abort in case we hit a case we don't how to manage.
// NOTE(review): the format string contains a single %s, yet two arguments
// follow it (`result`, an int, then the error string). If PANIC forwards
// its arguments printf-style, %s would be matched with the int -- confirm
// and fix the format string.
195  PANIC(kLogStderr, "Enter in unknown state. Aborting.\nError: %s\n",
196  result, archive_error_string(src));
197  }
198  }
199  }
200 }
201 
// Handles one archive entry whose header has just been read.
//
// The entry's path is sanitized, combined with base_directory_ (unless that
// is "/"), and split into parent directory and file name. Missing parent
// directories are created as placeholders, then the entry is dispatched by
// type: hardlink, directory, regular file, or other special file.
//
// NOTE(review): listing lines 207, 219, 241, 271 and 279 are missing from
// this extract; the statements around those gaps are truncated here.
202 void SyncUnionTarball::ProcessArchiveEntry(struct archive_entry *entry) {
// NOTE(review): if archive_entry_pathname() can return NULL, constructing a
// std::string from it is undefined behavior -- confirm against libarchive.
203  std::string archive_file_path(archive_entry_pathname(entry));
204  archive_file_path = SanitizePath(archive_file_path);
205 
206  std::string complete_path = base_directory_ != "/"
208  + archive_file_path)
209  : MakeCanonicalPath(archive_file_path);
210 
211  std::string parent_path;
212  std::string filename;
213  SplitPath(complete_path, &parent_path, &filename);
// a parent of "." is represented as the empty string from here on
214  if (parent_path == ".")
215  parent_path.clear();
216 
217  CreateDirectories(parent_path);
218 
// NOTE(review): listing line 219 is missing; presumably it declares
// `sync_entry`, which the branches below use -- TODO confirm.
220  new SyncItemTar(parent_path, filename, src, entry, read_archive_signal_,
221  this, uid_, gid_));
222 
// Hardlink entries are only recorded here: the link target path (built the
// same way as complete_path, but without MakeCanonicalPath) is the key of
// hardlinks_, and the list of entry paths linked to it is the value.
223  if (NULL != archive_entry_hardlink(entry)) {
224  const std::string hardlink_name(
225  SanitizePath(archive_entry_hardlink(entry)));
226  const std::string hardlink = base_directory_ != "/"
227  ? base_directory_ + "/" + hardlink_name
228  : hardlink_name;
229 
230  if (hardlinks_.find(hardlink) != hardlinks_.end()) {
231  hardlinks_.find(hardlink)->second.push_back(complete_path);
232  } else {
233  std::list<std::string> to_hardlink;
234  to_hardlink.push_back(complete_path);
235  hardlinks_[hardlink] = to_hardlink;
236  }
237  if (filename == ".cvmfscatalog") {
238  // the file is created in the PostUpload phase
239  to_create_catalog_dirs_.insert(parent_path);
240  }
// NOTE(review): listing line 241 is missing here.
242  return;
243  }
244 
245  if (sync_entry->IsDirectory()) {
// a directory already registered in know_directories_ is marked as a
// placeholder before it is processed
246  if (know_directories_.find(complete_path) != know_directories_.end()) {
247  sync_entry->MakePlaceholderDirectory();
248  }
249  ProcessUnmaterializedDirectory(sync_entry);
250  dirs_[complete_path] = sync_entry;
251  know_directories_.insert(complete_path);
252 
253  read_archive_signal_->Wakeup(); // We don't need to read data and we
254  // can read the next header
255 
256  } else if (sync_entry->IsRegularFile()) {
257  // inside the process pipeline we will wake up the signal
258  ProcessFile(sync_entry);
// a regular file named .cvmfscatalog queues its parent for catalog creation
259  if (filename == ".cvmfscatalog") {
260  to_create_catalog_dirs_.insert(parent_path);
261  }
262 
263  } else if (sync_entry->IsSymlink() || sync_entry->IsFifo()
264  || sync_entry->IsSocket() || sync_entry->IsCharacterDevice()
265  || sync_entry->IsBlockDevice()) {
266  // we avoid to add an entity called as a catalog marker if it is not a
267  // regular file
268  if (filename != ".cvmfscatalog") {
269  ProcessFile(sync_entry);
270  } else {
// NOTE(review): listing line 271 (the start of this call) is missing.
272  "Found entity called as a catalog marker '%s' that however is "
273  "not a regular file, abort",
274  complete_path.c_str());
275  }
276 
277  // here we don't need to read data from the tar file so we can wake up
278  // immediately the signal
// NOTE(review): listing line 279 is missing; presumably it is the Wakeup()
// call that the comment above describes -- TODO confirm.
280 
281  } else {
282  PANIC(kLogStderr, "Fatal error found unexpected file: \n%s\n",
283  filename.c_str());
284  // if for any reason this code path change and we don't abort anymore,
285  // remember to wakeup the signal, otherwise we will be stuck in a deadlock
286  //
287  // read_archive_signal_->Wakeup();
288  }
289 }
290 
291 std::string SyncUnionTarball::SanitizePath(const std::string &path) {
292  if (path.length() >= 2) {
293  if (path[0] == '.' && path[1] == '/') {
294  return path.substr(2);
295  }
296  }
297  if (path.length() >= 1) {
298  if (path[0] == '/') {
299  return path.substr(1);
300  }
301  }
302  return path;
303 }
304 
// NOTE(review): the signature line of this function (listing line 305) is
// missing from this extract; presumably this is the PostUpload step that
// ProcessArchiveEntry refers to -- TODO confirm.
//
// Walks hardlinks_ and, for each key, calls mediator_->Clone() once per path
// stored in that key's list. The list element is passed as the first
// argument and the map key as the second.
306  std::map<const std::string, std::list<std::string> >::iterator hardlink;
307  for (hardlink = hardlinks_.begin(); hardlink != hardlinks_.end();
308  ++hardlink) {
309  std::list<std::string>::iterator entry;
310  for (entry = hardlink->second.begin(); entry != hardlink->second.end();
311  ++entry) {
312  mediator_->Clone(*entry, hardlink->first);
313  }
314  }
315 }
316 
// NOTE(review): the first line of this signature (listing line 317) is
// missing from this extract; presumably this is UnwindWhiteoutFilename --
// TODO confirm.
//
// Returns the entry's filename unchanged.
318  SharedPtr<SyncItem> entry) const {
319  return entry->filename();
320 }
321 
// NOTE(review): the signature line of this function (listing line 322) is
// missing from this extract; presumably it is one of the bool predicates of
// this class (IsOpaqueDirectory or IsWhiteoutEntry) -- TODO confirm which.
//
// Always answers false.
323  return false;
324 }
325 
// NOTE(review): the signature line of this function (listing line 326) is
// missing from this extract; presumably it is one of the bool predicates of
// this class (IsOpaqueDirectory or IsWhiteoutEntry) -- TODO confirm which.
//
// Always answers false.
327  return false;
328 }
329 
330 /* Tar files are not necessarily traversed in order from root to leaf.
331  * So it may happen that we are expanding the file `/a/b/c.txt` without
332  * having yet created the directory `/a/b/`.
333  * In order to overcome this limitation, the following function creates dummy
334  * directories that serve as placeholders and that will be overwritten
335  * as soon as the real directory is found in the tarball.
336  */
// Registers `target` and, recursively, each of its ancestors that is not yet
// known. Directories already present in know_directories_ are skipped.
//
// NOTE(review): listing lines 350 and 353 are missing from this extract;
// `dummy` is declared on one of them, and whatever happens on line 353 is
// not visible here.
337 void SyncUnionTarball::CreateDirectories(const std::string &target) {
// recursion stops at a directory that is already known ...
338  if (know_directories_.find(target) != know_directories_.end())
339  return;
// ... or at ".", which the code treats as the top of the tree
340  if (target == ".")
341  return;
342 
343  std::string dirname = "";
344  std::string filename = "";
345  SplitPath(target, &dirname, &filename);
// parents are handled first, so ancestors are registered before `target`
346  CreateDirectories(dirname);
347 
// a parent of "." is represented as the empty string in the dummy item
348  if (dirname == ".")
349  dirname = "";
351  new SyncItemDummyDir(dirname, filename, this, kItemDir, uid_, gid_));
352 
354  dirs_[target] = dummy;
355  know_directories_.insert(target);
356 }
357 
358 } // namespace publish
std::map< const std::string, std::list< std::string > > hardlinks_
virtual bool ProcessUnmaterializedDirectory(SharedPtr< SyncItem > entry)
Definition: sync_union.cc:92
bool IsOpaqueDirectory(SharedPtr< SyncItem > directory) const
std::map< std::string, SharedPtr< SyncItem > > dirs_
const std::string base_directory_
#define PANIC(...)
Definition: exception.h:29
virtual bool Initialize()
Definition: sync_union.cc:24
gid_t gid_
Definition: loader.cc:135
void Wakeup()
Definition: concurrency.cc:60
assert((mem||(size==0))&&"Out Of Memory")
std::string UnwindWhiteoutFilename(SharedPtr< SyncItem > entry) const
void CreateDirectories(const std::string &target)
SharedPtr< SyncItem > CreateSyncItem(const std::string &relative_parent_path, const std::string &filename, const SyncItemType entry_type) const
Definition: sync_union.cc:30
const std::string tarball_path_
std::set< std::string > to_create_catalog_dirs_
SyncUnionTarball(AbstractSyncMediator *mediator, const std::string &rdonly_path, const std::string &tarball_path, const std::string &base_directory, const uid_t uid, const gid_t gid, const std::string &to_delete, const bool create_catalog_on_root, const std::string &path_delimiter=":")
std::string GetAbsolutePath(const std::string &path)
Definition: posix.cc:159
void SplitPath(const std::string &path, std::string *dirname, std::string *filename)
Definition: posix.cc:114
uid_t uid_
Definition: loader.cc:134
AbstractSyncMediator * mediator_
Definition: sync_union.h:147
bool IsInitialized() const
Definition: sync_union.h:139
std::string SanitizePath(const std::string &path)
const std::string to_delete_
entity to delete before to extract the tar
vector< string > SplitStringMultiChar(const string &str, const string &delim)
Definition: string.cc:341
void ProcessArchiveEntry(struct archive_entry *entry)
void ProcessFile(SharedPtr< SyncItem > entry)
Definition: sync_union.cc:116
void Wait()
Definition: concurrency.cc:50
virtual void Clone(const std::string from, const std::string to)=0
virtual void Remove(SharedPtr< SyncItem > entry)=0
virtual bool ProcessDirectory(const std::string &parent_dir, const std::string &dir_name)
static const size_t kBlockSize
const std::string path_delimiter_
delimiter used to split paths
bool IsWhiteoutEntry(SharedPtr< SyncItem > entry) const
std::set< std::string > know_directories_
directory that we know already exist
std::string MakeCanonicalPath(const std::string &path)
Definition: posix.cc:98
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545