CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
swissknife_pull.cc
Go to the documentation of this file.
1 
8 // NOLINTNEXTLINE
9 #define _FILE_OFFSET_BITS 64
10 // NOLINTNEXTLINE
11 #define __STDC_FORMAT_MACROS
12 
13 
14 #include "swissknife_pull.h"
15 
16 #include <inttypes.h>
17 #include <pthread.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20 
21 #include <cstdlib>
22 #include <cstring>
23 #include <string>
24 #include <vector>
25 
26 #include "catalog.h"
28 #include "crypto/hash.h"
29 #include "crypto/signature.h"
30 #include "history_sqlite.h"
31 #include "manifest.h"
32 #include "manifest_fetch.h"
33 #include "network/download.h"
34 #include "object_fetcher.h"
36 #include "reflog.h"
37 #include "upload.h"
38 #include "util/atomic.h"
39 #include "util/concurrency.h"
40 #include "util/exception.h"
41 #include "util/logging.h"
42 #include "util/posix.h"
43 #include "util/shared_ptr.h"
44 #include "util/smalloc.h"
45 #include "util/string.h"
46 
47 using namespace std; // NOLINT
48 
49 namespace swissknife {
50 
51 namespace {
52 
54 
// Description of a single object that a worker thread should fetch.
// Instances are transferred byte-wise through pipe_chunks: Pull() writes
// sizeof(ChunkJob) bytes with WritePipe() and MainWorker() reads them back
// with ReadPipe().  The hash is therefore carried as individual fields
// (algorithm, suffix, raw digest) and re-assembled by hash().
59 class ChunkJob {
60  public:
// Default state: hash_algorithm is shash::kAny, which IsTerminateJob()
// treats as the marker for "no more work".
62  : suffix(shash::kSuffixNone)
63  , hash_algorithm(shash::kAny)
64  , compression_alg(zlib::kZlibDefault) {}
65 
// Builds a regular job from a content hash and the compression algorithm
// of the referenced object.
66  ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
67  : suffix(hash.suffix)
68  , hash_algorithm(hash.algorithm)
69  , compression_alg(compression_alg)
70  {
// Only the bytes used by this hash's algorithm are copied into the buffer
71  memcpy(digest, hash.digest, hash.GetDigestSize());
72  }
73 
// True for a job whose hash algorithm is shash::kAny; MainWorker() leaves
// its loop when it reads such a job.
74  bool IsTerminateJob() const {
75  return (hash_algorithm == shash::kAny);
76  }
77 
// Re-assembles the content hash from the stored fields.  Must not be called
// on a terminate job (asserted).
78  shash::Any hash() const {
79  assert(!IsTerminateJob());
80  return shash::Any(hash_algorithm,
81  digest,
82  suffix);
83  }
84 
// Raw digest bytes, sized for the largest supported digest
88  unsigned char digest[shash::kMaxDigestSize];
89 };
90 
91 static void SpoolerOnUpload(const upload::SpoolerResult &result) {
92  unlink(result.local_path.c_str());
93  if (result.return_code != 0) {
94  PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
95  result.local_path.c_str(), result.content_hash.ToString().c_str());
96  }
97 }
98 
// Number of MainWorker threads started by Main(); set from option -n
102 unsigned num_parallel = 1;
// Set by option -p: PullRecursion() also follows the previous revision of
// every catalog
103 bool pull_history = false;
// Set from option -Z; Pull() compares it with a root catalog's
// GetLastModified() when deciding whether to prune
105 uint64_t timestamp_threshold = 0;
// Set by option -i: Main() creates an empty reflog instead of fetching one
107 bool initial_snapshot = false;
// Upload backend; constructed in Main() only when not preloading a cache
108 upload::Spooler *spooler = NULL;
// Job queue: Pull() and Main() write ChunkJob objects to pipe_chunks[1],
// MainWorker() reads them from pipe_chunks[0]
109 int pipe_chunks[2];
110 // required for concurrent reading
111 pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
// Set from option -a; handed to the download manager's SetRetryParameters()
112 unsigned retries = 3;
// Set by option -c: objects are placed in a local cache directory instead of
// being handed to the spooler
117 bool preload_cache = false;
// Target directory taken from option -r when preload_cache is set
118 string *preload_cachedir = NULL;
121 
122 } // anonymous namespace
123 
124 
125 static std::string MakePath(const shash::Any &hash) {
126  return (preload_cache)
127  ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
128  : "data/" + hash.MakePath();
129 }
130 
131 
132 static bool Peek(const string &remote_path) {
133  return (preload_cache) ? FileExists(remote_path)
134  : spooler->Peek(remote_path);
135 }
136 
137 static bool Peek(const shash::Any &remote_hash) {
138  return Peek(MakePath(remote_hash));
139 }
140 
// Logs a failed download to stderr: first a generic line with URL, numeric
// error code and its textual form, then a hint that depends on the kind of
// failure.  Only logs; the caller decides how to proceed.
141 static void ReportDownloadError(const download::JobInfo &download_job) {
142  const download::Failures error_code = download_job.error_code();
143  const int http_code = download_job.http_code();
// Copy of the URL string the job points to
144  const std::string url = *download_job.url();
145 
146  LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
147  url.c_str(), error_code, download::Code2Ascii(error_code));
148 
// Failure-specific hint for the operator
149  switch (error_code) {
152  LogCvmfs(kLogCvmfs, kLogStderr, "DNS lookup for Stratum 0 failed - "
153  "please check the network connection");
154  break;
155 
157  LogCvmfs(kLogCvmfs, kLogStderr, "unexpected HTTP error code %d - "
158  "please check the stratum 0 health", http_code);
159  break;
160 
162  LogCvmfs(kLogCvmfs, kLogStderr, "downloaded corrupted data - "
163  "please check the stratum 0 health");
164  break;
165 
// Everything else: distinguish proxy/host transfer problems from errors
// that are not expected at all
166  default:
167  if (download::IsProxyTransferError(error_code) ||
168  download::IsHostTransferError(error_code)) {
169  LogCvmfs(kLogCvmfs, kLogStderr, "couldn't reach Stratum 0 - "
170  "please check the network connection");
171  } else {
172  LogCvmfs(kLogCvmfs, kLogStderr, "unexpected error - feel free to file "
173  "a bug report");
174  }
175  }
176 }
177 
178 
179 static void Store(
180  const string &local_path,
181  const string &remote_path,
182  const bool compressed_src)
183 {
184  if (preload_cache) {
185  if (!compressed_src) {
186  int retval = rename(local_path.c_str(), remote_path.c_str());
187  if (retval != 0) {
188  PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
189  remote_path.c_str());
190  }
191  } else {
192  // compressed input
193  string tmp_dest;
194  FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
195  if (fdest == NULL) {
196  PANIC(kLogStderr, "Failed to create temporary file '%s'",
197  remote_path.c_str());
198  }
199  int retval = zlib::DecompressPath2File(local_path, fdest);
200  if (!retval) {
201  PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
202  remote_path.c_str());
203  }
204  fclose(fdest);
205  retval = rename(tmp_dest.c_str(), remote_path.c_str());
206  assert(retval == 0);
207  unlink(local_path.c_str());
208  }
209  } else {
210  spooler->Upload(local_path, remote_path);
211  }
212 }
213 
214 static void Store(
215  const string &local_path,
216  const shash::Any &remote_hash,
217  const bool compressed_src = true)
218 {
219  Store(local_path, MakePath(remote_hash), compressed_src);
220 }
221 
222 
223 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
224  const std::string &dest_path, const bool compress) {
225  string tmp_file;
226  FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
227  assert(ftmp);
228  int retval;
229  if (compress) {
230  shash::Any dummy(shash::kSha1); // hardcoded hash no problem, unused
231  retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
232  } else {
233  retval = CopyMem2File(buffer, size, ftmp);
234  }
235  assert(retval);
236  fclose(ftmp);
237  Store(tmp_file, dest_path, true);
238 }
239 
240 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
241  const shash::Any &dest_hash, const bool compress) {
242  StoreBuffer(buffer, size, MakePath(dest_hash), compress);
243 }
244 
245 
246 static void WaitForStorage() {
247  if (!preload_cache) spooler->WaitForUpload();
248 }
249 
250 
253 };
254 
// Thread entry point of a download worker (started with pthread_create() in
// Main()).  Reads ChunkJob objects from pipe_chunks[0] until a terminate job
// arrives.  For every other job the object is downloaded from stratum 0 and
// stored, unless Peek() reports that it already exists at the destination.
// data points to a MainWorkerContext.  Always returns NULL.
255 static void *MainWorker(void *data) {
256  MainWorkerContext *mwc = static_cast<MainWorkerContext*>(data);
257  download::DownloadManager *download_manager = mwc->download_manager;
258 
259  while (1) {
260  ChunkJob next_chunk;
261  {
// A job is read as one raw block of sizeof(ChunkJob) bytes
263  ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
264  }
265  if (next_chunk.IsTerminateJob())
266  break;
267 
268  shash::Any chunk_hash = next_chunk.hash();
269  zlib::Algorithms compression_alg = next_chunk.compression_alg;
270  LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
271  chunk_hash.ToString().c_str());
272 
// Only objects that are not yet present at the destination are downloaded
273  if (!Peek(chunk_hash)) {
274  string tmp_file;
// NOTE(review): fchunk is used without a NULL check
275  FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
276  &tmp_file);
277  string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
278  cvmfs::FileSink filesink(fchunk);
279  download::JobInfo download_chunk(&url_chunk, false, false,
280  &chunk_hash, &filesink);
281 
282  const download::Failures download_result =
283  download_manager->Fetch(&download_chunk);
// A failed chunk download is fatal for the whole process
284  if (download_result != download::kFailOk) {
285  ReportDownloadError(download_chunk);
286  PANIC(kLogStderr, "Download error");
287  }
288  fclose(fchunk);
// The third argument tells Store() whether the downloaded file is compressed
289  Store(tmp_file, chunk_hash,
290  (compression_alg == zlib::kZlibDefault) ? true : false);
291  atomic_inc64(&overall_new);
292  }
// overall_chunks counts every processed job, new or already present
293  if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
// Pull() polls chunk_queue until it drops to zero
295  atomic_dec64(&chunk_queue);
296  }
297  return NULL;
298 }
299 
300 
301 bool CommandPull::PullRecursion(catalog::Catalog *catalog,
302  const std::string &path) {
303  assert(catalog);
304 
305  // Previous catalogs
306  if (pull_history) {
307  shash::Any previous_catalog = catalog->GetPreviousRevision();
308  if (previous_catalog.IsNull()) {
309  LogCvmfs(kLogCvmfs, kLogStdout, "Start of catalog, no more history");
310  } else {
311  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
312  previous_catalog.ToString().c_str());
313  bool retval = Pull(previous_catalog, path);
314  if (!retval)
315  return false;
316  }
317  }
318 
319  // Nested catalogs (in a nested code block because goto fail...)
320  {
321  const catalog::Catalog::NestedCatalogList nested_catalogs =
322  catalog->ListOwnNestedCatalogs();
323  for (catalog::Catalog::NestedCatalogList::const_iterator i =
324  nested_catalogs.begin(), iEnd = nested_catalogs.end();
325  i != iEnd; ++i)
326  {
327  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
328  i->mountpoint.c_str());
329  bool retval = Pull(i->hash, i->mountpoint.ToString());
330  if (!retval)
331  return false;
332  }
333  }
334 
335  return true;
336 }
337 
// Replicates the catalog identified by catalog_hash (expected to carry the
// catalog suffix) that is mounted at path; an empty path denotes a root
// catalog.  The catalog is downloaded from stratum 0, all its chunks are
// queued for the worker threads, PullRecursion() descends into previous
// revisions and nested catalogs, and finally the compressed catalog itself
// is stored.
// Returns true on success and when the catalog is skipped on purpose
// (already present, filtered out by pathfilter, pruned, or missing on a
// garbage collectable repository); returns false on failure.
338 bool CommandPull::Pull(const shash::Any &catalog_hash,
339  const std::string &path) {
340  int retval;
341  download::Failures dl_retval;
342  assert(shash::kSuffixCatalog == catalog_hash.suffix);
343 
344  // Check if the catalog already exists
345  if (Peek(catalog_hash)) {
346  // Preload: dirtab changed
348  if (!preload_cache) {
349  PANIC(kLogStderr, "to be implemented: -t without -c");
350  }
352  path, MakePath(catalog_hash), catalog_hash);
353  if (catalog == NULL) {
354  LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
355  catalog_hash.ToString().c_str());
356  return false;
357  }
358  bool retval = PullRecursion(catalog, path);
359  delete catalog;
360  return retval;
361  }
362 
363  LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
364  return true;
365  }
366 
367  // Check if the catalog matches the pathfilter
368  if (path != "" && // necessary to load the root catalog
369  pathfilter &&
370  !pathfilter->IsMatching(path)) {
371  LogCvmfs(kLogCvmfs, kLogStdout, " Catalog in '%s' does not match"
372  " the path specification", path.c_str());
373  return true;
374  }
375 
// Snapshot of the global counters, used below to report per-catalog numbers
376  int64_t gauge_chunks = atomic_read64(&overall_chunks);
377  int64_t gauge_new = atomic_read64(&overall_new);
378 
379  // Download and uncompress catalog
380  shash::Any chunk_hash;
381  zlib::Algorithms compression_alg;
382  catalog::Catalog *catalog = NULL;
// Two temporary files: file_catalog_vanilla receives the object as
// downloaded, file_catalog the decompressed database
383  string file_catalog;
384  string file_catalog_vanilla;
385  FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
386  &file_catalog);
387  if (!fcatalog) {
388  LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
389  return false;
390  }
// Only the path is used from here on (DecompressPath2Path() takes a path)
391  fclose(fcatalog);
392  FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
393  &file_catalog_vanilla);
394  if (!fcatalog_vanilla) {
395  LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
396  unlink(file_catalog.c_str());
397  return false;
398  }
399  const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
400  cvmfs::FileSink filesink(fcatalog_vanilla);
401  download::JobInfo download_catalog(&url_catalog, false, false,
402  &catalog_hash, &filesink);
403  dl_retval = download_manager()->Fetch(&download_catalog);
404  fclose(fcatalog_vanilla);
405  if (dl_retval != download::kFailOk) {
// A missing root catalog is tolerated on garbage collectable repositories
406  if (path == "" && is_garbage_collectable) {
407  LogCvmfs(kLogCvmfs, kLogStdout, "skipping missing root catalog %s - "
408  "probably sweeped by garbage collection",
409  catalog_hash.ToString().c_str());
410  goto pull_skip;
411  } else {
412  ReportDownloadError(download_catalog);
413  goto pull_cleanup;
414  }
415  }
416  retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
417  if (!retval) {
418  LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
419  file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
420  goto pull_cleanup;
421  }
// Only root catalogs (empty path) are registered in the reflog
422  if (path.empty() && reflog != NULL) {
423  if (!reflog->AddCatalog(catalog_hash)) {
424  LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
425  goto pull_cleanup;
426  }
427  }
428 
429  catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
430  if (catalog == NULL) {
431  LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
432  catalog_hash.ToString().c_str());
433  goto pull_cleanup;
434  }
435 
436  // Always pull the HEAD root catalog and nested catalogs
437  if (apply_timestamp_threshold && (path == "") &&
438  (catalog->GetLastModified() < timestamp_threshold))
439  {
441  " Pruning at root catalog from %s due to threshold at %s",
442  StringifyTime(catalog->GetLastModified(), false).c_str(),
443  StringifyTime(timestamp_threshold, false).c_str());
444  delete catalog;
445  goto pull_skip;
446  }
448 
449  // Traverse the chunks
451  " Processing chunks [%" PRIu64 " registered chunks]: ",
452  catalog->GetNumChunks());
453  retval = catalog->AllChunksBegin();
454  if (!retval) {
455  LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
456  goto pull_cleanup;
457  }
// Every chunk becomes a ChunkJob written to the worker pipe; chunk_queue
// counts the jobs that the workers have not finished yet
458  while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
459  ChunkJob next_chunk(chunk_hash, compression_alg);
460  WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
461  atomic_inc64(&chunk_queue);
462  }
463  catalog->AllChunksEnd();
// Poll until the workers have processed all queued jobs
464  while (atomic_read64(&chunk_queue) != 0) {
465  SafeSleepMs(100);
466  }
467  LogCvmfs(kLogCvmfs, kLogStdout, " fetched %" PRId64 " new chunks out of "
468  "%" PRId64 " unique chunks",
469  atomic_read64(&overall_new)-gauge_new,
470  atomic_read64(&overall_chunks)-gauge_chunks);
471 
472  retval = PullRecursion(catalog, path);
473 
474  delete catalog;
475  unlink(file_catalog.c_str());
476  WaitForStorage();
// NOTE(review): on this early return file_catalog_vanilla is not unlinked
477  if (!retval)
478  return false;
// The catalog itself is stored last, after its chunks and the recursion
479  Store(file_catalog_vanilla, catalog_hash);
480  return true;
481 
// Failure exit: release the catalog (may still be NULL) and both temp files
482  pull_cleanup:
483  delete catalog;
484  unlink(file_catalog.c_str());
485  unlink(file_catalog_vanilla.c_str());
486  return false;
487 
// Skip exit: the catalog is not replicated but this counts as success
488  pull_skip:
489  unlink(file_catalog.c_str());
490  unlink(file_catalog_vanilla.c_str());
491  return true;
492 }
493 
494 
496  int retval;
497  manifest::Failures m_retval;
498  download::Failures dl_retval;
499  unsigned timeout = 60;
500  int fd_lockfile = -1;
501  string spooler_definition_str;
503  shash::Any meta_info_hash;
504  string meta_info;
505 
506  // Option parsing
507  if (args.find('c') != args.end())
508  preload_cache = true;
509  if (args.find('l') != args.end()) {
510  unsigned log_level =
511  kLogLevel0 << String2Uint64(*args.find('l')->second);
512  if (log_level > kLogNone) {
513  LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
514  return 1;
515  }
516  SetLogVerbosity(static_cast<LogLevels>(log_level));
517  }
518  stratum0_url = args.find('u')->second;
519  temp_dir = args.find('x')->second;
520  if (preload_cache) {
521  preload_cachedir = new string(*args.find('r')->second);
522  } else {
523  spooler_definition_str = *args.find('r')->second;
524  }
525  string master_keys = *args.find('k')->second;
526  if (DirectoryExists(master_keys))
527  master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
528  const string repository_name = *args.find('m')->second;
529  if (args.find('n') != args.end())
530  num_parallel = String2Uint64(*args.find('n')->second);
531  if (args.find('t') != args.end())
532  timeout = String2Uint64(*args.find('t')->second);
533  if (args.find('a') != args.end())
534  retries = String2Uint64(*args.find('a')->second);
535  if (args.find('d') != args.end()) {
536  pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
538  }
539  if (args.find('p') != args.end())
540  pull_history = true;
541  if (args.find('z') != args.end())
543  if (args.find('w') != args.end())
544  stratum1_url = args.find('w')->second;
545  if (args.find('i') != args.end())
546  initial_snapshot = true;
547  shash::Any reflog_hash;
548  string reflog_chksum_path;
549  if (args.find('R') != args.end()) {
550  reflog_chksum_path = *args.find('R')->second;
551  if (!initial_snapshot) {
552  if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
553  LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
554  return 1;
555  }
556  }
557  }
558  if (args.find('Z') != args.end()) {
559  timestamp_threshold = String2Int64(*args.find('Z')->second);
560  }
561 
562  if (!preload_cache && stratum1_url.Get() == NULL) {
563  LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
564  return 1;
565  }
566 
567  typedef std::vector<history::History::Tag> TagVector;
568  TagVector historic_tags;
569 
570  LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
571  stratum0_url->c_str());
572 
573  int result = 1;
574 
575  // Initialization
576  atomic_init64(&overall_chunks);
577  atomic_init64(&overall_new);
578  atomic_init64(&chunk_queue);
579 
580  const bool follow_redirects = false;
581  const unsigned max_pool_handles = num_parallel+1;
582  const string proxy =
583  (args.find('@') != args.end()) ? *args.find('@')->second : "";
584 
585  if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
586  return 1;
587  }
588 
589  if (!this->InitSignatureManager(master_keys)) {
590  LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
591  return 1;
592  } else {
594  "CernVM-FS: using public key(s) %s",
595  JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
596  }
597 
598  unsigned current_group;
599  vector< vector<download::DownloadManager::ProxyInfo> > proxies;
600  download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
601  if (proxies.size() > 0) {
602  string proxy_str = "\nWarning, replicating through proxies\n";
603  proxy_str += " Load-balance groups:\n";
604  for (unsigned i = 0; i < proxies.size(); ++i) {
605  vector<string> urls;
606  for (unsigned j = 0; j < proxies[i].size(); ++j) {
607  urls.push_back(proxies[i][j].url);
608  }
609  proxy_str +=
610  " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ") + "\n";
611  }
612  proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] " +
613  proxies[current_group][0].url;
614  LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
615  }
616  download_manager()->SetTimeout(timeout, timeout);
617  download_manager()->SetRetryParameters(retries, 500, 2000);
618  download_manager()->Spawn();
619 
620  // init the download helper
621  ObjectFetcher object_fetcher(repository_name,
622  *stratum0_url,
623  *temp_dir,
624  download_manager(),
625  signature_manager());
626 
627  pthread_t *workers =
628  reinterpret_cast<pthread_t *>(smalloc(sizeof(pthread_t) * num_parallel));
629 
630  // Check if we have a replica-ready server
631  const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
632  download::JobInfo download_sentinel(&url_sentinel, false);
633  retval = download_manager()->Fetch(&download_sentinel);
634  if (retval != download::kFailOk) {
635  if (download_sentinel.http_code() == 404) {
637  "This is not a CernVM-FS server for replication");
638  } else {
640  "Failed to contact stratum 0 server (%d - %s)",
641  retval, download::Code2Ascii(download_sentinel.error_code()));
642  }
643  goto fini;
644  }
645 
646  m_retval = FetchRemoteManifestEnsemble(*stratum0_url,
647  repository_name,
648  &ensemble);
649  if (m_retval != manifest::kFailOk) {
650  LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
651  m_retval, manifest::Code2Ascii(m_retval));
652  goto fini;
653  }
654 
655  // Get meta info
656  meta_info_hash = ensemble.manifest->meta_info();
657  if (!meta_info_hash.IsNull()) {
658  meta_info_hash = ensemble.manifest->meta_info();
659  const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
660  cvmfs::MemSink metainfo_memsink;
661  download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
662  &metainfo_memsink);
663  dl_retval = download_manager()->Fetch(&download_metainfo);
664  if (dl_retval != download::kFailOk) {
665  LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
666  dl_retval, download::Code2Ascii(dl_retval));
667  goto fini;
668  }
669  meta_info = string(reinterpret_cast<char*>(metainfo_memsink.data()),
670  metainfo_memsink.pos());
671  }
672 
674 
675  // Manifest available, now the spooler's hash algorithm can be determined
676  // That doesn't actually matter because the replication does no re-hashing
677  if (!preload_cache) {
679  spooler_definition(spooler_definition_str,
680  ensemble.manifest->GetHashAlgorithm());
681  spooler = upload::Spooler::Construct(spooler_definition);
682  assert(spooler);
683  spooler->RegisterListener(&SpoolerOnUpload);
684  }
685 
686  // Open the reflog for modification
687  if (!preload_cache) {
688  if (initial_snapshot) {
689  LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
690  repository_name.c_str());
691  reflog = CreateEmptyReflog(*temp_dir, repository_name);
692  if (reflog == NULL) {
693  LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
694  goto fini;
695  }
696  } else {
697  ObjectFetcher object_fetcher_stratum1(repository_name,
698  *stratum1_url,
699  *temp_dir,
700  download_manager(),
701  signature_manager());
702 
703  if (!reflog_hash.IsNull()) {
704  reflog =
705  FetchReflog(&object_fetcher_stratum1, repository_name, reflog_hash);
706  assert(reflog != NULL);
707  } else {
708  LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
709  if (spooler->Peek(".cvmfsreflog")) {
711  "no reflog hash specified but reflog is present");
712  goto fini;
713  }
714  }
715  }
716 
717  if (reflog != NULL) {
719  // On commit: use manifest's hash algorithm
720  reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
721  }
722  }
723 
724  // Fetch tag list.
725  // If we are just preloading the cache it is not strictly necessary to
726  // download the entire tag list
727  // TODO(molina): add user option to download tags when preloading the cache
728  if (!ensemble.manifest->history().IsNull() && !preload_cache) {
729  shash::Any history_hash = ensemble.manifest->history();
730  const string history_url = *stratum0_url + "/data/"
731  + history_hash.MakePath();
732  const string history_path = *temp_dir + "/" + history_hash.ToString();
733 
734  cvmfs::PathSink pathsink(history_path);
735  download::JobInfo download_history(&history_url, false, false,
736  &history_hash, &pathsink);
737  dl_retval = download_manager()->Fetch(&download_history);
738  if (dl_retval != download::kFailOk) {
739  ReportDownloadError(download_history);
740  goto fini;
741  }
742  const std::string history_db_path = history_path + ".uncompressed";
743  retval = zlib::DecompressPath2Path(history_path, history_db_path);
744  assert(retval);
745  history::History *tag_db = history::SqliteHistory::Open(history_db_path);
746  if (NULL == tag_db) {
747  LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
748  history_db_path.c_str());
749  unlink(history_db_path.c_str());
750  goto fini;
751  }
752  retval = tag_db->List(&historic_tags);
753  delete tag_db;
754  unlink(history_db_path.c_str());
755  if (!retval) {
756  LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
757  history_db_path.c_str());
758  goto fini;
759  }
760 
761  LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
762  historic_tags.size());
763  // TODO(jblomer): We should replicate the previous history dbs, too,
764  // in order to avoid races on fail-over between non-synchronized stratum 1s
765  LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
766  Store(history_path, history_hash);
767  WaitForStorage();
768  unlink(history_path.c_str());
769  if (reflog != NULL && !reflog->AddHistory(history_hash)) {
770  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
771  goto fini;
772  }
773  }
774 
775  // Starting threads
777  LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
778  MainWorkerContext mwc;
779  mwc.download_manager = download_manager();
780  for (unsigned i = 0; i < num_parallel; ++i) {
781  int retval = pthread_create(&workers[i], NULL, MainWorker,
782  static_cast<void*>(&mwc));
783  assert(retval == 0);
784  }
785 
786  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
787  retval = Pull(ensemble.manifest->catalog_hash(), "");
788  pull_history = false;
789  if (!historic_tags.empty()) {
790  LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
791  }
792  for (TagVector::const_iterator i = historic_tags.begin(),
793  iend = historic_tags.end();
794  i != iend; ++i)
795  {
796  if (Peek(i->root_hash))
797  continue;
798  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
799  i->name.c_str());
801  bool retval2 = Pull(i->root_hash, "");
802  retval = retval && retval2;
803  }
804 
805  // Stopping threads
806  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
807  for (unsigned i = 0; i < num_parallel; ++i) {
808  ChunkJob terminate_workers;
809  WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
810  }
811  for (unsigned i = 0; i < num_parallel; ++i) {
812  int retval = pthread_join(workers[i], NULL); // NOLINT (false positive)
813  assert(retval == 0);
814  }
816 
817  if (!retval)
818  goto fini;
819 
820  // Upload manifest ensemble
821  {
822  LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
823  WaitForStorage();
824 
825  if (!Peek(ensemble.manifest->certificate())) {
826  StoreBuffer(ensemble.cert_buf,
827  ensemble.cert_size,
828  ensemble.manifest->certificate(), true);
829  }
830  if (reflog != NULL &&
831  !reflog->AddCertificate(ensemble.manifest->certificate())) {
832  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
833  goto fini;
834  }
835  if (!meta_info_hash.IsNull()) {
836  const unsigned char *info = reinterpret_cast<const unsigned char *>(
837  meta_info.data());
838  StoreBuffer(info, meta_info.size(), meta_info_hash, true);
839  if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
840  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
841  goto fini;
842  }
843  }
844 
845  // Create alternative bootstrapping symlinks for VOMS secured repos
846  if (ensemble.manifest->has_alt_catalog_path()) {
847  const bool success =
848  spooler->PlaceBootstrappingShortcut(ensemble.manifest->certificate()) &&
849  spooler->PlaceBootstrappingShortcut(ensemble.manifest->catalog_hash())
850  && (ensemble.manifest->history().IsNull() ||
851  spooler->PlaceBootstrappingShortcut(ensemble.manifest->history()))
852  && (meta_info_hash.IsNull() ||
853  spooler->PlaceBootstrappingShortcut(meta_info_hash));
854 
855  if (!success) {
857  "failed to place root catalog bootstrapping symlinks");
858  goto fini;
859  }
860  }
861 
862  // upload Reflog database
863  if (!preload_cache && reflog != NULL) {
866  string reflog_path = reflog->database_file();
867  delete reflog;
868  manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
869  WaitForStorage(); // Reduce the duration of reflog /wo checksum
870  spooler->UploadReflog(reflog_path);
871  spooler->WaitForUpload();
872  unlink(reflog_path.c_str());
873  if (spooler->GetNumberOfErrors()) {
874  LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
875  spooler->GetNumberOfErrors());
876  goto fini;
877  }
878  assert(!reflog_chksum_path.empty());
879  manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
880  }
881 
882  if (preload_cache) {
883  bool retval =
884  ensemble.manifest->ExportBreadcrumb(*preload_cachedir, 0660);
885  assert(retval);
886  } else {
887  // pkcs#7 structure contains content + certificate + signature
888  // So there is no race with whitelist and pkcs7 signature being out of
889  // sync
890  if (ensemble.whitelist_pkcs7_buf) {
892  ".cvmfswhitelist.pkcs7", false);
893  }
894  StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
895  ".cvmfswhitelist", false);
897  ".cvmfspublished", false);
898  }
899  LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
900  ensemble.manifest->revision());
901  }
902 
903  WaitForStorage();
904  LogCvmfs(kLogCvmfs, kLogStdout, "Fetched %" PRId64 " new chunks out of %"
905  PRId64 " processed chunks",
906  atomic_read64(&overall_new), atomic_read64(&overall_chunks));
907  result = 0;
908 
909  fini:
910  if (fd_lockfile >= 0)
911  UnlockFile(fd_lockfile);
912  free(workers);
913  delete spooler;
914  delete pathfilter;
915  return result;
916 }
917 
918 } // namespace swissknife
int return_code
the return value of the spooler operation
void SetLogVerbosity(const LogLevels max_level)
Definition: logging.cc:261
std::string database_file() const
Definition: reflog.cc:337
const char kSuffixNone
Definition: compat.h:163
bool IsNull() const
Definition: hash.h:383
static RelaxedPathFilter * Create(const std::string &dirtab_path)
bool AllChunksNext(shash::Any *hash, zlib::Algorithms *compression_alg)
Definition: catalog.cc:427
bool AddHistory(const shash::Any &history)
Definition: reflog.cc:134
unsigned char * raw_manifest_buf
int64_t atomic_int64
Definition: atomic.h:18
int Main(const ArgumentList &args)
HttpObjectFetcher ObjectFetcher
bool IsValid() const
Definition: dirtab.h:127
static SqliteHistory * Open(const std::string &file_name)
unsigned char * data()
Definition: sink_mem.h:122
static bool ReadChecksum(const std::string &path, shash::Any *checksum)
Definition: reflog.cc:47
#define PANIC(...)
Definition: exception.h:29
FILE * CreateTempFile(const std::string &path_prefix, const int mode, const char *open_flags, std::string *final_path)
Definition: posix.cc:1016
static void * MainWorker(void *data)
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:343
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
bool CopyMem2File(const unsigned char *buffer, const unsigned buffer_size, FILE *fdest)
Definition: compression.cc:86
const std::string * url() const
Definition: jobinfo.h:175
bool AllChunksBegin()
Definition: catalog.cc:422
shash::Any GetPreviousRevision() const
Definition: catalog.cc:555
assert((mem||(size==0))&&"Out Of Memory")
bool has_alt_catalog_path() const
Definition: manifest.h:138
static void Store(const string &local_path, const string &remote_path, const bool compressed_src)
Algorithms algorithm
Definition: hash.h:125
string StringifyTime(const time_t seconds, const bool utc)
Definition: string.cc:105
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
static void WaitForStorage()
bool AddCatalog(const shash::Any &catalog)
Definition: reflog.cc:128
unsigned char digest[digest_size_]
Definition: hash.h:124
char algorithm
uint64_t revision() const
Definition: manifest.h:129
int http_code() const
Definition: jobinfo.h:205
static void SpoolerOnUpload(const upload::SpoolerResult &result)
Algorithms
Definition: hash.h:41
unsigned char * whitelist_buf
bool FileExists(const std::string &path)
Definition: posix.cc:802
uint64_t GetLastModified() const
Definition: catalog.cc:533
int64_t String2Int64(const string &value)
Definition: string.cc:240
const char * Code2Ascii(const Failures error)
unsigned GetDigestSize() const
Definition: hash.h:168
Algorithms
Definition: compression.h:44
bool ExportBreadcrumb(const std::string &directory, const int mode) const
Definition: manifest.cc:246
bool AllChunksEnd()
Definition: catalog.cc:433
std::string local_path
the local_path previously given as input
virtual bool List(std::vector< Tag > *tags) const =0
bool AddMetainfo(const shash::Any &metainfo)
Definition: reflog.cc:140
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:308
virtual bool IsMatching(const std::string &path) const
const char kSuffixCatalog
Definition: hash.h:54
bool DecompressPath2File(const string &src, FILE *fdest)
Definition: compression.cc:667
shash::Any certificate() const
Definition: manifest.h:134
bool IsProxyTransferError(const Failures error)
static void ReportDownloadError(const download::JobInfo &download_job)
void BeginTransaction()
Definition: reflog.cc:295
static void StoreBuffer(const unsigned char *buffer, const unsigned size, const std::string &dest_path, const bool compress)
unsigned char digest[20]
unsigned char * whitelist_pkcs7_buf
static void HashDatabase(const std::string &database_path, shash::Any *hash_reflog)
Definition: reflog.cc:322
shash::Any catalog_hash() const
Definition: manifest.h:132
download::DownloadManager * download_manager
string StringifyInt(const int64_t value)
Definition: string.cc:78
bool garbage_collectable() const
Definition: manifest.h:137
Failures error_code() const
Definition: jobinfo.h:204
void DropDatabaseFileOwnership()
Definition: reflog.cc:313
bool DirectoryExists(const std::string &path)
Definition: posix.cc:824
bool AddCertificate(const shash::Any &certificate)
Definition: reflog.cc:121
char Suffix
Definition: hash.h:114
shash::Algorithms GetHashAlgorithm() const
Definition: manifest.h:91
std::string MakePathWithoutSuffix() const
Definition: hash.h:335
size_t pos()
Definition: sink_mem.h:121
std::vector< NestedCatalog > NestedCatalogList
Definition: catalog.h:208
const NestedCatalogList ListOwnNestedCatalogs() const
Definition: catalog.cc:656
uint64_t String2Uint64(const string &value)
Definition: string.cc:246
std::map< char, SharedPtr< std::string > > ArgumentList
Definition: swissknife.h:72
Failures Fetch(JobInfo *info)
Definition: download.cc:2001
shash::Any history() const
Definition: manifest.h:135
Definition: mutex.h:42
bool CompressMem2File(const unsigned char *buf, const size_t size, FILE *fdest, shash::Any *compressed_hash)
Definition: compression.cc:678
uint64_t GetNumChunks() const
Definition: catalog.cc:541
ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
static std::string MakePath(const shash::Any &hash)
void CommitTransaction()
Definition: reflog.cc:301
static Catalog * AttachFreely(const std::string &imaginary_mountpoint, const std::string &file, const shash::Any &catalog_hash, Catalog *parent=NULL, const bool is_nested=false)
Definition: catalog.cc:29
const unsigned kMaxDigestSize
Definition: hash.h:72
bool DecompressPath2Path(const string &src, const string &dest)
Definition: compression.cc:381
std::string meta_info() const
Definition: repository.h:128
const int kLogVerboseMsg
static bool Peek(const string &remote_path)
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2049
bool IsHostTransferError(const Failures error)
Suffix suffix
Definition: hash.h:126
std::string MakePath() const
Definition: hash.h:316
static bool WriteChecksum(const std::string &path, const shash::Any &value)
Definition: reflog.cc:64
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
Definition: posix.cc:1135
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:562
shash::Any meta_info() const
Definition: manifest.h:139
void UnlockFile(const int filedes)
Definition: posix.cc:1006
const char * Code2Ascii(const Failures error)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528