swissknife_pull.cc

/**
 * This file is part of the CernVM File System.
 *
 * Replicates a CernVM-FS repository from a stratum 0 server to a stratum 1
 * or into a preloaded cache directory.
 */

// NOLINTNEXTLINE
#define _FILE_OFFSET_BITS 64
// NOLINTNEXTLINE
#define __STDC_FORMAT_MACROS


#include "swissknife_pull.h"

#include <inttypes.h>
#include <pthread.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

#include "catalog.h"
#include "crypto/hash.h"
#include "crypto/signature.h"
#include "history_sqlite.h"
#include "manifest.h"
#include "manifest_fetch.h"
#include "network/download.h"
#include "object_fetcher.h"
#include "reflog.h"
#include "upload.h"
#include "util/atomic.h"
#include "util/concurrency.h"
#include "util/exception.h"
#include "util/logging.h"
#include "util/posix.h"
#include "util/shared_ptr.h"
#include "util/smalloc.h"
#include "util/string.h"

using namespace std;  // NOLINT

namespace swissknife {

namespace {

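/**
 * Stores a chunk hash in a fixed-size, trivially copyable record so that it
 * can be sent through a POSIX pipe to the worker threads.  A default
 * constructed job (hash algorithm shash::kAny) serves as the termination
 * signal for a worker.
 */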
class ChunkJob {
 public:
  ChunkJob()
      : suffix(shash::kSuffixNone)
      , hash_algorithm(shash::kAny)
      , compression_alg(zlib::kZlibDefault) { }

  ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
      : suffix(hash.suffix)
      , hash_algorithm(hash.algorithm)
      , compression_alg(compression_alg) {
    memcpy(digest, hash.digest, hash.GetDigestSize());
  }

  bool IsTerminateJob() const { return (hash_algorithm == shash::kAny); }

  shash::Any hash() const {
    assert(!IsTerminateJob());
    return shash::Any(hash_algorithm, digest, suffix);
  }

  shash::Suffix suffix;
  shash::Algorithms hash_algorithm;
  zlib::Algorithms compression_alg;
  unsigned char digest[shash::kMaxDigestSize];
};

static void SpoolerOnUpload(const upload::SpoolerResult &result) {
  unlink(result.local_path.c_str());
  if (result.return_code != 0) {
    PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
          result.local_path.c_str(), result.content_hash.ToString().c_str());
  }
}

SharedPtr<string> stratum0_url;
SharedPtr<string> stratum1_url;
SharedPtr<string> temp_dir;
unsigned num_parallel = 1;
bool pull_history = false;
bool apply_timestamp_threshold = false;
uint64_t timestamp_threshold = 0;
bool inspect_existing_catalogs = false;
bool initial_snapshot = false;
upload::Spooler *spooler = NULL;
int pipe_chunks[2];
// required for concurrent reading
pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
unsigned retries = 3;
catalog::RelaxedPathFilter *pathfilter = NULL;
atomic_int64 overall_chunks;
atomic_int64 overall_new;
atomic_int64 chunk_queue;
bool preload_cache = false;
string *preload_cachedir = NULL;
bool is_garbage_collectable = false;
manifest::Reflog *reflog = NULL;

}  // anonymous namespace


static std::string MakePath(const shash::Any &hash) {
  return (preload_cache)
             ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
             : "data/" + hash.MakePath();
}


static bool Peek(const string &remote_path) {
  return (preload_cache) ? FileExists(remote_path) : spooler->Peek(remote_path);
}

static bool Peek(const shash::Any &remote_hash) {
  return Peek(MakePath(remote_hash));
}

static void ReportDownloadError(const download::JobInfo &download_job) {
  const download::Failures error_code = download_job.error_code();
  const int http_code = download_job.http_code();
  const std::string url = *download_job.url();

  LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
           url.c_str(), error_code, download::Code2Ascii(error_code));

  switch (error_code) {
    case download::kFailProxyResolve:
    case download::kFailHostResolve:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "DNS lookup for Stratum 0 failed - "
               "please check the network connection");
      break;

    case download::kFailHostHttp:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "unexpected HTTP error code %d - "
               "please check the stratum 0 health",
               http_code);
      break;

    case download::kFailBadData:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "downloaded corrupted data - "
               "please check the stratum 0 health");
      break;

    default:
      if (download::IsProxyTransferError(error_code)
          || download::IsHostTransferError(error_code)) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "couldn't reach Stratum 0 - "
                 "please check the network connection");
      } else {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "unexpected error - feel free to file "
                 "a bug report");
      }
  }
}


static void Store(const string &local_path,
                  const string &remote_path,
                  const bool compressed_src) {
  if (preload_cache) {
    if (!compressed_src) {
      const int retval = rename(local_path.c_str(), remote_path.c_str());
      if (retval != 0) {
        PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
              remote_path.c_str());
      }
    } else {
      // compressed input
      string tmp_dest;
      FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
      if (fdest == NULL) {
        PANIC(kLogStderr, "Failed to create temporary file '%s'",
              remote_path.c_str());
      }
      int retval = zlib::DecompressPath2File(local_path, fdest);
      if (!retval) {
        PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
              remote_path.c_str());
      }
      fclose(fdest);
      retval = rename(tmp_dest.c_str(), remote_path.c_str());
      assert(retval == 0);
      unlink(local_path.c_str());
    }
  } else {
    spooler->Upload(local_path, remote_path);
  }
}

static void Store(const string &local_path,
                  const shash::Any &remote_hash,
                  const bool compressed_src = true) {
  Store(local_path, MakePath(remote_hash), compressed_src);
}


static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const std::string &dest_path, const bool compress) {
  string tmp_file;
  FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
  assert(ftmp);
  int retval;
  if (compress) {
    shash::Any dummy(shash::kSha1);  // hardcoded hash no problem, unused
    retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
  } else {
    retval = CopyMem2File(buffer, size, ftmp);
  }
  assert(retval);
  fclose(ftmp);
  Store(tmp_file, dest_path, true);
}

static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const shash::Any &dest_hash, const bool compress) {
  StoreBuffer(buffer, size, MakePath(dest_hash), compress);
}


static void WaitForStorage() {
  if (!preload_cache)
    spooler->WaitForUpload();
}


struct MainWorkerContext {
  download::DownloadManager *download_manager;
};

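// Worker thread: takes ChunkJob records from the pipe, skips chunks that are
// already present in the destination storage, downloads the missing ones from
// stratum 0 and hands them to Store() (decompressing them when preloading a
// local cache).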
static void *MainWorker(void *data) {
  MainWorkerContext *mwc = static_cast<MainWorkerContext *>(data);
  download::DownloadManager *download_manager = mwc->download_manager;

  while (1) {
    ChunkJob next_chunk;
    {
      const MutexLockGuard m(&lock_pipe);
      ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
    }
    if (next_chunk.IsTerminateJob())
      break;

    const shash::Any chunk_hash = next_chunk.hash();
    const zlib::Algorithms compression_alg = next_chunk.compression_alg;
    LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
             chunk_hash.ToString().c_str());

    if (!Peek(chunk_hash)) {
      string tmp_file;
      FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
      const string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
      cvmfs::FileSink filesink(fchunk);
      download::JobInfo download_chunk(&url_chunk, false, false, &chunk_hash,
                                       &filesink);

      const download::Failures download_result = download_manager->Fetch(
          &download_chunk);
      if (download_result != download::kFailOk) {
        ReportDownloadError(download_chunk);
        PANIC(kLogStderr, "Download error");
      }
      fclose(fchunk);
      Store(tmp_file, chunk_hash,
            (compression_alg == zlib::kZlibDefault) ? true : false);
      atomic_inc64(&overall_new);
    }
    if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
      LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
    atomic_dec64(&chunk_queue);
  }
  return NULL;
}


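// Descends from the given catalog into its previous revision (if -p is set)
// and into all of its nested catalogs, replicating each of them via Pull().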
bool CommandPull::PullRecursion(catalog::Catalog *catalog,
                                const std::string &path) {
  assert(catalog);

  // Previous catalogs
  if (pull_history) {
    const shash::Any previous_catalog = catalog->GetPreviousRevision();
    if (previous_catalog.IsNull()) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Start of catalog, no more history");
    } else {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
               previous_catalog.ToString().c_str());
      const bool retval = Pull(previous_catalog, path);
      if (!retval)
        return false;
    }
  }

  // Nested catalogs (in a nested code block because goto fail...)
  {
    const catalog::Catalog::NestedCatalogList
        nested_catalogs = catalog->ListOwnNestedCatalogs();
    for (catalog::Catalog::NestedCatalogList::const_iterator
             i = nested_catalogs.begin(),
             iEnd = nested_catalogs.end();
         i != iEnd; ++i) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
               i->mountpoint.c_str());
      const bool retval = Pull(i->hash, i->mountpoint.ToString());
      if (!retval)
        return false;
    }
  }

  return true;
}

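// Replicates a single catalog: downloads and decompresses it, registers it in
// the Reflog, queues all of its file chunks for the worker threads, and then
// recurses into history and nested catalogs via PullRecursion().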
bool CommandPull::Pull(const shash::Any &catalog_hash,
                       const std::string &path) {
  int retval;
  download::Failures dl_retval;
  assert(shash::kSuffixCatalog == catalog_hash.suffix);

  // Check if the catalog already exists
  if (Peek(catalog_hash)) {
    // Preload: dirtab changed
    if (inspect_existing_catalogs) {
      if (!preload_cache) {
        PANIC(kLogStderr, "to be implemented: -t without -c");
      }
      catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
          path, MakePath(catalog_hash), catalog_hash);
      if (catalog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
                 catalog_hash.ToString().c_str());
        return false;
      }
      const bool retval = PullRecursion(catalog, path);
      delete catalog;
      return retval;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, "  Catalog up to date");
    return true;
  }

  // Check if the catalog matches the pathfilter
  if (path != "" &&  // necessary to load the root catalog
      pathfilter && !pathfilter->IsMatching(path)) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "  Catalog in '%s' does not match"
             " the path specification",
             path.c_str());
    return true;
  }

  const int64_t gauge_chunks = atomic_read64(&overall_chunks);
  const int64_t gauge_new = atomic_read64(&overall_new);

  // Download and uncompress catalog
  shash::Any chunk_hash;
  zlib::Algorithms compression_alg;
  catalog::Catalog *catalog = NULL;
  string file_catalog;
  string file_catalog_vanilla;
  FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                  &file_catalog);
  if (!fcatalog) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    return false;
  }
  fclose(fcatalog);
  FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                          &file_catalog_vanilla);
  if (!fcatalog_vanilla) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    unlink(file_catalog.c_str());
    return false;
  }
  const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
  cvmfs::FileSink filesink(fcatalog_vanilla);
  download::JobInfo download_catalog(&url_catalog, false, false, &catalog_hash,
                                     &filesink);
  dl_retval = download_manager()->Fetch(&download_catalog);
  fclose(fcatalog_vanilla);
  if (dl_retval != download::kFailOk) {
    if (path == "" && is_garbage_collectable) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "skipping missing root catalog %s - "
               "probably swept by garbage collection",
               catalog_hash.ToString().c_str());
      goto pull_skip;
    } else {
      ReportDownloadError(download_catalog);
      goto pull_cleanup;
    }
  }
  retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
             file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }
  if (path.empty() && reflog != NULL) {
    if (!reflog->AddCatalog(catalog_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
      goto pull_cleanup;
    }
  }

  catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
  if (catalog == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
             catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }

  // Always pull the HEAD root catalog and nested catalogs
  if (apply_timestamp_threshold && (path == "")
      && (catalog->GetLastModified() < timestamp_threshold)) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "  Pruning at root catalog from %s due to threshold at %s",
             StringifyTime(catalog->GetLastModified(), false).c_str(),
             StringifyTime(timestamp_threshold, false).c_str());
    delete catalog;
    goto pull_skip;
  }
  apply_timestamp_threshold = true;

  // Traverse the chunks
  LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
           "  Processing chunks [%" PRIu64 " registered chunks]: ",
           catalog->GetNumChunks());
  retval = catalog->AllChunksBegin();
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
    goto pull_cleanup;
  }
  while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
    ChunkJob next_chunk(chunk_hash, compression_alg);
    WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
    atomic_inc64(&chunk_queue);
  }
  catalog->AllChunksEnd();
  while (atomic_read64(&chunk_queue) != 0) {
    SafeSleepMs(100);
  }
  LogCvmfs(kLogCvmfs, kLogStdout,
           "  fetched %" PRId64 " new chunks out of "
           "%" PRId64 " unique chunks",
           atomic_read64(&overall_new) - gauge_new,
           atomic_read64(&overall_chunks) - gauge_chunks);

  retval = PullRecursion(catalog, path);

  delete catalog;
  unlink(file_catalog.c_str());
  WaitForStorage();
  if (!retval)
    return false;
  Store(file_catalog_vanilla, catalog_hash);
  return true;

pull_cleanup:
  delete catalog;
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return false;

pull_skip:
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return true;
}


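// Entry point of "cvmfs_swissknife pull": parses the command line options,
// sets up the download, signature, and upload facilities, fetches the
// manifest, and then replicates the trunk catalog, tagged snapshots, history
// database, whitelist, and manifest.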
int CommandPull::Main(const ArgumentList &args) {
  int retval;
  manifest::Failures m_retval;
  download::Failures dl_retval;
  unsigned timeout = 60;
  const int fd_lockfile = -1;
  string spooler_definition_str;
  manifest::ManifestEnsemble ensemble;
  shash::Any meta_info_hash;
  string meta_info;

  // Option parsing
  if (args.find('c') != args.end())
    preload_cache = true;
  if (args.find('l') != args.end()) {
    const unsigned log_level = kLogLevel0
                               << String2Uint64(*args.find('l')->second);
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  stratum0_url = args.find('u')->second;
  temp_dir = args.find('x')->second;
  if (preload_cache) {
    preload_cachedir = new string(*args.find('r')->second);
  } else {
    spooler_definition_str = *args.find('r')->second;
  }
  string master_keys = *args.find('k')->second;
  if (DirectoryExists(master_keys))
    master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
  const string repository_name = *args.find('m')->second;
  if (args.find('n') != args.end())
    num_parallel = String2Uint64(*args.find('n')->second);
  if (args.find('t') != args.end())
    timeout = String2Uint64(*args.find('t')->second);
  if (args.find('a') != args.end())
    retries = String2Uint64(*args.find('a')->second);
  if (args.find('d') != args.end()) {
    pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
    assert(pathfilter->IsValid());
  }
  if (args.find('p') != args.end())
    pull_history = true;
  if (args.find('z') != args.end())
    inspect_existing_catalogs = true;
  if (args.find('w') != args.end())
    stratum1_url = args.find('w')->second;
  if (args.find('i') != args.end())
    initial_snapshot = true;
  shash::Any reflog_hash;
  string reflog_chksum_path;
  if (args.find('R') != args.end()) {
    reflog_chksum_path = *args.find('R')->second;
    if (!initial_snapshot) {
      if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
        return 1;
      }
    }
  }
  if (args.find('Z') != args.end()) {
    timestamp_threshold = String2Int64(*args.find('Z')->second);
  }

  if (!preload_cache && stratum1_url.Get() == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
    return 1;
  }

  typedef std::vector<history::History::Tag> TagVector;
  TagVector historic_tags;

  LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
           stratum0_url->c_str());

  int result = 1;

  // Initialization
  atomic_init64(&overall_chunks);
  atomic_init64(&overall_new);
  atomic_init64(&chunk_queue);

  const bool follow_redirects = false;
  const unsigned max_pool_handles = num_parallel + 1;
  const string proxy = (args.find('@') != args.end()) ? *args.find('@')->second
                                                      : "";

  if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
    return 1;
  }

  if (!this->InitSignatureManager(master_keys)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
    return 1;
  } else {
    LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: using public key(s) %s",
             JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
  }

  unsigned current_group;
  vector<vector<download::DownloadManager::ProxyInfo> > proxies;
  download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
  if (proxies.size() > 0) {
    string proxy_str = "\nWarning, replicating through proxies\n";
    proxy_str += "  Load-balance groups:\n";
    for (unsigned i = 0; i < proxies.size(); ++i) {
      vector<string> urls;
      for (unsigned j = 0; j < proxies[i].size(); ++j) {
        urls.push_back(proxies[i][j].url);
      }
      proxy_str += "  [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ")
                   + "\n";
    }
    proxy_str += "  Active proxy: [" + StringifyInt(current_group) + "] "
                 + proxies[current_group][0].url;
    LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
  }
  download_manager()->SetTimeout(timeout, timeout);
  download_manager()->SetRetryParameters(retries, 500, 2000);
  download_manager()->Spawn();

  // init the download helper
  const ObjectFetcher object_fetcher(repository_name, *stratum0_url, *temp_dir,
                                     download_manager(), signature_manager());

  pthread_t *workers = reinterpret_cast<pthread_t *>(
      smalloc(sizeof(pthread_t) * num_parallel));

  // Check if we have a replica-ready server
  const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
  download::JobInfo download_sentinel(&url_sentinel, false);
  retval = download_manager()->Fetch(&download_sentinel);
  if (retval != download::kFailOk) {
    if (download_sentinel.http_code() == 404) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "This is not a CernVM-FS server for replication");
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Failed to contact stratum 0 server (%d - %s)", retval,
               download::Code2Ascii(download_sentinel.error_code()));
    }
    goto fini;
  }

  m_retval = FetchRemoteManifestEnsemble(
      *stratum0_url, repository_name, &ensemble);
  if (m_retval != manifest::kFailOk) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
             m_retval, manifest::Code2Ascii(m_retval));
    goto fini;
  }

  // Get meta info
  meta_info_hash = ensemble.manifest->meta_info();
  if (!meta_info_hash.IsNull()) {
    meta_info_hash = ensemble.manifest->meta_info();
    const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
    cvmfs::MemSink metainfo_memsink;
    download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
                                        &metainfo_memsink);
    dl_retval = download_manager()->Fetch(&download_metainfo);
    if (dl_retval != download::kFailOk) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
               dl_retval, download::Code2Ascii(dl_retval));
      goto fini;
    }
    meta_info = string(reinterpret_cast<char *>(metainfo_memsink.data()),
                       metainfo_memsink.pos());
  }

  is_garbage_collectable = ensemble.manifest->garbage_collectable();

  // Manifest available, now the spooler's hash algorithm can be determined
  // That doesn't actually matter because the replication does no re-hashing
  if (!preload_cache) {
    const upload::SpoolerDefinition spooler_definition(
        spooler_definition_str, ensemble.manifest->GetHashAlgorithm());
    spooler = upload::Spooler::Construct(spooler_definition);
    assert(spooler);
    spooler->RegisterListener(&SpoolerOnUpload);
  }

677 
678  // Open the reflog for modification
679  if (!preload_cache) {
680  if (initial_snapshot) {
681  LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
682  repository_name.c_str());
683  reflog = CreateEmptyReflog(*temp_dir, repository_name);
684  if (reflog == NULL) {
685  LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
686  goto fini;
687  }
688  } else {
689  ObjectFetcher object_fetcher_stratum1(repository_name,
690  *stratum1_url,
691  *temp_dir,
692  download_manager(),
693  signature_manager());
694 
695  if (!reflog_hash.IsNull()) {
696  reflog = FetchReflog(&object_fetcher_stratum1, repository_name,
697  reflog_hash);
698  assert(reflog != NULL);
699  } else {
700  LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
701  if (spooler->Peek(".cvmfsreflog")) {
703  "no reflog hash specified but reflog is present");
704  goto fini;
705  }
706  }
707  }
708 
709  if (reflog != NULL) {
711  // On commit: use manifest's hash algorithm
712  reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
713  }
714  }
715 
  // Fetch tag list.
  // If we are just preloading the cache it is not strictly necessary to
  // download the entire tag list
  // TODO(molina): add user option to download tags when preloading the cache
  if (!ensemble.manifest->history().IsNull() && !preload_cache) {
    const shash::Any history_hash = ensemble.manifest->history();
    const string history_url = *stratum0_url + "/data/"
                               + history_hash.MakePath();
    const string history_path = *temp_dir + "/" + history_hash.ToString();

    cvmfs::PathSink pathsink(history_path);
    download::JobInfo download_history(&history_url, false, false,
                                       &history_hash, &pathsink);
    dl_retval = download_manager()->Fetch(&download_history);
    if (dl_retval != download::kFailOk) {
      ReportDownloadError(download_history);
      goto fini;
    }
    const std::string history_db_path = history_path + ".uncompressed";
    retval = zlib::DecompressPath2Path(history_path, history_db_path);
    assert(retval);
    history::History *tag_db = history::SqliteHistory::Open(history_db_path);
    if (NULL == tag_db) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
               history_db_path.c_str());
      unlink(history_db_path.c_str());
      goto fini;
    }
    retval = tag_db->List(&historic_tags);
    delete tag_db;
    unlink(history_db_path.c_str());
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
               history_db_path.c_str());
      goto fini;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
             historic_tags.size());
    // TODO(jblomer): We should replicate the previous history dbs, too,
    // in order to avoid races on fail-over between non-synchronized stratum 1s
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
    Store(history_path, history_hash);
    WaitForStorage();
    unlink(history_path.c_str());
    if (reflog != NULL && !reflog->AddHistory(history_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
      goto fini;
    }
  }

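  // Work items are handed to the download workers as fixed-size ChunkJob
  // records written to pipe_chunks; a default-constructed ChunkJob tells a
  // worker to terminate (see MainWorker above).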
  // Starting threads
  MakePipe(pipe_chunks);
  LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
  MainWorkerContext mwc;
  mwc.download_manager = download_manager();
  for (unsigned i = 0; i < num_parallel; ++i) {
    const int retval = pthread_create(&workers[i], NULL, MainWorker,
                                      static_cast<void *>(&mwc));
    assert(retval == 0);
  }

  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
  retval = Pull(ensemble.manifest->catalog_hash(), "");
  pull_history = false;
  if (!historic_tags.empty()) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
  }
  for (TagVector::const_iterator i = historic_tags.begin(),
                                 iend = historic_tags.end();
       i != iend;
       ++i) {
    if (Peek(i->root_hash))
      continue;
    LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
             i->name.c_str());
    apply_timestamp_threshold = false;
    const bool retval2 = Pull(i->root_hash, "");
    retval = retval && retval2;
  }

  // Stopping threads
  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
  for (unsigned i = 0; i < num_parallel; ++i) {
    ChunkJob terminate_workers;
    WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
  }
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_join(workers[i], NULL);  // NOLINT (false positive)
    assert(retval == 0);
  }
  ClosePipe(pipe_chunks);

  if (!retval)
    goto fini;

  // Upload manifest ensemble
  {
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
    WaitForStorage();

    if (!Peek(ensemble.manifest->certificate())) {
      StoreBuffer(ensemble.cert_buf, ensemble.cert_size,
                  ensemble.manifest->certificate(), true);
    }
    if (reflog != NULL
        && !reflog->AddCertificate(ensemble.manifest->certificate())) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
      goto fini;
    }
    if (!meta_info_hash.IsNull()) {
      const unsigned char *info = reinterpret_cast<const unsigned char *>(
          meta_info.data());
      StoreBuffer(info, meta_info.size(), meta_info_hash, true);
      if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
        goto fini;
      }
    }

    // Create alternative bootstrapping symlinks for VOMS secured repos
    if (ensemble.manifest->has_alt_catalog_path()) {
      const bool success = spooler->PlaceBootstrappingShortcut(
                               ensemble.manifest->certificate())
                           && spooler->PlaceBootstrappingShortcut(
                               ensemble.manifest->catalog_hash())
                           && (ensemble.manifest->history().IsNull()
                               || spooler->PlaceBootstrappingShortcut(
                                   ensemble.manifest->history()))
                           && (meta_info_hash.IsNull()
                               || spooler->PlaceBootstrappingShortcut(
                                   meta_info_hash));

      if (!success) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "failed to place root catalog bootstrapping symlinks");
        goto fini;
      }
    }

    // upload Reflog database
    if (!preload_cache && reflog != NULL) {
      reflog->CommitTransaction();
      reflog->DropDatabaseFileOwnership();
      const string reflog_path = reflog->database_file();
      delete reflog;
      manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
      WaitForStorage();  // Reduce the duration of reflog w/o checksum
      spooler->UploadReflog(reflog_path);
      spooler->WaitForUpload();
      unlink(reflog_path.c_str());
      if (spooler->GetNumberOfErrors()) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
                 spooler->GetNumberOfErrors());
        goto fini;
      }
      assert(!reflog_chksum_path.empty());
      manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
    }

    if (preload_cache) {
      const bool retval =
          ensemble.manifest->ExportBreadcrumb(*preload_cachedir, 0660);
      assert(retval);
    } else {
      // pkcs#7 structure contains content + certificate + signature
      // So there is no race with whitelist and pkcs7 signature being out of
      // sync
      if (ensemble.whitelist_pkcs7_buf) {
        StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
                    ".cvmfswhitelist.pkcs7", false);
      }
      StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
                  ".cvmfswhitelist", false);
      StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
                  ".cvmfspublished", false);
    }
    LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
             ensemble.manifest->revision());
  }

  WaitForStorage();
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Fetched %" PRId64 " new chunks out of %" PRId64 " processed chunks",
           atomic_read64(&overall_new), atomic_read64(&overall_chunks));
  result = 0;

fini:
  if (fd_lockfile >= 0)
    UnlockFile(fd_lockfile);
  free(workers);
  delete spooler;
  delete pathfilter;
  return result;
}

}  // namespace swissknife