GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/swissknife_pull.cc Lines: 0 454 0.0 %
Date: 2019-02-03 02:48:13 Branches: 0 338 0.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 *
4
 * Replicates a cvmfs repository.  Uses the cvmfs intrinsic Merkle trees
5
 * to calculate the difference set.
6
 */
7
8
#define _FILE_OFFSET_BITS 64
9
#define __STDC_FORMAT_MACROS
10
11
#include "cvmfs_config.h"
12
#include "swissknife_pull.h"
13
14
#include <inttypes.h>
15
#include <pthread.h>
16
#include <sys/stat.h>
17
#include <unistd.h>
18
19
#include <cstdlib>
20
#include <cstring>
21
#include <string>
22
#include <vector>
23
24
#include "atomic.h"
25
#include "catalog.h"
26
#include "compression.h"
27
#include "download.h"
28
#include "hash.h"
29
#include "history_sqlite.h"
30
#include "logging.h"
31
#include "manifest.h"
32
#include "manifest_fetch.h"
33
#include "object_fetcher.h"
34
#include "path_filters/relaxed_path_filter.h"
35
#include "reflog.h"
36
#include "signature.h"
37
#include "smalloc.h"
38
#include "upload.h"
39
#include "util/posix.h"
40
#include "util/shared_ptr.h"
41
#include "util/string.h"
42
43
using namespace std;  // NOLINT
44
45
namespace swissknife {
46
47
namespace {
48
49
typedef HttpObjectFetcher<> ObjectFetcher;
50
51
/**
52
 * This just stores an shash::Any in a predictable way to send it through a
53
 * POSIX pipe.
54
 */
55
class ChunkJob {
 public:
  // Default construction yields the termination sentinel: shash::kAny as
  // hash algorithm marks the job as a shutdown signal (see IsTerminateJob).
  ChunkJob()
    : suffix(shash::kSuffixNone)
    , hash_algorithm(shash::kAny)
    , compression_alg(zlib::kZlibDefault) {}

  // Flattens a chunk's hash plus its compression algorithm into a fixed-size
  // record; the digest bytes are copied so the whole object can be memcpy'd
  // through the worker pipe.
  ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
    : suffix(hash.suffix)
    , hash_algorithm(hash.algorithm)
    , compression_alg(compression_alg)
  {
    memcpy(digest, hash.digest, hash.GetDigestSize());
  }

  // True for the sentinel record that tells a worker thread to exit.
  bool IsTerminateJob() const {
    return (hash_algorithm == shash::kAny);
  }

  // Reassembles the shash::Any from the flattened fields.
  // Must not be called on the termination sentinel.
  shash::Any hash() const {
    assert(!IsTerminateJob());
    return shash::Any(hash_algorithm,
                      digest,
                      suffix);
  }

