CernVM-FS 2.13.0
swissknife_pull.cc
/**
 * This file is part of the CernVM File System.
 */

// NOLINTNEXTLINE
#define _FILE_OFFSET_BITS 64
// NOLINTNEXTLINE
#define __STDC_FORMAT_MACROS


#include "swissknife_pull.h"

#include <inttypes.h>
#include <pthread.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

#include "catalog.h"
#include "compression/compression.h"
#include "crypto/hash.h"
#include "crypto/signature.h"
#include "history_sqlite.h"
#include "manifest.h"
#include "manifest_fetch.h"
#include "network/download.h"
#include "object_fetcher.h"
#include "path_filters/relaxed_path_filter.h"
#include "reflog.h"
#include "upload.h"
#include "util/atomic.h"
#include "util/concurrency.h"
#include "util/exception.h"
#include "util/logging.h"
#include "util/posix.h"
#include "util/shared_ptr.h"
#include "util/smalloc.h"
#include "util/string.h"

using namespace std; // NOLINT

namespace swissknife {

namespace {

/**
 * Encapsulates the hash and compression algorithm of a file chunk that is
 * scheduled for replication by one of the worker threads.
 */
class ChunkJob {
 public:
  ChunkJob()
    : suffix(shash::kSuffixNone)
    , hash_algorithm(shash::kAny)
    , compression_alg(zlib::kZlibDefault) { }

  ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
    : suffix(hash.suffix)
    , hash_algorithm(hash.algorithm)
    , compression_alg(compression_alg) {
    memcpy(digest, hash.digest, hash.GetDigestSize());
  }

  bool IsTerminateJob() const { return (hash_algorithm == shash::kAny); }

  shash::Any hash() const {
    assert(!IsTerminateJob());
    return shash::Any(hash_algorithm, digest, suffix);
  }

  const shash::Suffix suffix;
  const shash::Algorithms hash_algorithm;
  const zlib::Algorithms compression_alg;
  unsigned char digest[shash::kMaxDigestSize];
};

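// Spooler callback: removes the uploaded temporary file and aborts the
// replication if the backend reported an error.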
static void SpoolerOnUpload(const upload::SpoolerResult &result) {
  unlink(result.local_path.c_str());
  if (result.return_code != 0) {
    PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
          result.local_path.c_str(), result.content_hash.ToString().c_str());
  }
}

SharedPtr<string> stratum0_url;
SharedPtr<string> stratum1_url;
SharedPtr<string> temp_dir;
unsigned num_parallel = 1;
bool pull_history = false;
bool apply_timestamp_threshold = false;
uint64_t timestamp_threshold = 0;
bool is_garbage_collectable = false;
bool initial_snapshot = false;
upload::Spooler *spooler = NULL;
int pipe_chunks[2];
// required for concurrent reading
pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
unsigned retries = 3;
atomic_int64 overall_chunks;
atomic_int64 overall_new;
atomic_int64 chunk_queue;
bool preload_cache = false;
string *preload_cachedir = NULL;
catalog::RelaxedPathFilter *pathfilter = NULL;
manifest::Reflog *reflog = NULL;

}  // anonymous namespace


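// Returns the storage location of an object: a flat file in the preload cache
// directory or the usual content-addressed path under data/ in the backend.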
static std::string MakePath(const shash::Any &hash) {
  return (preload_cache)
             ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
             : "data/" + hash.MakePath();
}


static bool Peek(const string &remote_path) {
  return (preload_cache) ? FileExists(remote_path) : spooler->Peek(remote_path);
}

static bool Peek(const shash::Any &remote_hash) {
  return Peek(MakePath(remote_hash));
}

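// Translates a failed download job into an operator-friendly error message on
// stderr, distinguishing DNS failures, HTTP errors, corrupted data, and
// unreachable hosts or proxies.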
static void ReportDownloadError(const download::JobInfo &download_job) {
  const download::Failures error_code = download_job.error_code();
  const int http_code = download_job.http_code();
  const std::string url = *download_job.url();

  LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
           url.c_str(), error_code, download::Code2Ascii(error_code));

  switch (error_code) {
    case download::kFailProxyResolve:
    case download::kFailHostResolve:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "DNS lookup for Stratum 0 failed - "
               "please check the network connection");
      break;

    case download::kFailHostHttp:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "unexpected HTTP error code %d - "
               "please check the stratum 0 health",
               http_code);
      break;

    case download::kFailBadData:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "downloaded corrupted data - "
               "please check the stratum 0 health");
      break;

    default:
      if (download::IsProxyTransferError(error_code)
          || download::IsHostTransferError(error_code)) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "couldn't reach Stratum 0 - "
                 "please check the network connection");
      } else {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "unexpected error - feel free to file "
                 "a bug report");
      }
  }
}


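// Writes a downloaded object to its final location: for cache preloading the
// file is moved (and decompressed if necessary) into the cache directory,
// otherwise it is handed to the spooler for upload into the backend storage.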
static void Store(const string &local_path,
                  const string &remote_path,
                  const bool compressed_src) {
  if (preload_cache) {
    if (!compressed_src) {
      int retval = rename(local_path.c_str(), remote_path.c_str());
      if (retval != 0) {
        PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
              remote_path.c_str());
      }
    } else {
      // compressed input
      string tmp_dest;
      FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
      if (fdest == NULL) {
        PANIC(kLogStderr, "Failed to create temporary file '%s'",
              remote_path.c_str());
      }
      int retval = zlib::DecompressPath2File(local_path, fdest);
      if (!retval) {
        PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
              remote_path.c_str());
      }
      fclose(fdest);
      retval = rename(tmp_dest.c_str(), remote_path.c_str());
      assert(retval == 0);
      unlink(local_path.c_str());
    }
  } else {
    spooler->Upload(local_path, remote_path);
  }
}

static void Store(const string &local_path,
                  const shash::Any &remote_hash,
                  const bool compressed_src = true) {
  Store(local_path, MakePath(remote_hash), compressed_src);
}


static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const std::string &dest_path, const bool compress) {
  string tmp_file;
  FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
  assert(ftmp);
  int retval;
  if (compress) {
    shash::Any dummy(shash::kSha1);  // hardcoded hash no problem, unused
    retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
  } else {
    retval = CopyMem2File(buffer, size, ftmp);
  }
  assert(retval);
  fclose(ftmp);
  Store(tmp_file, dest_path, true);
}

static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const shash::Any &dest_hash, const bool compress) {
  StoreBuffer(buffer, size, MakePath(dest_hash), compress);
}


static void WaitForStorage() {
  if (!preload_cache)
    spooler->WaitForUpload();
}


struct MainWorkerContext {
  download::DownloadManager *download_manager;
};

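// Worker thread: reads ChunkJob entries from the chunk pipe and, for every
// chunk not yet present in the destination storage, downloads it from the
// stratum 0 and stores it.  A terminate job (empty hash) ends the loop.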
static void *MainWorker(void *data) {
  MainWorkerContext *mwc = static_cast<MainWorkerContext *>(data);
  download::DownloadManager *download_manager = mwc->download_manager;

  while (1) {
    ChunkJob next_chunk;
    {
      MutexLockGuard m(&lock_pipe);
      ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
    }
    if (next_chunk.IsTerminateJob())
      break;

    shash::Any chunk_hash = next_chunk.hash();
    zlib::Algorithms compression_alg = next_chunk.compression_alg;
    LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
             chunk_hash.ToString().c_str());

    if (!Peek(chunk_hash)) {
      string tmp_file;
      FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
      string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
      cvmfs::FileSink filesink(fchunk);
      download::JobInfo download_chunk(&url_chunk, false, false, &chunk_hash,
                                       &filesink);

      const download::Failures download_result = download_manager->Fetch(
          &download_chunk);
      if (download_result != download::kFailOk) {
        ReportDownloadError(download_chunk);
        PANIC(kLogStderr, "Download error");
      }
      fclose(fchunk);
      Store(tmp_file, chunk_hash,
            (compression_alg == zlib::kZlibDefault) ? true : false);
      atomic_inc64(&overall_new);
    }
    if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
      LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
    atomic_dec64(&chunk_queue);
  }
  return NULL;
}


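// Recurses into the previous revision (if history replication is enabled) and
// into all nested catalogs of the given catalog and replicates them.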
bool CommandPull::PullRecursion(catalog::Catalog *catalog,
                                const std::string &path) {
  assert(catalog);

  // Previous catalogs
  if (pull_history) {
    shash::Any previous_catalog = catalog->GetPreviousRevision();
    if (previous_catalog.IsNull()) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Start of catalog, no more history");
    } else {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
               previous_catalog.ToString().c_str());
      bool retval = Pull(previous_catalog, path);
      if (!retval)
        return false;
    }
  }

  // Nested catalogs (in a nested code block because goto fail...)
  {
    const catalog::Catalog::NestedCatalogList
        nested_catalogs = catalog->ListOwnNestedCatalogs();
    for (catalog::Catalog::NestedCatalogList::const_iterator
             i = nested_catalogs.begin(),
             iEnd = nested_catalogs.end();
         i != iEnd; ++i) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
               i->mountpoint.c_str());
      bool retval = Pull(i->hash, i->mountpoint.ToString());
      if (!retval)
        return false;
    }
  }

  return true;
}

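// Replicates a single catalog and all file chunks it references, then recurses
// via PullRecursion().  Catalogs that already exist at the destination, do not
// match the path specification, or are older than the timestamp threshold are
// skipped.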
bool CommandPull::Pull(const shash::Any &catalog_hash,
                       const std::string &path) {
  int retval;
  download::Failures dl_retval;
  assert(shash::kSuffixCatalog == catalog_hash.suffix);

  // Check if the catalog already exists
  if (Peek(catalog_hash)) {
    // Preload: dirtab changed
    if (pathfilter && pathfilter->IsValid()) {
      if (!preload_cache) {
        PANIC(kLogStderr, "to be implemented: -t without -c");
      }
      catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
          path, MakePath(catalog_hash), catalog_hash);
      if (catalog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
                 catalog_hash.ToString().c_str());
        return false;
      }
      bool retval = PullRecursion(catalog, path);
      delete catalog;
      return retval;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
    return true;
  }

  // Check if the catalog matches the pathfilter
  if (path != "" &&  // necessary to load the root catalog
      pathfilter && !pathfilter->IsMatching(path)) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             " Catalog in '%s' does not match"
             " the path specification",
             path.c_str());
    return true;
  }

  int64_t gauge_chunks = atomic_read64(&overall_chunks);
  int64_t gauge_new = atomic_read64(&overall_new);

  // Download and uncompress catalog
  shash::Any chunk_hash;
  zlib::Algorithms compression_alg;
  catalog::Catalog *catalog = NULL;
  string file_catalog;
  string file_catalog_vanilla;
  FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                  &file_catalog);
  if (!fcatalog) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    return false;
  }
  fclose(fcatalog);
  FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                          &file_catalog_vanilla);
  if (!fcatalog_vanilla) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    unlink(file_catalog.c_str());
    return false;
  }
  const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
  cvmfs::FileSink filesink(fcatalog_vanilla);
  download::JobInfo download_catalog(&url_catalog, false, false, &catalog_hash,
                                     &filesink);
  dl_retval = download_manager()->Fetch(&download_catalog);
  fclose(fcatalog_vanilla);
  if (dl_retval != download::kFailOk) {
    if (path == "" && is_garbage_collectable) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "skipping missing root catalog %s - "
               "probably sweeped by garbage collection",
               catalog_hash.ToString().c_str());
      goto pull_skip;
    } else {
      ReportDownloadError(download_catalog);
      goto pull_cleanup;
    }
  }
  retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
             file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }
  if (path.empty() && reflog != NULL) {
    if (!reflog->AddCatalog(catalog_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
      goto pull_cleanup;
    }
  }

  catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
  if (catalog == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
             catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }

  // Always pull the HEAD root catalog and nested catalogs
  if (apply_timestamp_threshold && (path == "")
      && (catalog->GetLastModified() < timestamp_threshold)) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             " Pruning at root catalog from %s due to threshold at %s",
             StringifyTime(catalog->GetLastModified(), false).c_str(),
             StringifyTime(timestamp_threshold, false).c_str());
    delete catalog;
    goto pull_skip;
  }
  apply_timestamp_threshold = true;

  // Traverse the chunks
  LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
           " Processing chunks [%" PRIu64 " registered chunks]: ",
           catalog->GetNumChunks());
  retval = catalog->AllChunksBegin();
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
    goto pull_cleanup;
  }
  while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
    ChunkJob next_chunk(chunk_hash, compression_alg);
    WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
    atomic_inc64(&chunk_queue);
  }
  catalog->AllChunksEnd();
  while (atomic_read64(&chunk_queue) != 0) {
    SafeSleepMs(100);
  }
  LogCvmfs(kLogCvmfs, kLogStdout,
           " fetched %" PRId64 " new chunks out of "
           "%" PRId64 " unique chunks",
           atomic_read64(&overall_new) - gauge_new,
           atomic_read64(&overall_chunks) - gauge_chunks);

  retval = PullRecursion(catalog, path);

  delete catalog;
  unlink(file_catalog.c_str());
  WaitForStorage();
  if (!retval)
    return false;
  Store(file_catalog_vanilla, catalog_hash);
  return true;

pull_cleanup:
  delete catalog;
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return false;

pull_skip:
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return true;
}


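// Entry point of "cvmfs_swissknife pull": parses the command line options,
// initializes the download manager, signature manager, spooler, and reflog,
// and then replicates catalogs, chunks, tag history, and the manifest
// ensemble from the stratum 0 to the destination storage (or preload cache).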
int CommandPull::Main(const ArgumentList &args) {
  int retval;
  manifest::Failures m_retval;
  download::Failures dl_retval;
  unsigned timeout = 60;
  int fd_lockfile = -1;
  string spooler_definition_str;
  manifest::ManifestEnsemble ensemble;
  shash::Any meta_info_hash;
  string meta_info;

  // Option parsing
  if (args.find('c') != args.end())
    preload_cache = true;
  if (args.find('l') != args.end()) {
    unsigned log_level = kLogLevel0 << String2Uint64(*args.find('l')->second);
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  stratum0_url = args.find('u')->second;
  temp_dir = args.find('x')->second;
  if (preload_cache) {
    preload_cachedir = new string(*args.find('r')->second);
  } else {
    spooler_definition_str = *args.find('r')->second;
  }
  string master_keys = *args.find('k')->second;
  if (DirectoryExists(master_keys))
    master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
  const string repository_name = *args.find('m')->second;
  if (args.find('n') != args.end())
    num_parallel = String2Uint64(*args.find('n')->second);
  if (args.find('t') != args.end())
    timeout = String2Uint64(*args.find('t')->second);
  if (args.find('a') != args.end())
    retries = String2Uint64(*args.find('a')->second);
  if (args.find('d') != args.end()) {
    pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
  }
  if (args.find('p') != args.end())
    pull_history = true;
  if (args.find('z') != args.end()) {
    // (handling of -z not preserved in this listing)
  }
  if (args.find('w') != args.end())
    stratum1_url = args.find('w')->second;
  if (args.find('i') != args.end())
    initial_snapshot = true;
  shash::Any reflog_hash;
  string reflog_chksum_path;
  if (args.find('R') != args.end()) {
    reflog_chksum_path = *args.find('R')->second;
    if (!initial_snapshot) {
      if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
        return 1;
      }
    }
  }
  if (args.find('Z') != args.end()) {
    timestamp_threshold = String2Int64(*args.find('Z')->second);
  }

  if (!preload_cache && stratum1_url.Get() == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
    return 1;
  }

  typedef std::vector<history::History::Tag> TagVector;
  TagVector historic_tags;

  LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
           stratum0_url->c_str());

  int result = 1;

  // Initialization
  atomic_init64(&overall_chunks);
  atomic_init64(&overall_new);
  atomic_init64(&chunk_queue);

  const bool follow_redirects = false;
  const unsigned max_pool_handles = num_parallel + 1;
  const string proxy = (args.find('@') != args.end()) ? *args.find('@')->second
                                                      : "";

  if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
    return 1;
  }

  if (!this->InitSignatureManager(master_keys)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
    return 1;
  } else {
    LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: using public key(s) %s",
             JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
  }

  unsigned current_group;
  vector<vector<download::DownloadManager::ProxyInfo> > proxies;
  download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
  if (proxies.size() > 0) {
    string proxy_str = "\nWarning, replicating through proxies\n";
    proxy_str += " Load-balance groups:\n";
    for (unsigned i = 0; i < proxies.size(); ++i) {
      vector<string> urls;
      for (unsigned j = 0; j < proxies[i].size(); ++j) {
        urls.push_back(proxies[i][j].url);
      }
      proxy_str += " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ")
                   + "\n";
    }
    proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] "
                 + proxies[current_group][0].url;
    LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
  }
  download_manager()->SetTimeout(timeout, timeout);
  download_manager()->SetRetryParameters(retries, 500, 2000);
  download_manager()->Spawn();

  // init the download helper
  ObjectFetcher object_fetcher(repository_name,
                               *stratum0_url,
                               *temp_dir,
                               download_manager(),
                               signature_manager());

  pthread_t *workers = reinterpret_cast<pthread_t *>(
      smalloc(sizeof(pthread_t) * num_parallel));

  // Check if we have a replica-ready server
  const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
  download::JobInfo download_sentinel(&url_sentinel, false);
  retval = download_manager()->Fetch(&download_sentinel);
  if (retval != download::kFailOk) {
    if (download_sentinel.http_code() == 404) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "This is not a CernVM-FS server for replication");
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Failed to contact stratum 0 server (%d - %s)", retval,
               download::Code2Ascii(download_sentinel.error_code()));
    }
    goto fini;
  }

  m_retval = FetchRemoteManifestEnsemble(
      *stratum0_url, repository_name, &ensemble);
  if (m_retval != manifest::kFailOk) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
             m_retval, manifest::Code2Ascii(m_retval));
    goto fini;
  }

  // Get meta info
  meta_info_hash = ensemble.manifest->meta_info();
  if (!meta_info_hash.IsNull()) {
    meta_info_hash = ensemble.manifest->meta_info();
    const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
    cvmfs::MemSink metainfo_memsink;
    download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
                                        &metainfo_memsink);
    dl_retval = download_manager()->Fetch(&download_metainfo);
    if (dl_retval != download::kFailOk) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
               dl_retval, download::Code2Ascii(dl_retval));
      goto fini;
    }
    meta_info = string(reinterpret_cast<char *>(metainfo_memsink.data()),
                       metainfo_memsink.pos());
  }

  is_garbage_collectable = ensemble.manifest->garbage_collectable();

  // Manifest available, now the spooler's hash algorithm can be determined
  // That doesn't actually matter because the replication does no re-hashing
  if (!preload_cache) {
    const upload::SpoolerDefinition spooler_definition(
        spooler_definition_str, ensemble.manifest->GetHashAlgorithm());
    spooler = upload::Spooler::Construct(spooler_definition);
    assert(spooler);
    spooler->RegisterListener(&SpoolerOnUpload);
  }

  // Open the reflog for modification
  if (!preload_cache) {
    if (initial_snapshot) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
               repository_name.c_str());
      reflog = CreateEmptyReflog(*temp_dir, repository_name);
      if (reflog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
        goto fini;
      }
    } else {
      ObjectFetcher object_fetcher_stratum1(repository_name,
                                            *stratum1_url,
                                            *temp_dir,
                                            download_manager(),
                                            signature_manager());

      if (!reflog_hash.IsNull()) {
        reflog = FetchReflog(&object_fetcher_stratum1, repository_name,
                             reflog_hash);
        assert(reflog != NULL);
      } else {
        LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
        if (spooler->Peek(".cvmfsreflog")) {
          LogCvmfs(kLogCvmfs, kLogStderr,
                   "no reflog hash specified but reflog is present");
          goto fini;
        }
      }
    }

    if (reflog != NULL) {
      reflog->BeginTransaction();
      // On commit: use manifest's hash algorithm
      reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
    }
  }

  // Fetch tag list.
  // If we are just preloading the cache it is not strictly necessary to
  // download the entire tag list
  // TODO(molina): add user option to download tags when preloading the cache
  if (!ensemble.manifest->history().IsNull() && !preload_cache) {
    shash::Any history_hash = ensemble.manifest->history();
    const string history_url = *stratum0_url + "/data/"
                               + history_hash.MakePath();
    const string history_path = *temp_dir + "/" + history_hash.ToString();

    cvmfs::PathSink pathsink(history_path);
    download::JobInfo download_history(&history_url, false, false,
                                       &history_hash, &pathsink);
    dl_retval = download_manager()->Fetch(&download_history);
    if (dl_retval != download::kFailOk) {
      ReportDownloadError(download_history);
      goto fini;
    }
    const std::string history_db_path = history_path + ".uncompressed";
    retval = zlib::DecompressPath2Path(history_path, history_db_path);
    assert(retval);
    history::History *tag_db = history::SqliteHistory::Open(history_db_path);
    if (NULL == tag_db) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
               history_db_path.c_str());
      unlink(history_db_path.c_str());
      goto fini;
    }
    retval = tag_db->List(&historic_tags);
    delete tag_db;
    unlink(history_db_path.c_str());
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
               history_db_path.c_str());
      goto fini;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
             historic_tags.size());
    // TODO(jblomer): We should replicate the previous history dbs, too,
    // in order to avoid races on fail-over between non-synchronized stratum 1s
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
    Store(history_path, history_hash);
    WaitForStorage();
    unlink(history_path.c_str());
    if (reflog != NULL && !reflog->AddHistory(history_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
      goto fini;
    }
  }

  // Starting threads
  MakePipe(pipe_chunks);
  LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
  MainWorkerContext mwc;
  mwc.download_manager = download_manager();
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_create(&workers[i], NULL, MainWorker,
                                static_cast<void *>(&mwc));
    assert(retval == 0);
  }

  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
  retval = Pull(ensemble.manifest->catalog_hash(), "");
  pull_history = false;
  if (!historic_tags.empty()) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
  }
  for (TagVector::const_iterator i = historic_tags.begin(),
                                 iend = historic_tags.end();
       i != iend;
       ++i) {
    if (Peek(i->root_hash))
      continue;
    LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
             i->name.c_str());
    bool retval2 = Pull(i->root_hash, "");
    retval = retval && retval2;
  }

  // Stopping threads
  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
  for (unsigned i = 0; i < num_parallel; ++i) {
    ChunkJob terminate_workers;
    WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
  }
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_join(workers[i], NULL);  // NOLINT (false positive)
    assert(retval == 0);
  }
  ClosePipe(pipe_chunks);

  if (!retval)
    goto fini;

  // Upload manifest ensemble
  {
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
    WaitForStorage();

    if (!Peek(ensemble.manifest->certificate())) {
      StoreBuffer(ensemble.cert_buf, ensemble.cert_size,
                  ensemble.manifest->certificate(), true);
    }
    if (reflog != NULL
        && !reflog->AddCertificate(ensemble.manifest->certificate())) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
      goto fini;
    }
    if (!meta_info_hash.IsNull()) {
      const unsigned char *info = reinterpret_cast<const unsigned char *>(
          meta_info.data());
      StoreBuffer(info, meta_info.size(), meta_info_hash, true);
      if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
        goto fini;
      }
    }

    // Create alternative bootstrapping symlinks for VOMS secured repos
    if (ensemble.manifest->has_alt_catalog_path()) {
      const bool success = spooler->PlaceBootstrappingShortcut(
                               ensemble.manifest->certificate())
                           && spooler->PlaceBootstrappingShortcut(
                               ensemble.manifest->catalog_hash())
                           && (ensemble.manifest->history().IsNull()
                               || spooler->PlaceBootstrappingShortcut(
                                   ensemble.manifest->history()))
                           && (meta_info_hash.IsNull()
                               || spooler->PlaceBootstrappingShortcut(
                                   meta_info_hash));

      if (!success) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "failed to place root catalog bootstrapping symlinks");
        goto fini;
      }
    }

    // upload Reflog database
    if (!preload_cache && reflog != NULL) {
      reflog->CommitTransaction();
      reflog->DropDatabaseFileOwnership();
      string reflog_path = reflog->database_file();
      delete reflog;
      manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
      WaitForStorage();  // Reduce the duration of reflog /wo checksum
      spooler->UploadReflog(reflog_path);
      spooler->WaitForUpload();
      unlink(reflog_path.c_str());
      if (spooler->GetNumberOfErrors()) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
                 spooler->GetNumberOfErrors());
        goto fini;
      }
      assert(!reflog_chksum_path.empty());
      manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
    }

    if (preload_cache) {
      bool retval = ensemble.manifest->ExportBreadcrumb(*preload_cachedir,
                                                        0660);
      assert(retval);
    } else {
      // pkcs#7 structure contains content + certificate + signature
      // So there is no race with whitelist and pkcs7 signature being out of
      // sync
      if (ensemble.whitelist_pkcs7_buf) {
        StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
                    ".cvmfswhitelist.pkcs7", false);
      }
      StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
                  ".cvmfswhitelist", false);
      StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
                  ".cvmfspublished", false);
    }
    LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
             ensemble.manifest->revision());
  }

  WaitForStorage();
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Fetched %" PRId64 " new chunks out of %" PRId64 " processed chunks",
           atomic_read64(&overall_new), atomic_read64(&overall_chunks));
  result = 0;

fini:
  if (fd_lockfile >= 0)
    UnlockFile(fd_lockfile);
  free(workers);
  delete spooler;
  delete pathfilter;
  return result;
}

}  // namespace swissknife