CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_pull.cc
Go to the documentation of this file.
1 
8 // NOLINTNEXTLINE
9 #define _FILE_OFFSET_BITS 64
10 // NOLINTNEXTLINE
11 #define __STDC_FORMAT_MACROS
12 
13 #include "cvmfs_config.h"
14 #include "swissknife_pull.h"
15 
16 #include <inttypes.h>
17 #include <pthread.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 
21 #include <cstdlib>
22 #include <cstring>
23 #include <string>
24 #include <vector>
25 
26 #include "atomic.h"
27 #include "catalog.h"
28 #include "compression.h"
29 #include "download.h"
30 #include "hash.h"
31 #include "history_sqlite.h"
32 #include "logging.h"
33 #include "manifest.h"
34 #include "manifest_fetch.h"
35 #include "object_fetcher.h"
37 #include "reflog.h"
38 #include "signature.h"
39 #include "smalloc.h"
40 #include "upload.h"
41 #include "util/exception.h"
42 #include "util/posix.h"
43 #include "util/shared_ptr.h"
44 #include "util/string.h"
45 #include "util_concurrency.h"
46 
47 using namespace std; // NOLINT
48 
49 namespace swissknife {
50 
51 namespace {
52 
54 
59 class ChunkJob {
60  public:
62  : suffix(shash::kSuffixNone)
63  , hash_algorithm(shash::kAny)
64  , compression_alg(zlib::kZlibDefault) {}
65 
66  ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
67  : suffix(hash.suffix)
68  , hash_algorithm(hash.algorithm)
69  , compression_alg(compression_alg)
70  {
71  memcpy(digest, hash.digest, hash.GetDigestSize());
72  }
73 
74  bool IsTerminateJob() const {
75  return (hash_algorithm == shash::kAny);
76  }
77 
78  shash::Any hash() const {
79  assert(!IsTerminateJob());
80  return shash::Any(hash_algorithm,
81  digest,
82  suffix);
83  }
84 
88  unsigned char digest[shash::kMaxDigestSize];
89 };
90 
91 static void SpoolerOnUpload(const upload::SpoolerResult &result) {
92  unlink(result.local_path.c_str());
93  if (result.return_code != 0) {
94  PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
95  result.local_path.c_str(), result.content_hash.ToString().c_str());
96  }
97 }
98 
102 unsigned num_parallel = 1;
103 bool pull_history = false;
105 uint64_t timestamp_threshold = 0;
107 bool initial_snapshot = false;
108 upload::Spooler *spooler = NULL;
109 int pipe_chunks[2];
110 // required for concurrent reading
111 pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
112 unsigned retries = 3;
117 bool preload_cache = false;
118 string *preload_cachedir = NULL;
121 
122 } // anonymous namespace
123 
124 
125 static std::string MakePath(const shash::Any &hash) {
126  return (preload_cache)
127  ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
128  : "data/" + hash.MakePath();
129 }
130 
131 
132 static bool Peek(const string &remote_path) {
133  return (preload_cache) ? FileExists(remote_path)
134  : spooler->Peek(remote_path);
135 }
136 
137 static bool Peek(const shash::Any &remote_hash) {
138  return Peek(MakePath(remote_hash));
139 }
140 
141 static void ReportDownloadError(const download::JobInfo &download_job) {
142  const download::Failures error_code = download_job.error_code;
143  const int http_code = download_job.http_code;
144  const std::string url = *download_job.url;
145 
146  LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
147  url.c_str(), error_code, download::Code2Ascii(error_code));
148 
149  switch (error_code) {
152  LogCvmfs(kLogCvmfs, kLogStderr, "DNS lookup for Stratum 0 failed - "
153  "please check the network connection");
154  break;
155 
157  LogCvmfs(kLogCvmfs, kLogStderr, "unexpected HTTP error code %d - "
158  "please check the stratum 0 health", http_code);
159  break;
160 
162  LogCvmfs(kLogCvmfs, kLogStderr, "downloaded corrupted data - "
163  "please check the stratum 0 health");
164  break;
165 
166  default:
167  if (download::IsProxyTransferError(error_code) ||
168  download::IsHostTransferError(error_code)) {
169  LogCvmfs(kLogCvmfs, kLogStderr, "couldn't reach Stratum 0 - "
170  "please check the network connection");
171  } else {
172  LogCvmfs(kLogCvmfs, kLogStderr, "unexpected error - feel free to file "
173  "a bug report");
174  }
175  }
176 }
177 
178 
179 static void Store(
180  const string &local_path,
181  const string &remote_path,
182  const bool compressed_src)
183 {
184  if (preload_cache) {
185  if (!compressed_src) {
186  int retval = rename(local_path.c_str(), remote_path.c_str());
187  if (retval != 0) {
188  PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
189  remote_path.c_str());
190  }
191  } else {
192  // compressed input
193  string tmp_dest;
194  FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
195  if (fdest == NULL) {
196  PANIC(kLogStderr, "Failed to create temporary file '%s'",
197  remote_path.c_str());
198  }
199  int retval = zlib::DecompressPath2File(local_path, fdest);
200  if (!retval) {
201  PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
202  remote_path.c_str());
203  }
204  fclose(fdest);
205  retval = rename(tmp_dest.c_str(), remote_path.c_str());
206  assert(retval == 0);
207  unlink(local_path.c_str());
208  }
209  } else {
210  spooler->Upload(local_path, remote_path);
211  }
212 }
213 
214 static void Store(
215  const string &local_path,
216  const shash::Any &remote_hash,
217  const bool compressed_src = true)
218 {
219  Store(local_path, MakePath(remote_hash), compressed_src);
220 }
221 
222 
223 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
224  const std::string &dest_path, const bool compress) {
225  string tmp_file;
226  FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
227  assert(ftmp);
228  int retval;
229  if (compress) {
230  shash::Any dummy(shash::kSha1); // hardcoded hash no problem, unsused
231  retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
232  } else {
233  retval = CopyMem2File(buffer, size, ftmp);
234  }
235  assert(retval);
236  fclose(ftmp);
237  Store(tmp_file, dest_path, true);
238 }
239 
240 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
241  const shash::Any &dest_hash, const bool compress) {
242  StoreBuffer(buffer, size, MakePath(dest_hash), compress);
243 }
244 
245 
246 static void WaitForStorage() {
247  if (!preload_cache) spooler->WaitForUpload();
248 }
249 
250 
253 };
254 
255 static void *MainWorker(void *data) {
256  MainWorkerContext *mwc = static_cast<MainWorkerContext*>(data);
257  download::DownloadManager *download_manager = mwc->download_manager;
258 
259  while (1) {
260  ChunkJob next_chunk;
261  {
263  ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
264  }
265  if (next_chunk.IsTerminateJob())
266  break;
267 
268  shash::Any chunk_hash = next_chunk.hash();
269  zlib::Algorithms compression_alg = next_chunk.compression_alg;
270  LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
271  chunk_hash.ToString().c_str());
272 
273  if (!Peek(chunk_hash)) {
274  string tmp_file;
275  FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
276  &tmp_file);
277  assert(fchunk);
278  string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
279  download::JobInfo download_chunk(&url_chunk, false, false, fchunk,
280  &chunk_hash);
281 
282  const download::Failures download_result =
283  download_manager->Fetch(&download_chunk);
284  if (download_result != download::kFailOk) {
285  ReportDownloadError(download_chunk);
286  PANIC(kLogStderr, "Download error");
287  }
288  fclose(fchunk);
289  Store(tmp_file, chunk_hash,
290  (compression_alg == zlib::kZlibDefault) ? true : false);
291  atomic_inc64(&overall_new);
292  }
293  if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
295  atomic_dec64(&chunk_queue);
296  }
297  return NULL;
298 }
299 
300 
301 bool CommandPull::PullRecursion(catalog::Catalog *catalog,
302  const std::string &path) {
303  assert(catalog);
304 
305  // Previous catalogs
306  if (pull_history) {
307  shash::Any previous_catalog = catalog->GetPreviousRevision();
308  if (previous_catalog.IsNull()) {
309  LogCvmfs(kLogCvmfs, kLogStdout, "Start of catalog, no more history");
310  } else {
311  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
312  previous_catalog.ToString().c_str());
313  bool retval = Pull(previous_catalog, path);
314  if (!retval)
315  return false;
316  }
317  }
318 
319  // Nested catalogs (in a nested code block because goto fail...)
320  {
321  const catalog::Catalog::NestedCatalogList nested_catalogs =
322  catalog->ListOwnNestedCatalogs();
323  for (catalog::Catalog::NestedCatalogList::const_iterator i =
324  nested_catalogs.begin(), iEnd = nested_catalogs.end();
325  i != iEnd; ++i)
326  {
327  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
328  i->mountpoint.c_str());
329  bool retval = Pull(i->hash, i->mountpoint.ToString());
330  if (!retval)
331  return false;
332  }
333  }
334 
335  return true;
336 }
337 
338 bool CommandPull::Pull(const shash::Any &catalog_hash,
339  const std::string &path) {
340  int retval;
341  download::Failures dl_retval;
342  assert(shash::kSuffixCatalog == catalog_hash.suffix);
343 
344  // Check if the catalog already exists
345  if (Peek(catalog_hash)) {
346  // Preload: dirtab changed
348  if (!preload_cache) {
349  PANIC(kLogStderr, "to be implemented: -t without -c");
350  }
352  path, MakePath(catalog_hash), catalog_hash);
353  if (catalog == NULL) {
354  LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
355  catalog_hash.ToString().c_str());
356  return false;
357  }
358  bool retval = PullRecursion(catalog, path);
359  delete catalog;
360  return retval;
361  }
362 
363  LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
364  return true;
365  }
366 
367  // Check if the catalog matches the pathfilter
368  if (path != "" && // necessary to load the root catalog
369  pathfilter &&
370  !pathfilter->IsMatching(path)) {
371  LogCvmfs(kLogCvmfs, kLogStdout, " Catalog in '%s' does not match"
372  " the path specification", path.c_str());
373  return true;
374  }
375 
376  int64_t gauge_chunks = atomic_read64(&overall_chunks);
377  int64_t gauge_new = atomic_read64(&overall_new);
378 
379  // Download and uncompress catalog
380  shash::Any chunk_hash;
381  zlib::Algorithms compression_alg;
382  catalog::Catalog *catalog = NULL;
383  string file_catalog;
384  string file_catalog_vanilla;
385  FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
386  &file_catalog);
387  if (!fcatalog) {
388  LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
389  return false;
390  }
391  fclose(fcatalog);
392  FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
393  &file_catalog_vanilla);
394  if (!fcatalog_vanilla) {
395  LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
396  unlink(file_catalog.c_str());
397  return false;
398  }
399  const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
400  download::JobInfo download_catalog(&url_catalog, false, false,
401  fcatalog_vanilla, &catalog_hash);
402  dl_retval = download_manager()->Fetch(&download_catalog);
403  fclose(fcatalog_vanilla);
404  if (dl_retval != download::kFailOk) {
405  if (path == "" && is_garbage_collectable) {
406  LogCvmfs(kLogCvmfs, kLogStdout, "skipping missing root catalog %s - "
407  "probably sweeped by garbage collection",
408  catalog_hash.ToString().c_str());
409  goto pull_skip;
410  } else {
411  ReportDownloadError(download_catalog);
412  goto pull_cleanup;
413  }
414  }
415  retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
416  if (!retval) {
417  LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
418  file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
419  goto pull_cleanup;
420  }
421  if (path.empty() && reflog != NULL) {
422  if (!reflog->AddCatalog(catalog_hash)) {
423  LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
424  goto pull_cleanup;
425  }
426  }
427 
428  catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
429  if (catalog == NULL) {
430  LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
431  catalog_hash.ToString().c_str());
432  goto pull_cleanup;
433  }
434 
435  // Always pull the HEAD root catalog and nested catalogs
436  if (apply_timestamp_threshold && (path == "") &&
437  (catalog->GetLastModified() < timestamp_threshold))
438  {
440  " Pruning at root catalog from %s due to threshold at %s",
441  StringifyTime(catalog->GetLastModified(), false).c_str(),
442  StringifyTime(timestamp_threshold, false).c_str());
443  delete catalog;
444  goto pull_skip;
445  }
447 
448  // Traverse the chunks
450  " Processing chunks [%" PRIu64 " registered chunks]: ",
451  catalog->GetNumChunks());
452  retval = catalog->AllChunksBegin();
453  if (!retval) {
454  LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
455  goto pull_cleanup;
456  }
457  while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
458  ChunkJob next_chunk(chunk_hash, compression_alg);
459  WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
460  atomic_inc64(&chunk_queue);
461  }
462  catalog->AllChunksEnd();
463  while (atomic_read64(&chunk_queue) != 0) {
464  SafeSleepMs(100);
465  }
466  LogCvmfs(kLogCvmfs, kLogStdout, " fetched %" PRId64 " new chunks out of "
467  "%" PRId64 " unique chunks",
468  atomic_read64(&overall_new)-gauge_new,
469  atomic_read64(&overall_chunks)-gauge_chunks);
470 
471  retval = PullRecursion(catalog, path);
472 
473  delete catalog;
474  unlink(file_catalog.c_str());
475  WaitForStorage();
476  if (!retval)
477  return false;
478  Store(file_catalog_vanilla, catalog_hash);
479  return true;
480 
481  pull_cleanup:
482  delete catalog;
483  unlink(file_catalog.c_str());
484  unlink(file_catalog_vanilla.c_str());
485  return false;
486 
487  pull_skip:
488  unlink(file_catalog.c_str());
489  unlink(file_catalog_vanilla.c_str());
490  return true;
491 }
492 
493 
495  int retval;
496  manifest::Failures m_retval;
497  download::Failures dl_retval;
498  unsigned timeout = 60;
499  int fd_lockfile = -1;
500  string spooler_definition_str;
502  shash::Any meta_info_hash;
503  string meta_info;
504 
505  // Option parsing
506  if (args.find('c') != args.end())
507  preload_cache = true;
508  if (args.find('l') != args.end()) {
509  unsigned log_level =
510  kLogLevel0 << String2Uint64(*args.find('l')->second);
511  if (log_level > kLogNone) {
512  LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
513  return 1;
514  }
515  SetLogVerbosity(static_cast<LogLevels>(log_level));
516  }
517  stratum0_url = args.find('u')->second;
518  temp_dir = args.find('x')->second;
519  if (preload_cache) {
520  preload_cachedir = new string(*args.find('r')->second);
521  } else {
522  spooler_definition_str = *args.find('r')->second;
523  }
524  string master_keys = *args.find('k')->second;
525  if (DirectoryExists(master_keys))
526  master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
527  const string repository_name = *args.find('m')->second;
528  string trusted_certs;
529  if (args.find('y') != args.end())
530  trusted_certs = *args.find('y')->second;
531  if (args.find('n') != args.end())
532  num_parallel = String2Uint64(*args.find('n')->second);
533  if (args.find('t') != args.end())
534  timeout = String2Uint64(*args.find('t')->second);
535  if (args.find('a') != args.end())
536  retries = String2Uint64(*args.find('a')->second);
537  if (args.find('d') != args.end()) {
538  pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
540  }
541  if (args.find('p') != args.end())
542  pull_history = true;
543  if (args.find('z') != args.end())
545  if (args.find('w') != args.end())
546  stratum1_url = args.find('w')->second;
547  if (args.find('i') != args.end())
548  initial_snapshot = true;
549  shash::Any reflog_hash;
550  string reflog_chksum_path;
551  if (args.find('R') != args.end()) {
552  reflog_chksum_path = *args.find('R')->second;
553  if (!initial_snapshot) {
554  if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
555  LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
556  return 1;
557  }
558  }
559  }
560  if (args.find('Z') != args.end()) {
561  timestamp_threshold = String2Int64(*args.find('Z')->second);
562  }
563 
564  if (!preload_cache && stratum1_url.Get() == NULL) {
565  LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
566  return 1;
567  }
568 
569  typedef std::vector<history::History::Tag> TagVector;
570  TagVector historic_tags;
571 
572  LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
573  stratum0_url->c_str());
574 
575  int result = 1;
576 
577  // Initialization
578  atomic_init64(&overall_chunks);
579  atomic_init64(&overall_new);
580  atomic_init64(&chunk_queue);
581 
582  const bool follow_redirects = false;
583  const unsigned max_pool_handles = num_parallel+1;
584  const string proxy =
585  (args.find('@') != args.end()) ? *args.find('@')->second : "";
586 
587  if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
588  return 1;
589  }
590 
591  if (!this->InitVerifyingSignatureManager(master_keys, trusted_certs)) {
592  LogCvmfs(kLogCvmfs, kLogStderr, "failed to initalize CVMFS signatures");
593  return 1;
594  } else {
596  "CernVM-FS: using public key(s) %s",
597  JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
598  if (!trusted_certs.empty()) {
600  "CernVM-FS: using trusted certificates in %s",
601  JoinStrings(SplitString(trusted_certs, ':'), ", ").c_str());
602  }
603  }
604 
605  unsigned current_group;
606  vector< vector<download::DownloadManager::ProxyInfo> > proxies;
607  download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
608  if (proxies.size() > 0) {
609  string proxy_str = "\nWarning, replicating through proxies\n";
610  proxy_str += " Load-balance groups:\n";
611  for (unsigned i = 0; i < proxies.size(); ++i) {
612  vector<string> urls;
613  for (unsigned j = 0; j < proxies[i].size(); ++j) {
614  urls.push_back(proxies[i][j].url);
615  }
616  proxy_str +=
617  " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ") + "\n";
618  }
619  proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] " +
620  proxies[current_group][0].url;
621  LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
622  }
623  download_manager()->SetTimeout(timeout, timeout);
624  download_manager()->SetRetryParameters(retries, 500, 2000);
625  download_manager()->Spawn();
626 
627  // init the download helper
628  ObjectFetcher object_fetcher(repository_name,
629  *stratum0_url,
630  *temp_dir,
631  download_manager(),
632  signature_manager());
633 
634  pthread_t *workers =
635  reinterpret_cast<pthread_t *>(smalloc(sizeof(pthread_t) * num_parallel));
636 
637  // Check if we have a replica-ready server
638  const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
639  download::JobInfo download_sentinel(&url_sentinel, false);
640  retval = download_manager()->Fetch(&download_sentinel);
641  if (retval != download::kFailOk) {
642  if (download_sentinel.http_code == 404) {
644  "This is not a CernVM-FS server for replication");
645  } else {
647  "Failed to contact stratum 0 server (%d - %s)",
648  retval, download::Code2Ascii(download_sentinel.error_code));
649  }
650  goto fini;
651  }
652 
653  m_retval = FetchRemoteManifestEnsemble(*stratum0_url,
654  repository_name,
655  &ensemble);
656  if (m_retval != manifest::kFailOk) {
657  LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
658  m_retval, manifest::Code2Ascii(m_retval));
659  goto fini;
660  }
661 
662  // Get meta info
663  meta_info_hash = ensemble.manifest->meta_info();
664  if (!meta_info_hash.IsNull()) {
665  meta_info_hash = ensemble.manifest->meta_info();
666  const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
667  download::JobInfo download_metainfo(&url, true, false, &meta_info_hash);
668  dl_retval = download_manager()->Fetch(&download_metainfo);
669  if (dl_retval != download::kFailOk) {
670  LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
671  dl_retval, download::Code2Ascii(dl_retval));
672  goto fini;
673  }
674  meta_info = string(download_metainfo.destination_mem.data,
675  download_metainfo.destination_mem.pos);
676  }
677 
679 
680  // Manifest available, now the spooler's hash algorithm can be determined
681  // That doesn't actually matter because the replication does no re-hashing
682  if (!preload_cache) {
684  spooler_definition(spooler_definition_str,
685  ensemble.manifest->GetHashAlgorithm());
686  spooler = upload::Spooler::Construct(spooler_definition);
687  assert(spooler);
688  spooler->RegisterListener(&SpoolerOnUpload);
689  }
690 
691  // Open the reflog for modification
692  if (!preload_cache) {
693  if (initial_snapshot) {
694  LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
695  repository_name.c_str());
696  reflog = CreateEmptyReflog(*temp_dir, repository_name);
697  if (reflog == NULL) {
698  LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
699  goto fini;
700  }
701  } else {
702  ObjectFetcher object_fetcher_stratum1(repository_name,
703  *stratum1_url,
704  *temp_dir,
705  download_manager(),
706  signature_manager());
707 
708  if (!reflog_hash.IsNull()) {
709  reflog =
710  FetchReflog(&object_fetcher_stratum1, repository_name, reflog_hash);
711  assert(reflog != NULL);
712  } else {
713  LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
714  if (spooler->Peek("/.cvmfsreflog")) {
716  "no reflog hash specified but reflog is present");
717  goto fini;
718  }
719  }
720  }
721 
722  if (reflog != NULL) {
724  // On commit: use manifest's hash algorithm
725  reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
726  }
727  }
728 
729  // Fetch tag list.
730  // If we are just preloading the cache it is not strictly necessarily to
731  // download the entire tag list
732  // TODO(molina): add user option to download tags when preloading the cache
733  if (!ensemble.manifest->history().IsNull() && !preload_cache) {
734  shash::Any history_hash = ensemble.manifest->history();
735  const string history_url = *stratum0_url + "/data/"
736  + history_hash.MakePath();
737  const string history_path = *temp_dir + "/" + history_hash.ToString();
738  download::JobInfo download_history(&history_url, false, false,
739  &history_path,
740  &history_hash);
741  dl_retval = download_manager()->Fetch(&download_history);
742  if (dl_retval != download::kFailOk) {
743  ReportDownloadError(download_history);
744  goto fini;
745  }
746  const std::string history_db_path = history_path + ".uncompressed";
747  retval = zlib::DecompressPath2Path(history_path, history_db_path);
748  assert(retval);
749  history::History *tag_db = history::SqliteHistory::Open(history_db_path);
750  if (NULL == tag_db) {
751  LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
752  history_db_path.c_str());
753  unlink(history_db_path.c_str());
754  goto fini;
755  }
756  retval = tag_db->List(&historic_tags);
757  delete tag_db;
758  unlink(history_db_path.c_str());
759  if (!retval) {
760  LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
761  history_db_path.c_str());
762  goto fini;
763  }
764 
765  LogCvmfs(kLogCvmfs, kLogStdout, "Found %u named snapshots",
766  historic_tags.size());
767  // TODO(jblomer): We should repliacte the previous history dbs, too,
768  // in order to avoid races on fail-over between non-synchronized stratum 1s
769  LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
770  Store(history_path, history_hash);
771  WaitForStorage();
772  unlink(history_path.c_str());
773  if (reflog != NULL && !reflog->AddHistory(history_hash)) {
774  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
775  goto fini;
776  }
777  }
778 
779  // Starting threads
781  LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
782  MainWorkerContext mwc;
783  mwc.download_manager = download_manager();
784  for (unsigned i = 0; i < num_parallel; ++i) {
785  int retval = pthread_create(&workers[i], NULL, MainWorker,
786  static_cast<void*>(&mwc));
787  assert(retval == 0);
788  }
789 
790  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
791  retval = Pull(ensemble.manifest->catalog_hash(), "");
792  pull_history = false;
793  if (!historic_tags.empty()) {
794  LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
795  }
796  for (TagVector::const_iterator i = historic_tags.begin(),
797  iend = historic_tags.end();
798  i != iend; ++i)
799  {
800  if (Peek(i->root_hash))
801  continue;
802  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
803  i->name.c_str());
805  bool retval2 = Pull(i->root_hash, "");
806  retval = retval && retval2;
807  }
808 
809  // Stopping threads
810  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
811  for (unsigned i = 0; i < num_parallel; ++i) {
812  ChunkJob terminate_workers;
813  WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
814  }
815  for (unsigned i = 0; i < num_parallel; ++i) {
816  int retval = pthread_join(workers[i], NULL); // NOLINT (false positive)
817  assert(retval == 0);
818  }
820 
821  if (!retval)
822  goto fini;
823 
824  // Upload manifest ensemble
825  {
826  LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
827  WaitForStorage();
828 
829  if (!Peek(ensemble.manifest->certificate())) {
830  StoreBuffer(ensemble.cert_buf,
831  ensemble.cert_size,
832  ensemble.manifest->certificate(), true);
833  }
834  if (reflog != NULL &&
835  !reflog->AddCertificate(ensemble.manifest->certificate())) {
836  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
837  goto fini;
838  }
839  if (!meta_info_hash.IsNull()) {
840  const unsigned char *info = reinterpret_cast<const unsigned char *>(
841  meta_info.data());
842  StoreBuffer(info, meta_info.size(), meta_info_hash, true);
843  if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
844  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
845  goto fini;
846  }
847  }
848 
849  // Create alternative bootstrapping symlinks for VOMS secured repos
850  if (ensemble.manifest->has_alt_catalog_path()) {
851  const bool success =
852  spooler->PlaceBootstrappingShortcut(ensemble.manifest->certificate()) &&
853  spooler->PlaceBootstrappingShortcut(ensemble.manifest->catalog_hash())
854  && (ensemble.manifest->history().IsNull() ||
855  spooler->PlaceBootstrappingShortcut(ensemble.manifest->history()))
856  && (meta_info_hash.IsNull() ||
857  spooler->PlaceBootstrappingShortcut(meta_info_hash));
858 
859  if (!success) {
861  "failed to place root catalog bootstrapping symlinks");
862  goto fini;
863  }
864  }
865 
866  // upload Reflog database
867  if (!preload_cache && reflog != NULL) {
870  string reflog_path = reflog->database_file();
871  delete reflog;
872  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
873  WaitForStorage(); // Reduce the duration of reflog /wo checksum
874  spooler->UploadReflog(reflog_path);
875  spooler->WaitForUpload();
876  unlink(reflog_path.c_str());
877  if (spooler->GetNumberOfErrors()) {
878  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
879  spooler->GetNumberOfErrors());
880  goto fini;
881  }
882  assert(!reflog_chksum_path.empty());
883  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
884  }
885 
886  if (preload_cache) {
887  bool retval =
888  ensemble.manifest->ExportBreadcrumb(*preload_cachedir, 0660);
889  assert(retval);
890  } else {
891  // pkcs#7 structure contains content + certificate + signature
892  // So there is no race with whitelist and pkcs7 signature being out of
893  // sync
894  if (ensemble.whitelist_pkcs7_buf) {
896  ".cvmfswhitelist.pkcs7", false);
897  }
898  StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
899  ".cvmfswhitelist", false);
901  ".cvmfspublished", false);
902  }
903  LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %u",
904  ensemble.manifest->revision());
905  }
906 
907  WaitForStorage();
908  LogCvmfs(kLogCvmfs, kLogStdout, "Fetched %" PRId64 " new chunks out of %"
909  PRId64 " processed chunks",
910  atomic_read64(&overall_new), atomic_read64(&overall_chunks));
911  result = 0;
912 
913  fini:
914  if (fd_lockfile >= 0)
915  UnlockFile(fd_lockfile);
916  free(workers);
917  delete spooler;
918  delete pathfilter;
919  return result;
920 }
921 
922 } // namespace swissknife
int return_code
the return value of the spooler operation
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
void SetLogVerbosity(const LogLevels max_level)
Definition: logging.cc:260
std::string database_file() const
Definition: reflog.cc:337
const char kSuffixNone
Definition: compat.h:163
bool IsNull() const
Definition: hash.h:382
static RelaxedPathFilter * Create(const std::string &dirtab_path)
bool AllChunksNext(shash::Any *hash, zlib::Algorithms *compression_alg)
Definition: catalog.cc:427
bool AddHistory(const shash::Any &history)
Definition: reflog.cc:134
unsigned char * raw_manifest_buf
int64_t atomic_int64
Definition: atomic.h:18
int Main(const ArgumentList &args)
HttpObjectFetcher ObjectFetcher
bool IsValid() const
Definition: dirtab.h:127
static SqliteHistory * Open(const std::string &file_name)
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:288
static bool ReadChecksum(const std::string &path, shash::Any *checksum)
Definition: reflog.cc:47
#define PANIC(...)
Definition: exception.h:26
FILE * CreateTempFile(const std::string &path_prefix, const int mode, const char *open_flags, std::string *final_path)
Definition: posix.cc:1030
static void * MainWorker(void *data)
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:318
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:248
bool CopyMem2File(const unsigned char *buffer, const unsigned buffer_size, FILE *fdest)
Definition: compression.cc:86
bool AllChunksBegin()
Definition: catalog.cc:422
shash::Any GetPreviousRevision() const
Definition: catalog.cc:555
assert((mem||(size==0))&&"Out Of Memory")
bool has_alt_catalog_path() const
Definition: manifest.h:130
static void Store(const string &local_path, const string &remote_path, const bool compressed_src)
Algorithms algorithm
Definition: hash.h:124
string StringifyTime(const time_t seconds, const bool utc)
Definition: string.cc:105
void MakePipe(int pipe_fd[2])
Definition: posix.cc:525
static void WaitForStorage()
bool AddCatalog(const shash::Any &catalog)
Definition: reflog.cc:128
unsigned char digest[digest_size_]
Definition: hash.h:123
char algorithm
uint64_t revision() const
Definition: manifest.h:121
static void SpoolerOnUpload(const upload::SpoolerResult &result)
Algorithms
Definition: hash.h:40
unsigned char * whitelist_buf
bool FileExists(const std::string &path)
Definition: posix.cc:816
uint64_t GetLastModified() const
Definition: catalog.cc:533
int64_t String2Int64(const string &value)
Definition: string.cc:222
const char * Code2Ascii(const Failures error)
Definition: download.h:88
unsigned GetDigestSize() const
Definition: hash.h:167
Algorithms
Definition: compression.h:44
bool ExportBreadcrumb(const std::string &directory, const int mode) const
Definition: manifest.cc:232
bool AllChunksEnd()
Definition: catalog.cc:433
std::string local_path
the local_path previously given as input
virtual bool List(std::vector< Tag > *tags) const =0
bool AddMetainfo(const shash::Any &metainfo)
Definition: reflog.cc:140
virtual bool IsMatching(const std::string &path) const
const char kSuffixCatalog
Definition: hash.h:53
bool DecompressPath2File(const string &src, FILE *fdest)
Definition: compression.cc:667
shash::Any certificate() const
Definition: manifest.h:126
bool IsProxyTransferError(const Failures error)
Definition: download.h:76
static void ReportDownloadError(const download::JobInfo &download_job)
void BeginTransaction()
Definition: reflog.cc:295
static void StoreBuffer(const unsigned char *buffer, const unsigned size, const std::string &dest_path, const bool compress)
unsigned char digest[20]
unsigned char * whitelist_pkcs7_buf
static void HashDatabase(const std::string &database_path, shash::Any *hash_reflog)
Definition: reflog.cc:322
shash::Any catalog_hash() const
Definition: manifest.h:124
download::DownloadManager * download_manager
string StringifyInt(const int64_t value)
Definition: string.cc:78
bool garbage_collectable() const
Definition: manifest.h:129
void DropDatabaseFileOwnership()
Definition: reflog.cc:313
bool DirectoryExists(const std::string &path)
Definition: posix.cc:838
bool AddCertificate(const shash::Any &certificate)
Definition: reflog.cc:121
char Suffix
Definition: hash.h:113
shash::Algorithms GetHashAlgorithm() const
Definition: manifest.h:83
std::string MakePathWithoutSuffix() const
Definition: hash.h:334
std::vector< NestedCatalog > NestedCatalogList
Definition: catalog.h:208
const NestedCatalogList ListOwnNestedCatalogs() const
Definition: catalog.cc:656
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
Failures error_code
Definition: download.h:289
Failures Fetch(JobInfo *info)
Definition: download.cc:1716
struct download::JobInfo::@3 destination_mem
shash::Any history() const
Definition: manifest.h:127
Definition: mutex.h:42
bool CompressMem2File(const unsigned char *buf, const size_t size, FILE *fdest, shash::Any *compressed_hash)
Definition: compression.cc:678
uint64_t GetNumChunks() const
Definition: catalog.cc:541
ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
static std::string MakePath(const shash::Any &hash)
void CommitTransaction()
Definition: reflog.cc:301
static Catalog * AttachFreely(const std::string &imaginary_mountpoint, const std::string &file, const shash::Any &catalog_hash, Catalog *parent=NULL, const bool is_nested=false)
Definition: catalog.cc:29
const unsigned kMaxDigestSize
Definition: hash.h:71
bool DecompressPath2Path(const string &src, const string &dest)
Definition: compression.cc:381
std::string meta_info() const
Definition: repository.h:126
const int kLogVerboseMsg
static bool Peek(const string &remote_path)
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1918
bool IsHostTransferError(const Failures error)
Definition: download.h:64
Suffix suffix
Definition: hash.h:125
std::string MakePath() const
Definition: hash.h:315
static bool WriteChecksum(const std::string &path, const shash::Any &value)
Definition: reflog.cc:64
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:534
static void size_t size
Definition: smalloc.h:47
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:546
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
Definition: posix.cc:1149
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:584
const std::string * url
Definition: download.h:152
shash::Any meta_info() const
Definition: manifest.h:131
void UnlockFile(const int filedes)
Definition: posix.cc:1020
const char * Code2Ascii(const Failures error)