  const shash::Suffix      suffix;
  const shash::Algorithms  hash_algorithm;
  const zlib::Algorithms   compression_alg;
  unsigned char            digest[shash::kMaxDigestSize];
};
86
87
/**
 * Upload callback registered with the spooler.  Removes the local temporary
 * file in any case; a non-zero spooler return code is fatal for the
 * replication run.
 */
static void SpoolerOnUpload(const upload::SpoolerResult &result) {
  unlink(result.local_path.c_str());
  if (result.return_code == 0)
    return;
  LogCvmfs(kLogCvmfs, kLogStderr, "spooler failure %d (%s, hash: %s)",
           result.return_code,
           result.local_path.c_str(),
           result.content_hash.ToString().c_str());
  abort();
}
97
98
// Replication state shared between the main thread and the worker threads.
// A swissknife command runs exactly one pull per process, hence plain
// file-local globals.
SharedPtr<string>    stratum0_url;      // source repository base URL (-u)
SharedPtr<string>    stratum1_url;      // destination replica base URL (-w)
SharedPtr<string>    temp_dir;          // scratch directory for downloads (-x)
unsigned             num_parallel = 1;  // number of worker threads (-n)
bool                 pull_history = false;  // also replicate previous revisions (-p)
bool                 apply_timestamp_threshold = false;
uint64_t             timestamp_threshold = 0;  // prune root catalogs older than this (-Z)
bool                 is_garbage_collectable = false;
bool                 initial_snapshot = false;  // first replication, create empty reflog (-i)
upload::Spooler     *spooler = NULL;    // backend uploader; NULL when preloading a cache
int                  pipe_chunks[2];    // work queue carrying ChunkJob records
// required for concurrent reading
pthread_mutex_t      lock_pipe = PTHREAD_MUTEX_INITIALIZER;
unsigned             retries = 3;       // download retry count (-a)
catalog::RelaxedPathFilter   *pathfilter = NULL;  // optional path restriction (-d)
atomic_int64         overall_chunks;    // chunks processed (progress counter)
atomic_int64         overall_new;       // chunks actually downloaded
atomic_int64         chunk_queue;       // jobs currently queued/in flight
bool                 preload_cache = false;  // fill a local cache instead of a stratum 1 (-c)
string              *preload_cachedir = NULL;  // target directory when preloading (-r)
bool                 inspect_existing_catalogs = false;  // -t: descend into present catalogs
manifest::Reflog    *reflog = NULL;     // reference log, updated during replication
121
}  // anonymous namespace
122
123
124
/**
 * Maps a content hash to its destination path: the preload cache directory
 * layout when preloading, otherwise the backend's "data/" layout.
 */
static std::string MakePath(const shash::Any &hash) {
  if (preload_cache)
    return *preload_cachedir + "/" + hash.MakePathWithoutSuffix();
  return "data/" + hash.MakePath();
}
129
130
131
static bool Peek(const string &remote_path) {
132
  return (preload_cache) ? FileExists(remote_path)
133
                         : spooler->Peek(remote_path);
134
}
135
136
// Convenience overload: existence check via the content-addressed path.
static bool Peek(const shash::Any &remote_hash) {
  return Peek(MakePath(remote_hash));
}
139
140
/**
 * Logs a generic failure line for the given download job, followed by a more
 * specific hint that depends on the class of the error code.
 */
static void ReportDownloadError(const download::JobInfo &download_job) {
  const download::Failures err = download_job.error_code;
  const int http_code = download_job.http_code;

  LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
           download_job.url->c_str(), err, download::Code2Ascii(err));

  if ((err == download::kFailProxyResolve) ||
      (err == download::kFailHostResolve)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "DNS lookup for Stratum 0 failed - "
                                    "please check the network connection");
  } else if (err == download::kFailHostHttp) {
    LogCvmfs(kLogCvmfs, kLogStderr, "unexpected HTTP error code %d - "
             "please check the stratum 0 health", http_code);
  } else if (err == download::kFailBadData) {
    LogCvmfs(kLogCvmfs, kLogStderr, "downloaded corrupted data - "
                                    "please check the stratum 0 health");
  } else if (download::IsProxyTransferError(err) ||
             download::IsHostTransferError(err)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "couldn't reach Stratum 0 - "
                                  "please check the network connection");
  } else {
    LogCvmfs(kLogCvmfs, kLogStderr, "unexpected error - feel free to file "
                                  "a bug report");
  }
}
176
177
178
/**
 * Places a downloaded file at its final destination.  When preloading a
 * cache, the file is moved into place (decompressed first if compressed_src
 * is set); otherwise it is handed to the spooler for asynchronous upload.
 * Any failure on the preload path is fatal (abort).
 */
