GCC Code Coverage Report
File:      cvmfs/swissknife_sync.cc
Directory: cvmfs/
Date:      2019-02-03 02:48:13
Lines:     0 / 454 (0.0 %)
Branches:  0 / 310 (0.0 %)

Source:
/**
 * This file is part of the CernVM File System
 *
 * This tool figures out the changes made to a cvmfs repository by means
 * of a union file system mounted on top of a cvmfs volume.
 * We take all three volumes (namely union, overlay and repository) into
 * account to sync the changes back into the repository.
 *
 * On the repository side we have a catalogs directory that mimics the
 * shadow directory structure and stores compressed and uncompressed
 * versions of all catalogs.  The raw data are stored in the data
 * subdirectory in zlib-compressed form.  They are named after the SHA-1
 * hash of the compressed file (like in the CVMFS client cache, but with a
 * 2-level cache hierarchy).  Symlinks from the catalog directory to the
 * data directory form the connection.  If necessary, add a .htaccess file
 * to allow Apache to follow the symlinks.
 */
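/*
 * Illustrative sketch (not part of the original source; the repository name
 * and hash below are hypothetical).  For a repository "example.cern.ch" the
 * backend storage described above roughly looks like:
 *
 *   catalogs/...              symlinks into data/, mirroring the shadow
 *                             directory structure
 *   data/1a/2b3c...def        zlib-compressed objects, named after the SHA-1
 *                             hash of the compressed file
 *   .htaccess                 optional, lets Apache follow the symlinks
 */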

#define _FILE_OFFSET_BITS 64

#define __STDC_FORMAT_MACROS

#include "swissknife_sync.h"
#include "cvmfs_config.h"

#include <errno.h>
#include <fcntl.h>
#include <glob.h>
#include <inttypes.h>
#include <limits.h>
#include <sys/capability.h>

#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>

#include "catalog_mgr_ro.h"
#include "catalog_mgr_rw.h"
#include "catalog_virtual.h"
#include "download.h"
#include "logging.h"
#include "manifest.h"
#include "path_filters/dirtab.h"
#include "platform.h"
#include "reflog.h"
#include "sanitizer.h"
#include "statistics.h"
#include "sync_mediator.h"
#include "sync_union.h"
#include "sync_union_aufs.h"
#include "sync_union_overlayfs.h"
#include "util/string.h"

using namespace std;  // NOLINT

bool swissknife::CommandSync::CheckParams(const SyncParameters &p) {
  if (!DirectoryExists(p.dir_scratch)) {
    PrintError("overlay (copy on write) directory does not exist");
    return false;
  }
  if (!DirectoryExists(p.dir_union)) {
    PrintError("union volume does not exist");
    return false;
  }
  if (!DirectoryExists(p.dir_rdonly)) {
    PrintError("cvmfs read/only repository does not exist");
    return false;
  }
  if (p.stratum0 == "") {
    PrintError("Stratum0 url missing");
    return false;
  }

  if (p.manifest_path == "") {
    PrintError("manifest output required");
    return false;
  }
  if (!DirectoryExists(p.dir_temp)) {
    PrintError("data store directory does not exist");
    return false;
  }

  if (p.min_file_chunk_size >= p.avg_file_chunk_size ||
      p.avg_file_chunk_size >= p.max_file_chunk_size) {
    PrintError("file chunk size values are not sane");
    return false;
  }

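  // [Illustrative note, not part of the original source] The spooler
  // definition is the upstream storage string (as configured via
  // CVMFS_UPSTREAM_STORAGE), conventionally of the form
  // "<driver>,<temporary directory>,<driver configuration>"; the exact syntax
  // depends on the CVMFS version.  Hypothetical examples:
  //   local,/srv/cvmfs/example.cern.ch/data/txn,/srv/cvmfs/example.cern.ch
  //   gw,/srv/cvmfs/example.cern.ch/data/txn,http://gateway:4929/api/v1
  // The "gw" driver additionally requires a session token file, which is
  // checked below.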
  if (HasPrefix(p.spooler_definition, "gw", false)) {
    if (p.session_token_file.empty()) {
      PrintError(
          "Session token file has to be provided "
          "when upstream type is gw.");
      return false;
    }
  }

  return true;
}

int swissknife::CommandCreate::Main(const swissknife::ArgumentList &args) {
  const string manifest_path = *args.find('o')->second;
  const string dir_temp = *args.find('t')->second;
  const string spooler_definition = *args.find('r')->second;
  const string repo_name = *args.find('n')->second;
  const string reflog_chksum_path = *args.find('R')->second;
  if (args.find('l') != args.end()) {
    unsigned log_level =
        1 << (kLogLevel0 + String2Uint64(*args.find('l')->second));
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("unknown hash algorithm");
      return 1;
    }
  }

  const bool volatile_content = (args.count('v') > 0);
  const bool garbage_collectable = (args.count('z') > 0);
  std::string voms_authz;
  if (args.find('V') != args.end()) {
    voms_authz = *args.find('V')->second;
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm,
                                     zlib::kZlibDefault);
  UniquePtr<upload::Spooler> spooler(upload::Spooler::Construct(sd));
  assert(spooler.IsValid());

  UniquePtr<manifest::Manifest> manifest(
      catalog::WritableCatalogManager::CreateRepository(
          dir_temp, volatile_content, voms_authz, spooler));
  if (!manifest.IsValid()) {
    PrintError("Failed to create new repository");
    return 1;
  }

  UniquePtr<manifest::Reflog> reflog(CreateEmptyReflog(dir_temp, repo_name));
  if (!reflog.IsValid()) {
    PrintError("Failed to create fresh Reflog");
    return 1;
  }

  reflog->DropDatabaseFileOwnership();
  string reflog_path = reflog->database_file();
  reflog.Destroy();
  shash::Any reflog_hash(hash_algorithm);
  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
  spooler->UploadReflog(reflog_path);
  spooler->WaitForUpload();
  unlink(reflog_path.c_str());
  if (spooler->GetNumberOfErrors()) {
    LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload reflog");
    return 4;
  }
  assert(!reflog_chksum_path.empty());
  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);

  // set optional manifest fields
  const bool needs_bootstrap_shortcuts = !voms_authz.empty();
  manifest->set_garbage_collectability(garbage_collectable);
  manifest->set_has_alt_catalog_path(needs_bootstrap_shortcuts);

  if (!manifest->Export(manifest_path)) {
    PrintError("Failed to create new repository");
    return 5;
  }

  return 0;
}

int swissknife::CommandUpload::Main(const swissknife::ArgumentList &args) {
  const string source = *args.find('i')->second;
  const string dest = *args.find('o')->second;
  const string spooler_definition = *args.find('r')->second;
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('a') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('a')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("unknown hash algorithm");
      return 1;
    }
  }

  const upload::SpoolerDefinition sd(spooler_definition, hash_algorithm);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  spooler->Upload(source, dest);
  spooler->WaitForUpload();

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "failed to upload %s", source.c_str());
    return 1;
  }

  delete spooler;

  return 0;
}

int swissknife::CommandPeek::Main(const swissknife::ArgumentList &args) {
  const string file_to_peek = *args.find('d')->second;
  const string spooler_definition = *args.find('r')->second;

  // Hash doesn't matter
  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  const bool success = spooler->Peek(file_to_peek);

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "failed to peek for %s",
             file_to_peek.c_str());
    return 2;
  }
  if (!success) {
    LogCvmfs(kLogCatalog, kLogStdout, "%s not found", file_to_peek.c_str());
    return 1;
  }
  LogCvmfs(kLogCatalog, kLogStdout, "%s available", file_to_peek.c_str());

  delete spooler;

  return 0;
}

int swissknife::CommandRemove::Main(const ArgumentList &args) {
  const string file_to_delete = *args.find('o')->second;
  const string spooler_definition = *args.find('r')->second;

  // Hash doesn't matter
  const upload::SpoolerDefinition sd(spooler_definition, shash::kAny);
  upload::Spooler *spooler = upload::Spooler::Construct(sd);
  assert(spooler);
  spooler->RemoveAsync(file_to_delete);
  spooler->WaitForUpload();

  if (spooler->GetNumberOfErrors() > 0) {
    LogCvmfs(kLogCatalog, kLogStderr, "failed to delete %s",
             file_to_delete.c_str());
    return 1;
  }

  delete spooler;

  return 0;
}

int swissknife::CommandApplyDirtab::Main(const ArgumentList &args) {
  const string dirtab_file = *args.find('d')->second;
  union_dir_ = MakeCanonicalPath(*args.find('u')->second);
  scratch_dir_ = MakeCanonicalPath(*args.find('s')->second);
  const shash::Any base_hash = shash::MkFromHexPtr(
      shash::HexPtr(*args.find('b')->second), shash::kSuffixCatalog);
  const string stratum0 = *args.find('w')->second;
  const string dir_temp = *args.find('t')->second;
  verbose_ = (args.find('x') != args.end());

  // check if there is a dirtab file
  if (!FileExists(dirtab_file)) {
    LogCvmfs(kLogCatalog, kLogVerboseMsg,
             "Didn't find a dirtab at '%s'. Skipping...", dirtab_file.c_str());
    return 0;
  }
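  // [Illustrative, not part of the original source] A dirtab file lists one
  // path specification per line, relative to the repository root; wildcards
  // are allowed and a leading '!' marks a negation (exclusion) rule, as
  // reflected by positive_rules()/IsOpposing() below.  Hypothetical example
  // (consult the CVMFS documentation for the exact syntax):
  //   /software/releases/*
  //   /data/runs/*
  //   ! *.tmp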

  // parse dirtab file
  catalog::Dirtab *dirtab = catalog::Dirtab::Create(dirtab_file);
  if (!dirtab->IsValid()) {
    LogCvmfs(kLogCatalog, kLogStderr, "Invalid or not readable dirtab '%s'",
             dirtab_file.c_str());
    return 1;
  }
  LogCvmfs(kLogCatalog, kLogVerboseMsg, "Found %d rules in dirtab '%s'",
           dirtab->RuleCount(), dirtab_file.c_str());

  // initialize catalog infrastructure
  const bool auto_manage_catalog_files = true;
  const bool follow_redirects = (args.count('L') > 0);
  if (!InitDownloadManager(follow_redirects)) {
    return 1;
  }
  catalog::SimpleCatalogManager catalog_manager(
      base_hash, stratum0, dir_temp, download_manager(), statistics(),
      auto_manage_catalog_files);
  catalog_manager.Init();

  vector<string> new_nested_catalogs;
  DetermineNestedCatalogCandidates(*dirtab, &catalog_manager,
                                   &new_nested_catalogs);
  const bool success = CreateCatalogMarkers(new_nested_catalogs);
  delete dirtab;

  return (success) ? 0 : 1;
}


namespace {

// Override directory traversal in the globbing in order to avoid breaking
// out of the repository tree

std::string *g_glob_uniondir = NULL;

bool GlobCheckPath(const char *name) {
  char resolved_cstr[PATH_MAX];
  char *retval = realpath(name, resolved_cstr);
  if (retval == NULL) return false;

  std::string resolved(resolved_cstr);
  if (resolved == *g_glob_uniondir) return true;
  if (!HasPrefix(resolved, (*g_glob_uniondir) + "/", false /*ignore_case*/)) {
    errno = EACCES;
    return false;
  }
  return true;
}
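// [Illustrative, not part of the original source] With g_glob_uniondir set
// to, say, the hypothetical "/cvmfs/example.cern.ch", GlobCheckPath() accepts
// "/cvmfs/example.cern.ch" and "/cvmfs/example.cern.ch/sub/dir", but rejects
// "/etc" as well as any path whose realpath() resolves outside the union
// directory (e.g. a symlink pointing to "/"), setting errno to EACCES.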

void *GlobOpendir(const char *name) {
  if (!GlobCheckPath(name)) return NULL;
  return opendir(name);
}

void GlobClosedir(void *dirp) {
  closedir(static_cast<DIR *>(dirp));
}

struct dirent *GlobReaddir(void *dirp) {
  return readdir(static_cast<DIR *>(dirp));
}

int GlobLstat(const char *name, struct stat *st) {
  if (!GlobCheckPath(name)) return -1;
  return lstat(name, st);
}

int GlobStat(const char *name, struct stat *st) {
  if (!GlobCheckPath(name)) return -1;
  return stat(name, st);
}


}  // anonymous namespace

void swissknife::CommandApplyDirtab::DetermineNestedCatalogCandidates(
    const catalog::Dirtab &dirtab,
    catalog::SimpleCatalogManager *catalog_manager,
    vector<string> *nested_catalog_candidates) {
  // find possible new nested catalog locations
  const catalog::Dirtab::Rules &lookup_rules = dirtab.positive_rules();
  catalog::Dirtab::Rules::const_iterator i = lookup_rules.begin();
  const catalog::Dirtab::Rules::const_iterator iend = lookup_rules.end();
  for (; i != iend; ++i) {
    assert(!i->is_negation);

    // run a glob using the current dirtab rule on the current repository
    // state
    const std::string &glob_string = i->pathspec.GetGlobString();
    const std::string &glob_string_abs = union_dir_ + glob_string;
    const int glob_flags = GLOB_ONLYDIR | GLOB_NOSORT | GLOB_PERIOD |
                           GLOB_ALTDIRFUNC;
    glob_t glob_res;
    g_glob_uniondir = new std::string(union_dir_);
    glob_res.gl_opendir = GlobOpendir;
    glob_res.gl_readdir = GlobReaddir;
    glob_res.gl_closedir = GlobClosedir;
    glob_res.gl_lstat = GlobLstat;
    glob_res.gl_stat = GlobStat;
    const int glob_retval =
        glob(glob_string_abs.c_str(), glob_flags, NULL, &glob_res);
    delete g_glob_uniondir;
    g_glob_uniondir = NULL;

    if (glob_retval == 0) {
      // found some candidates... filtering by cvmfs catalog structure
      LogCvmfs(kLogCatalog, kLogDebug, "Found %d entries for pathspec (%s)",
               glob_res.gl_pathc, glob_string.c_str());
      FilterCandidatesFromGlobResult(dirtab, glob_res.gl_pathv,
                                     glob_res.gl_pathc, catalog_manager,
                                     nested_catalog_candidates);
    } else if (glob_retval == GLOB_NOMATCH) {
      LogCvmfs(kLogCvmfs, kLogStderr, "WARNING: cannot apply pathspec %s",
               glob_string.c_str());
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to run glob matching (%s)",
               glob_string.c_str());
    }

    globfree(&glob_res);
  }
}

void swissknife::CommandApplyDirtab::FilterCandidatesFromGlobResult(
    const catalog::Dirtab &dirtab, char **paths, const size_t npaths,
    catalog::SimpleCatalogManager *catalog_manager,
    std::vector<std::string> *nested_catalog_candidates) {
  // go through the paths produced by glob() and filter them
  for (size_t i = 0; i < npaths; ++i) {
    // process candidate paths
    const std::string candidate(paths[i]);
    const std::string candidate_rel = candidate.substr(union_dir_.size());

    // check if path points to a directory
    platform_stat64 candidate_info;
    const int lstat_retval = platform_lstat(candidate.c_str(), &candidate_info);
    assert(lstat_retval == 0);
    if (!S_ISDIR(candidate_info.st_mode)) {
      // The GLOB_ONLYDIR flag is only a hint, non-directories can still be
      // returned
      LogCvmfs(kLogCatalog, kLogDebug,
               "The '%s' dirtab entry does not point to a directory "
               "but to a file or a symbolic link",
               candidate_rel.c_str());
      continue;
    }

    // check if the path is a meta-directory (. or ..)
    assert(candidate_rel.size() >= 2);
    if (candidate_rel.substr(candidate_rel.size() - 2) == "/." ||
        candidate_rel.substr(candidate_rel.size() - 3) == "/..") {
      continue;
    }

    // check that the path isn't excluded in the dirtab
    if (dirtab.IsOpposing(candidate_rel)) {
      LogCvmfs(kLogCatalog, kLogDebug, "Candidate '%s' is excluded by dirtab",
               candidate_rel.c_str());
      continue;
    }

    // look up the path in the catalog structure to find out if it already
    // points to a nested catalog transition point. Furthermore it could be
    // a new directory and thus not in any catalog yet.
    catalog::DirectoryEntry dirent;
    const bool lookup_success = catalog_manager->LookupPath(
        candidate_rel, catalog::kLookupSole, &dirent);
    if (!lookup_success) {
      LogCvmfs(kLogCatalog, kLogDebug,
               "Didn't find '%s' in catalogs, could "
               "be a new directory and nested catalog.",
               candidate_rel.c_str());
      nested_catalog_candidates->push_back(candidate);
    } else if (!dirent.IsNestedCatalogMountpoint() &&
               !dirent.IsNestedCatalogRoot()) {
      LogCvmfs(kLogCatalog, kLogDebug,
               "Found '%s' in catalogs but is not a "
               "nested catalog yet.",
               candidate_rel.c_str());
      nested_catalog_candidates->push_back(candidate);
    } else {
      // Check if the nested catalog marker is still there; we might need to
      // recreate the catalog after manual marker removal.
      // Note: First we check if the parent directory shows up in the scratch
      //       space to verify that it was touched (copy-on-write).
      //       Otherwise we would force the cvmfs client behind the union
      //       file system to (potentially) unnecessarily fetch catalogs
      if (DirectoryExists(scratch_dir_ + candidate_rel) &&
          !FileExists(union_dir_ + candidate_rel + "/.cvmfscatalog")) {
        LogCvmfs(kLogCatalog, kLogStdout,
                 "WARNING: '%s' should be a nested "
                 "catalog according to the dirtab. "
                 "Recreating...",
                 candidate_rel.c_str());
        nested_catalog_candidates->push_back(candidate);
      } else {
        LogCvmfs(kLogCatalog, kLogDebug,
                 "Found '%s' in catalogs and it already is a nested catalog.",
                 candidate_rel.c_str());
      }
    }
  }
}

bool swissknife::CommandApplyDirtab::CreateCatalogMarkers(
    const std::vector<std::string> &new_nested_catalogs) {
  // go through the new nested catalog paths and create .cvmfscatalog markers
  // where necessary
  bool success = true;
  std::vector<std::string>::const_iterator k = new_nested_catalogs.begin();
  const std::vector<std::string>::const_iterator kend =
      new_nested_catalogs.end();
  for (; k != kend; ++k) {
    assert(!k->empty() && k->size() > union_dir_.size());

    // was the marker already created by hand?
    const std::string marker_path = *k + "/.cvmfscatalog";
    if (FileExists(marker_path)) {
      continue;
    }

    // create a nested catalog marker
    const mode_t mode = kDefaultFileMode;
    const int fd = open(marker_path.c_str(), O_CREAT, mode);
    if (fd < 0) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Failed to create nested catalog marker "
               "at '%s' (errno: %d)",
               marker_path.c_str(), errno);
      success = false;
      continue;
    }
    close(fd);

    // inform the user if requested
    if (verbose_) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Auto-creating nested catalog in %s",
               k->c_str());
    }
  }

  return success;
}

struct chunk_arg {
  chunk_arg(char param, size_t *save_to) : param(param), save_to(save_to) {}
  char param;
  size_t *save_to;
};

bool swissknife::CommandSync::ReadFileChunkingArgs(
    const swissknife::ArgumentList &args, SyncParameters *params) {
  typedef std::vector<chunk_arg> ChunkArgs;

  // define where to store the value of which file chunk argument
  ChunkArgs chunk_args;
  chunk_args.push_back(chunk_arg('a', &params->avg_file_chunk_size));
  chunk_args.push_back(chunk_arg('l', &params->min_file_chunk_size));
  chunk_args.push_back(chunk_arg('h', &params->max_file_chunk_size));

  // read the arguments
  ChunkArgs::const_iterator i = chunk_args.begin();
  ChunkArgs::const_iterator iend = chunk_args.end();
  for (; i != iend; ++i) {
    swissknife::ArgumentList::const_iterator arg = args.find(i->param);

    if (arg != args.end()) {
      size_t arg_value = static_cast<size_t>(String2Uint64(*arg->second));
      if (arg_value > 0)
        *i->save_to = arg_value;
      else
        return false;
    }
  }

  // check if argument values are sane
  return true;
}

bool swissknife::CommandSync::ObtainDacReadSearchCapability() {
  cap_value_t cap = CAP_DAC_READ_SEARCH;
#ifdef CAP_IS_SUPPORTED
  assert(CAP_IS_SUPPORTED(cap));
#endif

  cap_t caps_proc = cap_get_proc();
  assert(caps_proc != NULL);

  cap_flag_value_t cap_state;
  int retval = cap_get_flag(caps_proc, cap, CAP_EFFECTIVE, &cap_state);
  assert(retval == 0);

  if (cap_state == CAP_SET) {
    cap_free(caps_proc);
    return true;
  }

  retval = cap_get_flag(caps_proc, cap, CAP_PERMITTED, &cap_state);
  assert(retval == 0);
  if (cap_state != CAP_SET) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "Warning: CAP_DAC_READ_SEARCH cannot be obtained. "
             "It's not in the process's permitted set.");
    cap_free(caps_proc);
    return false;
  }

  retval = cap_set_flag(caps_proc, CAP_EFFECTIVE, 1, &cap, CAP_SET);
  assert(retval == 0);

  retval = cap_set_proc(caps_proc);
  cap_free(caps_proc);

  if (retval != 0) {
    LogCvmfs(kLogCvmfs, kLogStderr,
             "Cannot reset capabilities for current process "
             "(errno: %d)",
             errno);
    return false;
  }

  return true;
}

int swissknife::CommandSync::Main(const swissknife::ArgumentList &args) {
  SyncParameters params;

  // Initialization
  params.dir_union = MakeCanonicalPath(*args.find('u')->second);
  params.dir_scratch = MakeCanonicalPath(*args.find('s')->second);
  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
                                         shash::kSuffixCatalog);
  params.stratum0 = *args.find('w')->second;
  params.manifest_path = *args.find('o')->second;
  params.spooler_definition = *args.find('r')->second;

  params.public_keys = *args.find('K')->second;
  params.repo_name = *args.find('N')->second;

  params.ttl_seconds = catalog::Catalog::kDefaultTTL;

  if (args.find('f') != args.end())
    params.union_fs_type = *args.find('f')->second;
  if (args.find('A') != args.end()) params.is_balanced = true;
  if (args.find('x') != args.end()) params.print_changeset = true;
  if (args.find('y') != args.end()) params.dry_run = true;
  if (args.find('m') != args.end()) params.mucatalogs = true;
  if (args.find('i') != args.end()) params.ignore_xdir_hardlinks = true;
  if (args.find('d') != args.end()) params.stop_for_catalog_tweaks = true;
  if (args.find('V') != args.end()) params.voms_authz = true;
  if (args.find('F') != args.end()) params.authz_file = *args.find('F')->second;
  if (args.find('k') != args.end()) params.include_xattrs = true;
  if (args.find('Y') != args.end()) params.external_data = true;
  if (args.find('S') != args.end()) {
    bool retval = catalog::VirtualCatalog::ParseActions(
        *args.find('S')->second, &params.virtual_dir_actions);
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid virtual catalog options: %s",
               args.find('S')->second->c_str());
      return 1;
    }
  }
  if (args.find('z') != args.end()) {
    unsigned log_level =
        1 << (kLogLevel0 + String2Uint64(*args.find('z')->second));
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }

  if (args.find('X') != args.end())
    params.max_weight = String2Uint64(*args.find('X')->second);
  if (args.find('M') != args.end())
    params.min_weight = String2Uint64(*args.find('M')->second);

  if (args.find('p') != args.end()) {
    params.use_file_chunking = true;
    if (!ReadFileChunkingArgs(args, &params)) {
      PrintError("Failed to read file chunk size values");
      return 2;
    }
  }
  if (args.find('O') != args.end()) {
    params.generate_legacy_bulk_chunks = true;
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('e') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("unknown hash algorithm");
      return 1;
    }
  }
  if (args.find('Z') != args.end()) {
    params.compression_alg =
        zlib::ParseCompressionAlgorithm(*args.find('Z')->second);
  }

  if (args.find('C') != args.end()) {
    params.trusted_certs = *args.find('C')->second;
  }

  if (args.find('E') != args.end()) params.enforce_limits = true;
  if (args.find('Q') != args.end()) {
    params.nested_kcatalog_limit = String2Uint64(*args.find('Q')->second);
  } else {
    params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
  }
  if (args.find('R') != args.end()) {
    params.root_kcatalog_limit = String2Uint64(*args.find('R')->second);
  } else {
    params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
  }
  if (args.find('U') != args.end()) {
    params.file_mbyte_limit = String2Uint64(*args.find('U')->second);
  } else {
    params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;
  }

  if (args.find('v') != args.end()) {
    sanitizer::IntegerSanitizer sanitizer;
    if (!sanitizer.IsValid(*args.find('v')->second)) {
      PrintError("invalid revision number");
      return 1;
    }
    params.manual_revision = String2Uint64(*args.find('v')->second);
  }

  params.branched_catalog = args.find('B') != args.end();

  if (args.find('q') != args.end()) {
    params.max_concurrent_write_jobs = String2Uint64(*args.find('q')->second);
  }

  if (args.find('0') != args.end()) {
    params.num_upload_tasks = String2Uint64(*args.find('0')->second);
  }

  if (args.find('T') != args.end()) {
    params.ttl_seconds = String2Uint64(*args.find('T')->second);
  }

  if (args.find('g') != args.end()) {
    params.ignore_special_files = true;
  }

  if (args.find('P') != args.end()) {
    params.session_token_file = *args.find('P')->second;
  }

  if (args.find('H') != args.end()) {
    params.key_file = *args.find('H')->second;
  }

  if (args.find('D') != args.end()) {
    params.repo_tag.name_ = *args.find('D')->second;
  }

  if (args.find('G') != args.end()) {
    params.repo_tag.channel_ = *args.find('G')->second;
  }

  if (args.find('J') != args.end()) {
    params.repo_tag.description_ = *args.find('J')->second;
  }

  if (!CheckParams(params)) return 2;
  // This may fail, in which case a warning is printed and the process continues
  ObtainDacReadSearchCapability();

  perf::StatisticsTemplate publish_statistics("Publish", this->statistics());

  // Start spooler
  upload::SpoolerDefinition spooler_definition(
      params.spooler_definition, hash_algorithm, params.compression_alg,
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
      params.min_file_chunk_size, params.avg_file_chunk_size,
      params.max_file_chunk_size, params.session_token_file, params.key_file);
  if (params.max_concurrent_write_jobs > 0) {
    spooler_definition.number_of_concurrent_uploads =
        params.max_concurrent_write_jobs;
  }
  spooler_definition.num_upload_tasks = params.num_upload_tasks;

  upload::SpoolerDefinition spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());

  params.spooler = upload::Spooler::Construct(spooler_definition,
                                              &publish_statistics);
  if (NULL == params.spooler) return 3;
  UniquePtr<upload::Spooler> spooler_catalogs(
      upload::Spooler::Construct(spooler_definition_catalogs));
  if (!spooler_catalogs.IsValid()) return 3;

  const bool follow_redirects = (args.count('L') > 0);
  if (!InitDownloadManager(follow_redirects)) {
    return 3;
  }

  if (!InitVerifyingSignatureManager(params.public_keys,
                                     params.trusted_certs)) {
    return 3;
  }

  bool with_gateway =
      spooler_definition.driver_type == upload::SpoolerDefinition::Gateway;
  /*
   * Note: If the upstream is of type gateway, it is possible to have
   * different local and remote root hashes due to the possibility of
   * concurrent release managers.  We proceed by loading the remote manifest
   * but we give an empty base hash.
   */
  UniquePtr<manifest::Manifest> manifest;
  if (params.branched_catalog) {
    // Throw-away manifest
    manifest = new manifest::Manifest(shash::Any(), 0, "");
  } else if (params.virtual_dir_actions !=
             catalog::VirtualCatalog::kActionNone) {
    manifest = this->OpenLocalManifest(params.manifest_path);
    params.base_hash = manifest->catalog_hash();
  } else {
    if (with_gateway) {
      manifest =
          FetchRemoteManifest(params.stratum0, params.repo_name, shash::Any());
    } else {
      manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
                                     params.base_hash);
    }
  }
  if (!manifest) {
    return 3;
  }

  const std::string old_root_hash = manifest->catalog_hash().ToString(true);

  catalog::WritableCatalogManager catalog_manager(
      params.base_hash, params.stratum0, params.dir_temp, spooler_catalogs,
      download_manager(), params.enforce_limits, params.nested_kcatalog_limit,
      params.root_kcatalog_limit, params.file_mbyte_limit, statistics(),
      params.is_balanced, params.max_weight, params.min_weight);
  catalog_manager.Init();

  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);

  // This should happen before the synchronization starts to avoid a race of
  // GetTTL with other SQLite operations
  if ((params.ttl_seconds > 0) &&
      ((params.ttl_seconds != catalog_manager.GetTTL()) ||
       !catalog_manager.HasExplicitTTL())) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Setting repository TTL to %" PRIu64 "s",
             params.ttl_seconds);
    catalog_manager.SetTTL(params.ttl_seconds);
  }

  // Either real catalogs or virtual catalog
  if (params.virtual_dir_actions == catalog::VirtualCatalog::kActionNone) {
    publish::SyncUnion *sync;
    if (params.union_fs_type == "overlayfs") {
      sync = new publish::SyncUnionOverlayfs(
          &mediator, params.dir_rdonly, params.dir_union, params.dir_scratch);
    } else if (params.union_fs_type == "aufs") {
      sync = new publish::SyncUnionAufs(&mediator, params.dir_rdonly,
                                        params.dir_union, params.dir_scratch);
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr, "unknown union file system: %s",
               params.union_fs_type.c_str());
      return 3;
    }

    if (!sync->Initialize()) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Initialization of the synchronisation "
               "engine failed");
      return 4;
    }

    sync->Traverse();
  } else {
    assert(!manifest->history().IsNull());
    catalog::VirtualCatalog virtual_catalog(
        manifest.weak_ref(), download_manager(), &catalog_manager, &params);
    virtual_catalog.Generate(params.virtual_dir_actions);
  }

  if (!params.authz_file.empty()) {
    LogCvmfs(kLogCvmfs, kLogDebug,
             "Adding contents of authz file %s to"
             " root catalog.",
             params.authz_file.c_str());
    int fd = open(params.authz_file.c_str(), O_RDONLY);
    if (fd == -1) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Unable to open authz file (%s) "
               "from the publication process: %s",
               params.authz_file.c_str(), strerror(errno));
      return 7;
    }

    std::string new_authz;
    const bool read_successful = SafeReadToString(fd, &new_authz);
    close(fd);

    if (!read_successful) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to read authz file (%s): %s",
               params.authz_file.c_str(), strerror(errno));
      return 8;
    }

    catalog_manager.SetVOMSAuthz(new_authz);
  }

  if (!mediator.Commit(manifest.weak_ref())) {
    PrintError("something went wrong during sync");
    return 5;
  }

  // finalize the spooler
  LogCvmfs(kLogCvmfs, kLogStdout, "Wait for all uploads to finish");
  params.spooler->WaitForUpload();
  spooler_catalogs->WaitForUpload();
  params.spooler->FinalizeSession(false);

  LogCvmfs(kLogCvmfs, kLogStdout, "Exporting repository manifest");

  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the hash of the new root catalog
  const std::string new_root_hash = manifest->catalog_hash().ToString(true);

  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
                                         params.repo_tag)) {
    PrintError("Failed to commit transaction.");
    return 9;
  }
  delete params.spooler;

  if (!manifest->Export(params.manifest_path)) {
    PrintError("Failed to create new repository");
    return 6;
  }

  return 0;
}