GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/sync_union_tarball.cc Lines: 13 135 9.6 %
Date: 2019-02-03 02:48:13 Branches: 7 103 6.8 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System
3
 */
4
5
#define __STDC_FORMAT_MACROS
6
7
#include "sync_union_tarball.h"
8
9
#include <pthread.h>
10
#include <unistd.h>
11
12
#include <cassert>
13
#include <cstdio>
14
#include <list>
15
#include <set>
16
#include <string>
17
#include <vector>
18
19
#include "duplex_libarchive.h"
20
#include "fs_traversal.h"
21
#include "smalloc.h"
22
#include "sync_item.h"
23
#include "sync_item_dummy.h"
24
#include "sync_item_tar.h"
25
#include "sync_mediator.h"
26
#include "sync_union.h"
27
#include "util/posix.h"
28
#include "util_concurrency.h"
29
30
namespace publish {
31
32
1
/**
 * Sets up a union that ingests the content of a tarball.
 *
 * The SyncUnion base class receives the mediator, rdonly_path and two empty
 * strings.  The libarchive handle `src` starts out as NULL; it is opened
 * later by Initialize().
 *
 * @param mediator               forwarded to the SyncUnion base class
 * @param rdonly_path            forwarded to the SyncUnion base class
 * @param tarball_path           archive to ingest; "" means that no archive is
 *                               opened at all, "-" makes Initialize() pass a
 *                               NULL filename to libarchive
 * @param base_directory         prefix prepended to every path found in the
 *                               archive
 * @param to_delete              ':'-separated list of entries that Traverse()
 *                               removes before ingesting
 * @param create_catalog_on_root when true, Traverse() injects a synthetic
 *                               `.cvmfscatalog` entry ahead of the first
 *                               archive entry
 */
SyncUnionTarball::SyncUnionTarball(AbstractSyncMediator *mediator,
                                   const std::string &rdonly_path,
                                   const std::string &tarball_path,
                                   const std::string &base_directory,
                                   const std::string &to_delete,
                                   const bool create_catalog_on_root)
    : SyncUnion(mediator, rdonly_path, "", "")
    , src(NULL)
    , tarball_path_(tarball_path)
    , base_directory_(base_directory)
    , to_delete_(to_delete)
    , create_catalog_on_root_(create_catalog_on_root)
    , read_archive_signal_(new Signal)
{
}
45
46

1
/**
 * Releases the Signal that was allocated by the constructor.
 *
 * NOTE(review): the libarchive handle `src` is not released here -- confirm
 * where (or whether) it gets freed.
 */
SyncUnionTarball::~SyncUnionTarball() {
  delete read_archive_signal_;
}
47
48
1
bool SyncUnionTarball::Initialize() {
49
  bool result;
50
51
  // We are just deleting entity from the repo
52
1
  if (tarball_path_ == "") {
53
    assert(NULL == src);
54
    return SyncUnion::Initialize();
55
  }
56
57
1
  src = archive_read_new();
58
1
  assert(ARCHIVE_OK == archive_read_support_format_tar(src));
59
1
  assert(ARCHIVE_OK == archive_read_support_format_empty(src));
60
61
1
  if (tarball_path_ == "-") {
62
    result = archive_read_open_filename(src, NULL, kBlockSize);
63
  } else {
64
1
    std::string tarball_absolute_path = GetAbsolutePath(tarball_path_);
65
    result = archive_read_open_filename(src, tarball_absolute_path.c_str(),
66
1
                                        kBlockSize);
67
  }
68
69
1
  if (result != ARCHIVE_OK) {
70
    LogCvmfs(kLogUnionFs, kLogStderr, "Impossible to open the archive.");
71
    return false;
72
  }
73
74
1
  return SyncUnion::Initialize();
75
}
76
77
/*
78
 * Libarchive is not thread aware, so we need to make sure that before
79
 * to read/"open" the next header in the archive the content of the
80
 *
81
 * present header is been consumed completely.
82
 * Different thread read/"open" the header from the one that consumes
83
 * it so we opted for a Signal that is backed by a conditional variable.
84
 * We wait for the signal just before to read the header.
85
 * Then when we have done with the header the Signal is fired.
86
 * The Signal can be fired inside the main loop if we don't need to read
87
 * data, or when the IngestionSource get closed, which means that we are
88
 * not reading data anymore from there.
89
 * This whole process is not necessary for directories since we don't
90
 * actually need to read data from them.
91
 *
92
 * It may be needed to add a catalog as a root of the archive.
93
 * A possible way to do it is by creating an virtual `.cvmfscatalog` file and
94
 * push it into the usual pipeline.
95
 * This operation must be done only once, and it seems like a good idea to do
96
 * it at the first iteration of the loop, hence this logic is managed by the
97
 * `first_iteration` boolean flag.
98
 */
99
void SyncUnionTarball::Traverse() {
100
  read_archive_signal_->Wakeup();
101
  assert(this->IsInitialized());
102
103
  /*
104
   * As first step we eliminate the requested directories.
105
   */
106
  if (to_delete_ != "") {
107
    vector<std::string> to_eliminate_vec = SplitString(to_delete_, ':');
108
109
    for (vector<string>::iterator s = to_eliminate_vec.begin();
110
         s != to_eliminate_vec.end(); ++s) {
111
      std::string parent_path;
112
      std::string filename;
113
      SplitPath(*s, &parent_path, &filename);
114
      if (parent_path == ".") parent_path = "";
115
      SharedPtr<SyncItem> sync_entry =
116
          CreateSyncItem(parent_path, filename, kItemDir);
117
      mediator_->Remove(sync_entry);
118
    }
119
  }
120
121
  // we are simplying deleting entity from  the repo
122
  if (NULL == src) return;
123
124
  bool first_iteration = true;
125
  struct archive_entry *entry = archive_entry_new();
126
  while (true) {
127
    // Get the lock, wait if lock is not available yet
128
    read_archive_signal_->Wait();
129
130
    int result = archive_read_next_header2(src, entry);
131
132
    switch (result) {
133
      case ARCHIVE_FATAL: {
134
        LogCvmfs(kLogUnionFs, kLogStderr,
135
                 "Fatal error in reading the archive.\n%s\n",
136
                 archive_error_string(src));
137
        abort();
138
        break;  // Only exit point with error
139
      }
140
141
      case ARCHIVE_RETRY: {
142
        LogCvmfs(kLogUnionFs, kLogStdout,
143
                 "Error in reading the header, retrying.\n%s\n",
144
                 archive_error_string(src));
145
        continue;
146
        break;
147
      }
148
149
      case ARCHIVE_EOF: {
150
        for (set<string>::iterator dir = to_create_catalog_dirs_.begin();
151
             dir != to_create_catalog_dirs_.end(); ++dir) {
152
          assert(dirs_.find(*dir) != dirs_.end());
153
          SharedPtr<SyncItem> to_mark = dirs_[*dir];
154
          assert(to_mark->IsDirectory());
155
          to_mark->SetCatalogMarker();
156
          to_mark->MakePlaceholderDirectory();
157
          ProcessDirectory(to_mark);
158
        }
159
        return;  // Only successful exit point
160
        break;
161
      }
162
163
      case ARCHIVE_WARN: {
164
        LogCvmfs(kLogUnionFs, kLogStderr,
165
                 "Warning in uncompression reading, going on.\n %s",
166
                 archive_error_string(src));
167
        // We actually want this to enter the ARCHIVE_OK case
168
      }
169
170
      case ARCHIVE_OK: {
171
        if (first_iteration && create_catalog_on_root_) {
172
          struct archive_entry *catalog = archive_entry_new();
173
          std::string catalog_path = ".cvmfscatalog";
174
          archive_entry_set_pathname(catalog, catalog_path.c_str());
175
          archive_entry_set_size(catalog, 0);
176
          archive_entry_set_filetype(catalog, AE_IFREG);
177
          archive_entry_set_perm(catalog, kDefaultFileMode);
178
          archive_entry_set_gid(catalog, getgid());
179
          archive_entry_set_uid(catalog, getuid());
180
          ProcessArchiveEntry(catalog);
181
          archive_entry_free(catalog);
182
          // The ProcessArchiveEntry does call Wakeup on the signal, in this
183
          // particular corner case we need to re-wait for it.
184
          read_archive_signal_->Wait();
185
        }
186
        first_iteration = false;
187
188
        ProcessArchiveEntry(entry);
189
        break;
190
      }
191
192
      default: {
193
        // We should never enter in this branch, but just for safeness we prefer
194
        // to abort in case we hit a case we don't how to manage.
195
        LogCvmfs(kLogUnionFs, kLogStderr,
196
                 "Enter in unknow state. Aborting.\nError: %s\n", result,
197
                 archive_error_string(src));
198
199
        abort();
200
      }
201
    }
202
  }
203
}
204
205
void SyncUnionTarball::ProcessArchiveEntry(struct archive_entry *entry) {
206
  std::string archive_file_path(archive_entry_pathname(entry));
207
  archive_file_path = SanitizePath(archive_file_path);
208
209
  std::string complete_path =
210
      MakeCanonicalPath(base_directory_ + "/" + archive_file_path);
211
212
  std::string parent_path;
213
  std::string filename;
214
  SplitPath(complete_path, &parent_path, &filename);
215
216
  CreateDirectories(parent_path);
217
218
  SharedPtr<SyncItem> sync_entry = SharedPtr<SyncItem>(new SyncItemTar(
219
      parent_path, filename, src, entry, read_archive_signal_, this));
220
221
  if (NULL != archive_entry_hardlink(entry)) {
222
    const std::string hardlink =
223
        base_directory_ + "/" + std::string(archive_entry_hardlink(entry));
224
225
    if (hardlinks_.find(hardlink) != hardlinks_.end()) {
226
      hardlinks_.find(hardlink)->second.push_back(complete_path);
227
    } else {
228
      std::list<std::string> to_hardlink;
229
      to_hardlink.push_back(complete_path);
230
      hardlinks_[hardlink] = to_hardlink;
231
    }
232
    read_archive_signal_->Wakeup();
233
    return;
234
  }
235
236
  if (sync_entry->IsDirectory()) {
237
    if (know_directories_.find(complete_path) != know_directories_.end()) {
238
      sync_entry->MakePlaceholderDirectory();
239
    }
240
    ProcessUnmaterializedDirectory(sync_entry);
241
    dirs_[complete_path] = sync_entry;
242
    know_directories_.insert(complete_path);
243
244
    read_archive_signal_->Wakeup();  // We don't need to read data and we
245
                                     // can read the next header
246
247
  } else if (sync_entry->IsRegularFile()) {
248
    // inside the process pipeline we will wake up the signal
249
    ProcessFile(sync_entry);
250
    if (filename == ".cvmfscatalog") {
251
      to_create_catalog_dirs_.insert(parent_path);
252
    }
253
254
  } else if (sync_entry->IsSymlink() || sync_entry->IsFifo() ||
255
             sync_entry->IsSocket() || sync_entry->IsCharacterDevice() ||
256
             sync_entry->IsBlockDevice()) {
257
    // we avoid to add an entity called as a catalog marker if it is not a
258
    // regular file
259
    if (filename != ".cvmfscatalog") {
260
      ProcessFile(sync_entry);
261
    } else {
262
      LogCvmfs(kLogUnionFs, kLogStderr,
263
               "Found entity called as a catalog marker '%s' that however is "
264
               "not a regular file, abort",
265
               complete_path.c_str());
266
      abort();
267
    }
268
269
    // here we don't need to read data from the tar file so we can wake up
270
    // immediately the signal
271
    read_archive_signal_->Wakeup();
272
273
  } else {
274
    LogCvmfs(kLogUnionFs, kLogStderr,
275
             "Fatal error found unexpected file: \n%s\n", filename.c_str());
276
    // if for any reason this code path change and we don't abort anymore,
277
    // remember to wakeup the signal, otherwise we will be stuck in a deadlock
278
    //
279
    // read_archive_signal_->Wakeup();
280
    abort();
281
  }
282
}
283
284
/**
 * Strips a single leading "./" from a path.
 *
 * @param path path as found in the archive
 * @return the path without its first two characters if it starts with "./",
 *         otherwise an unchanged copy of the path
 */
std::string SyncUnionTarball::SanitizePath(const std::string &path) {
  // compare() clamps the range, so paths shorter than two characters simply
  // do not match
  const bool has_dot_slash_prefix = (path.compare(0, 2, "./") == 0);
  if (!has_dot_slash_prefix) return path;
  return path.substr(2);
}
294
295
void SyncUnionTarball::PostUpload() {
296
  std::map<const std::string, std::list<std::string> >::iterator hardlink;
297
  for (hardlink = hardlinks_.begin(); hardlink != hardlinks_.end();
298
       ++hardlink) {
299
    std::list<std::string>::iterator entry;
300
    for (entry = hardlink->second.begin(); entry != hardlink->second.end();
301
         ++entry) {
302
      mediator_->Clone(*entry, hardlink->first);
303
    }
304
  }
305
}
306
307
std::string SyncUnionTarball::UnwindWhiteoutFilename(
308
    SharedPtr<SyncItem> entry) const {
309
  return entry->filename();
310
}
311
312
bool SyncUnionTarball::IsOpaqueDirectory(SharedPtr<SyncItem> directory) const {
313
  return false;
314
}
315
316
bool SyncUnionTarball::IsWhiteoutEntry(SharedPtr<SyncItem> entry) const {
317
  return false;
318
}
319
320
/* Tar files are not necessarly traversed in order from root to leave.
321
 * So it may happens that we are expanding the file `/a/b/c.txt` without
322
 * having created yet the directory `/a/b/`.
323
 * In order to overcome this limitation the following function create dummy
324
 * directories that can be used as placeholder and that they will be overwritten
325
 * as soon as the real directory is found in the tarball
326
 */
327
void SyncUnionTarball::CreateDirectories(const std::string &target) {
328
  if (know_directories_.find(target) != know_directories_.end()) return;
329
  if (target == ".") return;
330
331
  std::string dirname = "";
332
  std::string filename = "";
333
  SplitPath(target, &dirname, &filename);
334
  CreateDirectories(dirname);
335
336
  if (dirname == ".") dirname = "";
337
  SharedPtr<SyncItem> dummy = SharedPtr<SyncItem>(
338
      new SyncItemDummyDir(dirname, filename, this, kItemDir));
339
340
  ProcessUnmaterializedDirectory(dummy);
341
  dirs_[target] = dummy;
342
  know_directories_.insert(target);
343
}
344
345
}  // namespace publish