static void Store(
  const string &local_path,
  const string &remote_path,
  const bool compressed_src)
{
  if (preload_cache) {
    if (!compressed_src) {
      int retval = rename(local_path.c_str(), remote_path.c_str());
      if (retval != 0) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to move '%s' to '%s'",
                 local_path.c_str(), remote_path.c_str());
        abort();
      }
    } else {
      // compressed input: decompress into a temporary file and atomically
      // rename it onto the destination path
      string tmp_dest;
      FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
      if (fdest == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to create temporary file '%s'",
                 remote_path.c_str());
        abort();
      }
      int retval = zlib::DecompressPath2File(local_path, fdest);
      if (!retval) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to preload %s to %s",
                 local_path.c_str(), remote_path.c_str());
        abort();
      }
      fclose(fdest);
      retval = rename(tmp_dest.c_str(), remote_path.c_str());
      assert(retval == 0);
      unlink(local_path.c_str());
    }
  } else {
    // Upload path: the SpoolerOnUpload callback unlinks local_path later
    spooler->Upload(local_path, remote_path);
  }
}
215
216
// Convenience overload: stores the file under the content-addressed path
// derived from remote_hash.  Sources are compressed by default.
static void Store(
  const string &local_path,
  const shash::Any &remote_hash,
  const bool compressed_src = true)
{
  Store(local_path, MakePath(remote_hash), compressed_src);
}
223
224
225
static void StoreBuffer(const unsigned char *buffer, const unsigned size,
226
                        const std::string dest_path, const bool compress) {
227
  string tmp_file;
228
  FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
229
  assert(ftmp);
230
  int retval;
231
  if (compress) {
232
    shash::Any dummy(shash::kSha1);  // hardcoded hash no problem, unsused
233
    retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
234
  } else {
235
    retval = CopyMem2File(buffer, size, ftmp);
236
  }
237
  assert(retval);
238
  fclose(ftmp);
239
  Store(tmp_file, dest_path, true);
240
}
241
242
// Convenience overload: stores the buffer under the content-addressed path
// derived from dest_hash.
static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const shash::Any &dest_hash, const bool compress) {
  StoreBuffer(buffer, size, MakePath(dest_hash), compress);
}
246
247
248
// Blocks until all pending spooler uploads are finished.  A no-op when
// preloading a cache, because Store() places files synchronously there.
static void WaitForStorage() {
  if (!preload_cache) spooler->WaitForUpload();
}
251
252
253
// Parameters handed to each worker thread (one shared instance).
struct MainWorkerContext {
  download::DownloadManager *download_manager;
};
256
257
/**
 * Worker thread: reads ChunkJob records from the shared pipe, downloads
 * chunks that are not yet present at the destination, and stores them.
 * Exits when the termination sentinel (IsTerminateJob) arrives.
 * A failed chunk download is fatal (abort).
 */
static void *MainWorker(void *data) {
  MainWorkerContext *mwc = static_cast<MainWorkerContext*>(data);
  download::DownloadManager *download_manager = mwc->download_manager;

  while (1) {
    ChunkJob next_chunk;
    // Serialize reads so every worker receives a complete ChunkJob record
    pthread_mutex_lock(&lock_pipe);
    ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
    pthread_mutex_unlock(&lock_pipe);
    if (next_chunk.IsTerminateJob())
      break;

    shash::Any chunk_hash = next_chunk.hash();
    zlib::Algorithms compression_alg = next_chunk.compression_alg;
    LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
             chunk_hash.ToString().c_str());

    if (!Peek(chunk_hash)) {
      string tmp_file;
      FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                    &tmp_file);
      assert(fchunk);
      string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
      download::JobInfo download_chunk(&url_chunk, false, false, fchunk,
                                       &chunk_hash);

      const download::Failures download_result =
                                       download_manager->Fetch(&download_chunk);
      if (download_result != download::kFailOk) {
        ReportDownloadError(download_chunk);
        abort();
      }
      fclose(fchunk);
      // Only zlib-compressed chunks are marked as compressed sources
      Store(tmp_file, chunk_hash,
            (compression_alg == zlib::kZlibDefault) ? true : false);
      atomic_inc64(&overall_new);
    }
    // Progress indicator: one dot per 1000 processed chunks
    if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
      LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
    atomic_dec64(&chunk_queue);
  }
  return NULL;
}
300
301
302
bool CommandPull::PullRecursion(catalog::Catalog   *catalog,
303
                                const std::string  &path) {
304
  assert(catalog);
305
306
  // Previous catalogs
307
  if (pull_history) {
308
    shash::Any previous_catalog = catalog->GetPreviousRevision();
309
    if (previous_catalog.IsNull()) {
310
      LogCvmfs(kLogCvmfs, kLogStdout, "Start of catalog, no more history");
311
    } else {
312
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
313
               previous_catalog.ToString().c_str());
314
      bool retval = Pull(previous_catalog, path);
315
      if (!retval)
316
        return false;
317
    }
318
  }
319
320
  // Nested catalogs (in a nested code block because goto fail...)
321
  {
322
    const catalog::Catalog::NestedCatalogList nested_catalogs =
323
      catalog->ListOwnNestedCatalogs();
324
    for (catalog::Catalog::NestedCatalogList::const_iterator i =
325
         nested_catalogs.begin(), iEnd = nested_catalogs.end();
326
         i != iEnd; ++i)
327
    {
328
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
329
               i->mountpoint.c_str());
330
      bool retval = Pull(i->hash, i->mountpoint.ToString());
331
      if (!retval)
332
        return false;
333
    }
334
  }
335
336
  return true;
337
}
338
339
/**
 * Replicates the catalog with the given hash and, via PullRecursion, its
 * history and nested catalogs.  Returns true when the catalog is already
 * present, filtered out by the path specification, pruned/skipped, or
 * successfully fetched and stored; false on error.
 */
bool CommandPull::Pull(const shash::Any   &catalog_hash,
                       const std::string  &path) {
  int retval;
  download::Failures dl_retval;
  assert(shash::kSuffixCatalog == catalog_hash.suffix);

  // Check if the catalog already exists
  if (Peek(catalog_hash)) {
    // Preload: dirtab changed
    if (inspect_existing_catalogs) {
      if (!preload_cache) {
        LogCvmfs(kLogCvmfs, kLogStderr, "to be implemented: -t without -c");
        abort();
      }
      // Catalog is present locally: still descend into it so that newly
      // requested subtrees get fetched
      catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
        path, MakePath(catalog_hash), catalog_hash);
      if (catalog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
                 catalog_hash.ToString().c_str());
        return false;
      }
      bool retval = PullRecursion(catalog, path);
      delete catalog;
      return retval;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, "  Catalog up to date");
    return true;
  }

  // Check if the catalog matches the pathfilter
  if (path != ""              &&  // necessary to load the root catalog
      pathfilter              &&
     !pathfilter->IsMatching(path)) {
    LogCvmfs(kLogCvmfs, kLogStdout, "  Catalog in '%s' does not match"
             " the path specification", path.c_str());
    return true;
  }

  // Snapshot the counters to report per-catalog statistics at the end
  int64_t gauge_chunks = atomic_read64(&overall_chunks);
  int64_t gauge_new = atomic_read64(&overall_new);

  // Download and uncompress catalog
  shash::Any chunk_hash;
  zlib::Algorithms compression_alg;
  catalog::Catalog *catalog = NULL;
  string file_catalog;
  string file_catalog_vanilla;
  FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                  &file_catalog);
  if (!fcatalog) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    return false;
  }
  fclose(fcatalog);
  FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                          &file_catalog_vanilla);
  if (!fcatalog_vanilla) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    unlink(file_catalog.c_str());
    return false;
  }
  const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
  download::JobInfo download_catalog(&url_catalog, false, false,
                                     fcatalog_vanilla, &catalog_hash);
  dl_retval = download_manager()->Fetch(&download_catalog);
  fclose(fcatalog_vanilla);
  if (dl_retval != download::kFailOk) {
    // A missing root catalog on a garbage-collected repository is expected
    // for historic revisions and therefore not an error
    if (path == "" && is_garbage_collectable) {
      LogCvmfs(kLogCvmfs, kLogStdout, "skipping missing root catalog %s - "
                                      "probably sweeped by garbage collection",
               catalog_hash.ToString().c_str());
      goto pull_skip;
    } else {
      ReportDownloadError(download_catalog);
      goto pull_cleanup;
    }
  }
  retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
             file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }
  // Only root catalogs (path == "") are registered in the reflog
  if (path.empty() && reflog != NULL) {
    if (!reflog->AddCatalog(catalog_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
      goto pull_cleanup;
    }
  }

  catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
  if (catalog == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
             catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }

  // Always pull the HEAD root catalog and nested catalogs
  if (apply_timestamp_threshold && (path == "") &&
      (catalog->GetLastModified() < timestamp_threshold))
  {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "  Pruning at root catalog from %s due to threshold at %s",
             StringifyTime(catalog->GetLastModified(), false).c_str(),
             StringifyTime(timestamp_threshold, false).c_str());
    delete catalog;
    goto pull_skip;
  }
  apply_timestamp_threshold = true;

  // Traverse the chunks
  LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
           "  Processing chunks [%" PRIu64 " registered chunks]: ",
           catalog->GetNumChunks());
  retval = catalog->AllChunksBegin();
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
    goto pull_cleanup;
  }
  while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
    ChunkJob next_chunk(chunk_hash, compression_alg);
    WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
    atomic_inc64(&chunk_queue);
  }
  catalog->AllChunksEnd();
  // Drain the work queue before storing the catalog itself
  while (atomic_read64(&chunk_queue) != 0) {
    SafeSleepMs(100);
  }
  LogCvmfs(kLogCvmfs, kLogStdout, " fetched %" PRId64 " new chunks out of "
           "%" PRId64 " unique chunks",
           atomic_read64(&overall_new)-gauge_new,
           atomic_read64(&overall_chunks)-gauge_chunks);

  // NOTE(review): the result of PullRecursion is assigned to retval but not
  // checked; the function returns true even if a nested pull failed --
  // confirm this is intended.
  retval = PullRecursion(catalog, path);

  delete catalog;
  unlink(file_catalog.c_str());
  // Store the (still compressed) catalog last, after all its chunks arrived
  WaitForStorage();
  Store(file_catalog_vanilla, catalog_hash);
  return true;

 pull_cleanup:
  delete catalog;
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return false;

 pull_skip:
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return true;
}
492
493
494
/**
 * Entry point of "cvmfs_swissknife pull": parses options, sets up the
 * download and signature managers and the spooler, fetches manifest, meta
 * info, reflog and tag list, then replicates the trunk catalog and all
 * tagged root catalogs with a pool of worker threads.
 * Returns 0 on success, 1 on failure.
 */
int swissknife::CommandPull::Main(const swissknife::ArgumentList &args) {
  int retval;
  manifest::Failures m_retval;
  download::Failures dl_retval;
  unsigned timeout = 60;
  // NOTE(review): fd_lockfile is never assigned in this function, so the
  // UnlockFile() call at fini is dead code -- confirm whether locking was
  // meant to happen here.
  int fd_lockfile = -1;
  string spooler_definition_str;
  manifest::ManifestEnsemble ensemble;
  shash::Any meta_info_hash;
  string meta_info;

  // Option parsing
  if (args.find('c') != args.end())
    preload_cache = true;
  if (args.find('l') != args.end()) {
    unsigned log_level =
    1 << (kLogLevel0 + String2Uint64(*args.find('l')->second));
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  stratum0_url = args.find('u')->second;
  temp_dir = args.find('x')->second;
  // -r means the preload cache directory when preloading, otherwise the
  // spooler definition for the stratum 1 backend
  if (preload_cache) {
    preload_cachedir = new string(*args.find('r')->second);
  } else {
    spooler_definition_str = *args.find('r')->second;
  }
  string master_keys = *args.find('k')->second;
  if (DirectoryExists(master_keys))
    master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
  const string repository_name = *args.find('m')->second;
  string trusted_certs;
  if (args.find('y') != args.end())
    trusted_certs = *args.find('y')->second;
  if (args.find('n') != args.end())
    num_parallel = String2Uint64(*args.find('n')->second);
  if (args.find('t') != args.end())
    timeout = String2Uint64(*args.find('t')->second);
  if (args.find('a') != args.end())
    retries = String2Uint64(*args.find('a')->second);
  if (args.find('d') != args.end()) {
    pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
    assert(pathfilter->IsValid());
  }
  if (args.find('p') != args.end())
    pull_history = true;
  if (args.find('z') != args.end())
    inspect_existing_catalogs = true;
  if (args.find('w') != args.end())
    stratum1_url = args.find('w')->second;
  if (args.find('i') != args.end())
    initial_snapshot = true;
  shash::Any reflog_hash;
  string reflog_chksum_path;
  if (args.find('R') != args.end()) {
    reflog_chksum_path = *args.find('R')->second;
    if (!initial_snapshot) {
      if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
        return 1;
      }
    }
  }
  if (args.find('Z') != args.end()) {
    timestamp_threshold = String2Int64(*args.find('Z')->second);
  }

  if (!preload_cache && stratum1_url == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
    return 1;
  }

  typedef std::vector<history::History::Tag> TagVector;
  TagVector historic_tags;

  LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
           stratum0_url->c_str());

  int result = 1;

  // Initialization
  atomic_init64(&overall_chunks);
  atomic_init64(&overall_new);
  atomic_init64(&chunk_queue);

  const bool     follow_redirects = false;
  const unsigned max_pool_handles = num_parallel+1;

  if (!this->InitDownloadManager(follow_redirects, max_pool_handles)) {
    return 1;
  }

  if (!this->InitVerifyingSignatureManager(master_keys, trusted_certs)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to initalize CVMFS signatures");
    return 1;
  } else {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "CernVM-FS: using public key(s) %s",
             JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
    if (!trusted_certs.empty()) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "CernVM-FS: using trusted certificates in %s",
               JoinStrings(SplitString(trusted_certs, ':'), ", ").c_str());
    }
  }

  // Warn loudly if the replication runs through HTTP proxies
  unsigned current_group;
  vector< vector<download::DownloadManager::ProxyInfo> > proxies;
  download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
  if (proxies.size() > 0) {
    string proxy_str = "\nWarning, replicating through proxies\n";
    proxy_str += "  Load-balance groups:\n";
    for (unsigned i = 0; i < proxies.size(); ++i) {
      vector<string> urls;
      for (unsigned j = 0; j < proxies[i].size(); ++j) {
        urls.push_back(proxies[i][j].url);
      }
      proxy_str +=
        "  [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ") + "\n";
    }
    proxy_str += "  Active proxy: [" + StringifyInt(current_group) + "] " +
                 proxies[current_group][0].url;
    LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
  }
  download_manager()->SetTimeout(timeout, timeout);
  download_manager()->SetRetryParameters(retries, 500, 2000);
  download_manager()->Spawn();

  // init the download helper
  ObjectFetcher object_fetcher(repository_name,
                               *stratum0_url,
                               *temp_dir,
                               download_manager(),
                               signature_manager());

  pthread_t *workers =
    reinterpret_cast<pthread_t *>(smalloc(sizeof(pthread_t) * num_parallel));

  // Check if we have a replica-ready server
  const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
  download::JobInfo download_sentinel(&url_sentinel, false);
  retval = download_manager()->Fetch(&download_sentinel);
  if (retval != download::kFailOk) {
    if (download_sentinel.http_code == 404) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "This is not a CernVM-FS server for replication");
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Failed to contact stratum 0 server (%d - %s)",
               retval, download::Code2Ascii(download_sentinel.error_code));
    }
    goto fini;
  }

  // Fetch and verify the signed manifest of the source repository
  m_retval = FetchRemoteManifestEnsemble(*stratum0_url,
                                          repository_name,
                                          &ensemble);
  if (m_retval != manifest::kFailOk) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
             m_retval, manifest::Code2Ascii(m_retval));
    goto fini;
  }

  // Get meta info
  meta_info_hash = ensemble.manifest->meta_info();
  if (!meta_info_hash.IsNull()) {
    meta_info_hash = ensemble.manifest->meta_info();
    const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
    download::JobInfo download_metainfo(&url, true, false, &meta_info_hash);
    dl_retval = download_manager()->Fetch(&download_metainfo);
    if (dl_retval != download::kFailOk) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
               dl_retval, download::Code2Ascii(dl_retval));
      goto fini;
    }
    meta_info = string(download_metainfo.destination_mem.data,
                       download_metainfo.destination_mem.pos);
  }

  is_garbage_collectable = ensemble.manifest->garbage_collectable();

  // Manifest available, now the spooler's hash algorithm can be determined
  // That doesn't actually matter because the replication does no re-hashing
  if (!preload_cache) {
    const upload::SpoolerDefinition
      spooler_definition(spooler_definition_str,
                         ensemble.manifest->GetHashAlgorithm());
    spooler = upload::Spooler::Construct(spooler_definition);
    assert(spooler);
    spooler->RegisterListener(&SpoolerOnUpload);
  }

  // Open the reflog for modification
  if (!preload_cache) {
    if (initial_snapshot) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
                                      repository_name.c_str());
      reflog = CreateEmptyReflog(*temp_dir, repository_name);
      if (reflog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
        goto fini;
      }
    } else {
      // The reflog lives on the stratum 1, hence a second object fetcher
      ObjectFetcher object_fetcher_stratum1(repository_name,
                                            *stratum1_url,
                                            *temp_dir,
                                            download_manager(),
                                            signature_manager());

      if (!reflog_hash.IsNull()) {
        reflog =
          FetchReflog(&object_fetcher_stratum1, repository_name, reflog_hash);
        assert(reflog != NULL);
      } else {
        LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
        if (spooler->Peek("/.cvmfsreflog")) {
          LogCvmfs(kLogCvmfs, kLogStderr,
                   "no reflog hash specified but reflog is present");
          goto fini;
        }
      }
    }

    if (reflog != NULL) {
      reflog->BeginTransaction();
      // On commit: use manifest's hash algorithm
      reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
    }
  }

  // Fetch tag list.
  // If we are just preloading the cache it is not strictly necessary to
  // download the entire tag list
  // TODO(molina): add user option to download tags when preloading the cache
  if (!ensemble.manifest->history().IsNull() && !preload_cache) {
    shash::Any history_hash = ensemble.manifest->history();
    const string history_url = *stratum0_url + "/data/"
                                             + history_hash.MakePath();
    const string history_path = *temp_dir + "/" + history_hash.ToString();
    download::JobInfo download_history(&history_url, false, false,
                                       &history_path,
                                       &history_hash);
    dl_retval = download_manager()->Fetch(&download_history);
    if (dl_retval != download::kFailOk) {
      ReportDownloadError(download_history);
      goto fini;
    }
    const std::string history_db_path = history_path + ".uncompressed";
    retval = zlib::DecompressPath2Path(history_path, history_db_path);
    assert(retval);
    history::History *tag_db = history::SqliteHistory::Open(history_db_path);
    if (NULL == tag_db) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
               history_db_path.c_str());
      unlink(history_db_path.c_str());
      goto fini;
    }
    retval = tag_db->List(&historic_tags);
    delete tag_db;
    unlink(history_db_path.c_str());
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
               history_db_path.c_str());
      goto fini;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, "Found %u named snapshots",
             historic_tags.size());
    // TODO(jblomer): We should replicate the previous history dbs, too,
    // in order to avoid races on fail-over between non-synchronized stratum 1s
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
    Store(history_path, history_hash);
    WaitForStorage();
    unlink(history_path.c_str());
    if (reflog != NULL && !reflog->AddHistory(history_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
      goto fini;
    }
  }

  // Starting threads
  MakePipe(pipe_chunks);
  LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
  MainWorkerContext mwc;
  mwc.download_manager = download_manager();
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_create(&workers[i], NULL, MainWorker,
                                static_cast<void*>(&mwc));
    assert(retval == 0);
  }

  // Replicate trunk first, then all tagged root catalogs not yet present
  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
  retval = Pull(ensemble.manifest->catalog_hash(), "");
  pull_history = false;
  if (!historic_tags.empty()) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
  }
  for (TagVector::const_iterator i    = historic_tags.begin(),
                                 iend = historic_tags.end();
       i != iend; ++i)
  {
    if (Peek(i->root_hash))
      continue;
    LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
             i->name.c_str());
    apply_timestamp_threshold = false;
    bool retval2 = Pull(i->root_hash, "");
    retval = retval && retval2;
  }

  // Stopping threads: one termination sentinel per worker, then join
  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
  for (unsigned i = 0; i < num_parallel; ++i) {
    ChunkJob terminate_workers;
    WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
  }
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_join(workers[i], NULL);
    assert(retval == 0);
  }
  ClosePipe(pipe_chunks);

  if (!retval)
    goto fini;

  // Upload manifest ensemble
  {
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
    WaitForStorage();

    if (!Peek(ensemble.manifest->certificate())) {
      StoreBuffer(ensemble.cert_buf,
                  ensemble.cert_size,
                  ensemble.manifest->certificate(), true);
    }
    if (reflog != NULL &&
        !reflog->AddCertificate(ensemble.manifest->certificate())) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
      goto fini;
    }
    if (!meta_info_hash.IsNull()) {
      const unsigned char *info = reinterpret_cast<const unsigned char *>(
        meta_info.data());
      StoreBuffer(info, meta_info.size(), meta_info_hash, true);
      if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
        goto fini;
      }
    }

    // Create alternative bootstrapping symlinks for VOMS secured repos
    if (ensemble.manifest->has_alt_catalog_path()) {
      const bool success =
        spooler->PlaceBootstrappingShortcut(ensemble.manifest->certificate()) &&
        spooler->PlaceBootstrappingShortcut(ensemble.manifest->catalog_hash())
          && (ensemble.manifest->history().IsNull() ||
            spooler->PlaceBootstrappingShortcut(ensemble.manifest->history()))
          && (meta_info_hash.IsNull() ||
            spooler->PlaceBootstrappingShortcut(meta_info_hash));

      if (!success) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "failed to place root catalog bootstrapping symlinks");
        return 1;
      }
    }

    // upload Reflog database
    if (!preload_cache && reflog != NULL) {
      reflog->CommitTransaction();
      reflog->DropDatabaseFileOwnership();
      string reflog_path = reflog->database_file();
      delete reflog;
      manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
      WaitForStorage();  // Reduce the duration of reflog /wo checksum
      spooler->UploadReflog(reflog_path);
      spooler->WaitForUpload();
      unlink(reflog_path.c_str());
      if (spooler->GetNumberOfErrors()) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
                 spooler->GetNumberOfErrors());
        goto fini;
      }
      assert(!reflog_chksum_path.empty());
      manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
    }

    if (preload_cache) {
      bool retval = ensemble.manifest->ExportChecksum(*preload_cachedir, 0660);
      assert(retval);
    } else {
      // pkcs#7 structure contains content + certificate + signature
      // So there is no race with whitelist and pkcs7 signature being out of
      // sync
      if (ensemble.whitelist_pkcs7_buf) {
        StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
                    ".cvmfswhitelist.pkcs7", false);
      }
      StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
                  ".cvmfswhitelist", false);
      StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
                  ".cvmfspublished", false);
    }
    LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %u",
             ensemble.manifest->revision());
  }

  WaitForStorage();
  LogCvmfs(kLogCvmfs, kLogStdout, "Fetched %" PRId64 " new chunks out of %"
           PRId64 " processed chunks",
           atomic_read64(&overall_new), atomic_read64(&overall_chunks));
  result = 0;

 fini:
  if (fd_lockfile >= 0)
    UnlockFile(fd_lockfile);
  free(workers);
  delete spooler;
  delete pathfilter;
  return result;
}
918
919
}  // namespace swissknife