CernVM-FS  2.9.0
swissknife_pull.cc
/**
 * This file is part of the CernVM File System.
 */

#define _FILE_OFFSET_BITS 64
#define __STDC_FORMAT_MACROS

#include "cvmfs_config.h"
#include "swissknife_pull.h"

#include <inttypes.h>
#include <pthread.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

#include "atomic.h"
#include "catalog.h"
#include "compression.h"
#include "download.h"
#include "hash.h"
#include "history_sqlite.h"
#include "logging.h"
#include "manifest.h"
#include "manifest_fetch.h"
#include "object_fetcher.h"
#include "path_filters/relaxed_path_filter.h"
#include "reflog.h"
#include "signature.h"
#include "smalloc.h"
#include "upload.h"
#include "util/exception.h"
#include "util/posix.h"
#include "util/shared_ptr.h"
#include "util/string.h"
#include "util_concurrency.h"

using namespace std;  // NOLINT

namespace swissknife {

namespace {

/**
 * Carries a single chunk hash plus its compression algorithm through the
 * worker pipe.  A default-constructed object serves as the termination
 * signal for the worker threads.
 */
class ChunkJob {
 public:
  ChunkJob()
    : suffix(shash::kSuffixNone)
    , hash_algorithm(shash::kAny)
    , compression_alg(zlib::kZlibDefault) {}

  ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
    : suffix(hash.suffix)
    , hash_algorithm(hash.algorithm)
    , compression_alg(compression_alg)
  {
    memcpy(digest, hash.digest, hash.GetDigestSize());
  }

  bool IsTerminateJob() const {
    return (hash_algorithm == shash::kAny);
  }

  shash::Any hash() const {
    assert(!IsTerminateJob());
    return shash::Any(hash_algorithm, digest, suffix);
  }

  shash::Suffix suffix;
  shash::Algorithms hash_algorithm;
  zlib::Algorithms compression_alg;
  unsigned char digest[shash::kMaxDigestSize];
};
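
/*
 * Illustrative sketch (not part of the original file): because ChunkJob is a
 * plain, fixed-size value type, the producer and the worker threads can
 * exchange whole objects through the chunk pipe as raw bytes, exactly as the
 * code below does with WritePipe()/ReadPipe().  A default-constructed job is
 * the shutdown sentinel.
 *
 *   ChunkJob job(chunk_hash, zlib::kZlibDefault);  // chunk_hash: a shash::Any
 *   WritePipe(pipe_chunks[1], &job, sizeof(job));  // enqueue one chunk
 *
 *   ChunkJob sentinel;                             // hash_algorithm == shash::kAny
 *   WritePipe(pipe_chunks[1], &sentinel, sizeof(sentinel));  // stop one worker
 */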

static void SpoolerOnUpload(const upload::SpoolerResult &result) {
  unlink(result.local_path.c_str());
  if (result.return_code != 0) {
    PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
          result.local_path.c_str(), result.content_hash.ToString().c_str());
  }
}

SharedPtr<string> stratum0_url;
SharedPtr<string> stratum1_url;
SharedPtr<string> temp_dir;
unsigned num_parallel = 1;
bool pull_history = false;
bool apply_timestamp_threshold = false;
uint64_t timestamp_threshold = 0;
bool is_garbage_collectable = false;
bool initial_snapshot = false;
upload::Spooler *spooler = NULL;
int pipe_chunks[2];
// required for concurrent reading
pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
unsigned retries = 3;
catalog::RelaxedPathFilter *pathfilter = NULL;
atomic_int64 overall_chunks;
atomic_int64 overall_new;
atomic_int64 chunk_queue;
bool preload_cache = false;
string *preload_cachedir = NULL;
bool inspect_existing_catalogs = false;
manifest::Reflog *reflog = NULL;

}  // anonymous namespace


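/*
 * Illustrative note (added): assuming the usual CVMFS content-addressed
 * layout (two leading hex digits as subdirectory), a hypothetical catalog
 * hash "b75a...e4" maps to "<preload_cachedir>/b7/5a...e4" in preload mode
 * (no suffix) and to "data/b7/5a...e4C" in the regular replication backend.
 */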
static std::string MakePath(const shash::Any &hash) {
  return (preload_cache)
    ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
    : "data/" + hash.MakePath();
}


static bool Peek(const string &remote_path) {
  return (preload_cache) ? FileExists(remote_path)
                         : spooler->Peek(remote_path);
}

static bool Peek(const shash::Any &remote_hash) {
  return Peek(MakePath(remote_hash));
}

static void ReportDownloadError(const download::JobInfo &download_job) {
  const download::Failures error_code = download_job.error_code;
  const int http_code = download_job.http_code;
  const std::string url = *download_job.url;

  LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
           url.c_str(), error_code, download::Code2Ascii(error_code));

  switch (error_code) {
    case download::kFailProxyResolve:
    case download::kFailHostResolve:
      LogCvmfs(kLogCvmfs, kLogStderr, "DNS lookup for Stratum 0 failed - "
               "please check the network connection");
      break;

    case download::kFailHostHttp:
      LogCvmfs(kLogCvmfs, kLogStderr, "unexpected HTTP error code %d - "
               "please check the stratum 0 health", http_code);
      break;

    case download::kFailBadData:
      LogCvmfs(kLogCvmfs, kLogStderr, "downloaded corrupted data - "
               "please check the stratum 0 health");
      break;

    default:
      if (download::IsProxyTransferError(error_code) ||
          download::IsHostTransferError(error_code)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "couldn't reach Stratum 0 - "
                 "please check the network connection");
      } else {
        LogCvmfs(kLogCvmfs, kLogStderr, "unexpected error - feel free to file "
                 "a bug report");
      }
  }
}


static void Store(
  const string &local_path,
  const string &remote_path,
  const bool compressed_src)
{
  if (preload_cache) {
    if (!compressed_src) {
      int retval = rename(local_path.c_str(), remote_path.c_str());
      if (retval != 0) {
        PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
              remote_path.c_str());
      }
    } else {
      // compressed input
      string tmp_dest;
      FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
      if (fdest == NULL) {
        PANIC(kLogStderr, "Failed to create temporary file '%s'",
              remote_path.c_str());
      }
      int retval = zlib::DecompressPath2File(local_path, fdest);
      if (!retval) {
        PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
              remote_path.c_str());
      }
      fclose(fdest);
      retval = rename(tmp_dest.c_str(), remote_path.c_str());
      assert(retval == 0);
      unlink(local_path.c_str());
    }
  } else {
    spooler->Upload(local_path, remote_path);
  }
}

static void Store(
  const string &local_path,
  const shash::Any &remote_hash,
  const bool compressed_src = true)
{
  Store(local_path, MakePath(remote_hash), compressed_src);
}


static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const std::string dest_path, const bool compress) {
  string tmp_file;
  FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
  assert(ftmp);
  int retval;
  if (compress) {
    shash::Any dummy(shash::kSha1);  // hard-coded hash algorithm is fine, the value is unused
    retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
  } else {
    retval = CopyMem2File(buffer, size, ftmp);
  }
  assert(retval);
  fclose(ftmp);
  Store(tmp_file, dest_path, true);
}

static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const shash::Any &dest_hash, const bool compress) {
  StoreBuffer(buffer, size, MakePath(dest_hash), compress);
}


static void WaitForStorage() {
  if (!preload_cache) spooler->WaitForUpload();
}


struct MainWorkerContext {
  download::DownloadManager *download_manager;
};

static void *MainWorker(void *data) {
  MainWorkerContext *mwc = static_cast<MainWorkerContext*>(data);
  download::DownloadManager *download_manager = mwc->download_manager;

  while (1) {
    ChunkJob next_chunk;
    {
      MutexLockGuard m(&lock_pipe);
      ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
    }
    if (next_chunk.IsTerminateJob())
      break;

    shash::Any chunk_hash = next_chunk.hash();
    zlib::Algorithms compression_alg = next_chunk.compression_alg;
    LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
             chunk_hash.ToString().c_str());

    if (!Peek(chunk_hash)) {
      string tmp_file;
      FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                    &tmp_file);
      assert(fchunk);
      string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
      download::JobInfo download_chunk(&url_chunk, false, false, fchunk,
                                       &chunk_hash);

      const download::Failures download_result =
        download_manager->Fetch(&download_chunk);
      if (download_result != download::kFailOk) {
        ReportDownloadError(download_chunk);
        PANIC(kLogStderr, "Download error");
      }
      fclose(fchunk);
      Store(tmp_file, chunk_hash,
            (compression_alg == zlib::kZlibDefault) ? true : false);
      atomic_inc64(&overall_new);
    }
    if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
      LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
    atomic_dec64(&chunk_queue);
  }
  return NULL;
}


bool CommandPull::PullRecursion(catalog::Catalog *catalog,
                                const std::string &path) {
  assert(catalog);

  // Previous catalogs
  if (pull_history) {
    shash::Any previous_catalog = catalog->GetPreviousRevision();
    if (previous_catalog.IsNull()) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Start of catalog, no more history");
    } else {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
               previous_catalog.ToString().c_str());
      bool retval = Pull(previous_catalog, path);
      if (!retval)
        return false;
    }
  }

  // Nested catalogs (in a nested code block because goto fail...)
  {
    const catalog::Catalog::NestedCatalogList nested_catalogs =
      catalog->ListOwnNestedCatalogs();
    for (catalog::Catalog::NestedCatalogList::const_iterator i =
         nested_catalogs.begin(), iEnd = nested_catalogs.end();
         i != iEnd; ++i)
    {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
               i->mountpoint.c_str());
      bool retval = Pull(i->hash, i->mountpoint.ToString());
      if (!retval)
        return false;
    }
  }

  return true;
}

bool CommandPull::Pull(const shash::Any &catalog_hash,
                       const std::string &path) {
  int retval;
  download::Failures dl_retval;
  assert(shash::kSuffixCatalog == catalog_hash.suffix);

  // Check if the catalog already exists
  if (Peek(catalog_hash)) {
    // Preload: dirtab changed
    if (inspect_existing_catalogs) {
      if (!preload_cache) {
        PANIC(kLogStderr, "to be implemented: -t without -c");
      }
      catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
        path, MakePath(catalog_hash), catalog_hash);
      if (catalog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
                 catalog_hash.ToString().c_str());
        return false;
      }
      bool retval = PullRecursion(catalog, path);
      delete catalog;
      return retval;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
    return true;
  }

  // Check if the catalog matches the pathfilter
  if (path != "" &&  // necessary to load the root catalog
      pathfilter &&
      !pathfilter->IsMatching(path)) {
    LogCvmfs(kLogCvmfs, kLogStdout, " Catalog in '%s' does not match"
             " the path specification", path.c_str());
    return true;
  }

  int64_t gauge_chunks = atomic_read64(&overall_chunks);
  int64_t gauge_new = atomic_read64(&overall_new);

  // Download and uncompress catalog
  shash::Any chunk_hash;
  zlib::Algorithms compression_alg;
  catalog::Catalog *catalog = NULL;
  string file_catalog;
  string file_catalog_vanilla;
  FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                  &file_catalog);
  if (!fcatalog) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    return false;
  }
  fclose(fcatalog);
  FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                          &file_catalog_vanilla);
  if (!fcatalog_vanilla) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    unlink(file_catalog.c_str());
    return false;
  }
  const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
  download::JobInfo download_catalog(&url_catalog, false, false,
                                     fcatalog_vanilla, &catalog_hash);
  dl_retval = download_manager()->Fetch(&download_catalog);
  fclose(fcatalog_vanilla);
  if (dl_retval != download::kFailOk) {
    if (path == "" && is_garbage_collectable) {
      LogCvmfs(kLogCvmfs, kLogStdout, "skipping missing root catalog %s - "
               "probably sweeped by garbage collection",
               catalog_hash.ToString().c_str());
      goto pull_skip;
    } else {
      ReportDownloadError(download_catalog);
      goto pull_cleanup;
    }
  }
  retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
             file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }
  if (path.empty() && reflog != NULL) {
    if (!reflog->AddCatalog(catalog_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
      goto pull_cleanup;
    }
  }

  catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
  if (catalog == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
             catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }

  // Always pull the HEAD root catalog and nested catalogs
  if (apply_timestamp_threshold && (path == "") &&
      (catalog->GetLastModified() < timestamp_threshold))
  {
    LogCvmfs(kLogCvmfs, kLogStdout,
             " Pruning at root catalog from %s due to threshold at %s",
             StringifyTime(catalog->GetLastModified(), false).c_str(),
             StringifyTime(timestamp_threshold, false).c_str());
    delete catalog;
    goto pull_skip;
  }
  apply_timestamp_threshold = true;

  // Traverse the chunks
  LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
           " Processing chunks [%" PRIu64 " registered chunks]: ",
           catalog->GetNumChunks());
  retval = catalog->AllChunksBegin();
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
    goto pull_cleanup;
  }
  while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
    ChunkJob next_chunk(chunk_hash, compression_alg);
    WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
    atomic_inc64(&chunk_queue);
  }
  catalog->AllChunksEnd();
  while (atomic_read64(&chunk_queue) != 0) {
    SafeSleepMs(100);
  }
  LogCvmfs(kLogCvmfs, kLogStdout, " fetched %" PRId64 " new chunks out of "
           "%" PRId64 " unique chunks",
           atomic_read64(&overall_new)-gauge_new,
           atomic_read64(&overall_chunks)-gauge_chunks);

  retval = PullRecursion(catalog, path);

  delete catalog;
  unlink(file_catalog.c_str());
  WaitForStorage();
  Store(file_catalog_vanilla, catalog_hash);
  return true;

 pull_cleanup:
  delete catalog;
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return false;

 pull_skip:
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return true;
}


int CommandPull::Main(const ArgumentList &args) {
  int retval;
  manifest::Failures m_retval;
  download::Failures dl_retval;
  unsigned timeout = 60;
  int fd_lockfile = -1;
  string spooler_definition_str;
  manifest::ManifestEnsemble ensemble;
  shash::Any meta_info_hash;
  string meta_info;

  // Option parsing
  if (args.find('c') != args.end())
    preload_cache = true;
  if (args.find('l') != args.end()) {
    unsigned log_level =
      1 << (kLogLevel0 + String2Uint64(*args.find('l')->second));
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  stratum0_url = args.find('u')->second;
  temp_dir = args.find('x')->second;
  if (preload_cache) {
    preload_cachedir = new string(*args.find('r')->second);
  } else {
    spooler_definition_str = *args.find('r')->second;
  }
  string master_keys = *args.find('k')->second;
  if (DirectoryExists(master_keys))
    master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
  const string repository_name = *args.find('m')->second;
  string trusted_certs;
  if (args.find('y') != args.end())
    trusted_certs = *args.find('y')->second;
  if (args.find('n') != args.end())
    num_parallel = String2Uint64(*args.find('n')->second);
  if (args.find('t') != args.end())
    timeout = String2Uint64(*args.find('t')->second);
  if (args.find('a') != args.end())
    retries = String2Uint64(*args.find('a')->second);
  if (args.find('d') != args.end()) {
    pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
    assert(pathfilter->IsValid());
  }
  if (args.find('p') != args.end())
    pull_history = true;
  if (args.find('z') != args.end())
    inspect_existing_catalogs = true;
  if (args.find('w') != args.end())
    stratum1_url = args.find('w')->second;
  if (args.find('i') != args.end())
    initial_snapshot = true;
  shash::Any reflog_hash;
  string reflog_chksum_path;
  if (args.find('R') != args.end()) {
    reflog_chksum_path = *args.find('R')->second;
    if (!initial_snapshot) {
      if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
        return 1;
      }
    }
  }
  if (args.find('Z') != args.end()) {
    timestamp_threshold = String2Int64(*args.find('Z')->second);
  }

  if (!preload_cache && stratum1_url == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
    return 1;
  }

  typedef std::vector<history::History::Tag> TagVector;
  TagVector historic_tags;

  LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
           stratum0_url->c_str());

  int result = 1;

  // Initialization
  atomic_init64(&overall_chunks);
  atomic_init64(&overall_new);
  atomic_init64(&chunk_queue);

  const bool follow_redirects = false;
  const unsigned max_pool_handles = num_parallel+1;

  if (!this->InitDownloadManager(follow_redirects, max_pool_handles)) {
    return 1;
  }

  if (!this->InitVerifyingSignatureManager(master_keys, trusted_certs)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to initalize CVMFS signatures");
    return 1;
  } else {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "CernVM-FS: using public key(s) %s",
             JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
    if (!trusted_certs.empty()) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "CernVM-FS: using trusted certificates in %s",
               JoinStrings(SplitString(trusted_certs, ':'), ", ").c_str());
    }
  }

  unsigned current_group;
  vector< vector<download::DownloadManager::ProxyInfo> > proxies;
  download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
  if (proxies.size() > 0) {
    string proxy_str = "\nWarning, replicating through proxies\n";
    proxy_str += " Load-balance groups:\n";
    for (unsigned i = 0; i < proxies.size(); ++i) {
      vector<string> urls;
      for (unsigned j = 0; j < proxies[i].size(); ++j) {
        urls.push_back(proxies[i][j].url);
      }
      proxy_str +=
        " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ") + "\n";
    }
    proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] " +
                 proxies[current_group][0].url;
    LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
  }
  download_manager()->SetTimeout(timeout, timeout);
  download_manager()->SetRetryParameters(retries, 500, 2000);
  download_manager()->Spawn();

  // init the download helper
  ObjectFetcher object_fetcher(repository_name,
                               *stratum0_url,
                               *temp_dir,
                               download_manager(),
                               signature_manager());

  pthread_t *workers =
    reinterpret_cast<pthread_t *>(smalloc(sizeof(pthread_t) * num_parallel));

  // Check if we have a replica-ready server
  const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
  download::JobInfo download_sentinel(&url_sentinel, false);
  retval = download_manager()->Fetch(&download_sentinel);
  if (retval != download::kFailOk) {
    if (download_sentinel.http_code == 404) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "This is not a CernVM-FS server for replication");
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Failed to contact stratum 0 server (%d - %s)",
               retval, download::Code2Ascii(download_sentinel.error_code));
    }
    goto fini;
  }

  m_retval = FetchRemoteManifestEnsemble(*stratum0_url,
                                         repository_name,
                                         &ensemble);
  if (m_retval != manifest::kFailOk) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
             m_retval, manifest::Code2Ascii(m_retval));
    goto fini;
  }

  // Get meta info
  meta_info_hash = ensemble.manifest->meta_info();
  if (!meta_info_hash.IsNull()) {
    meta_info_hash = ensemble.manifest->meta_info();
    const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
    download::JobInfo download_metainfo(&url, true, false, &meta_info_hash);
    dl_retval = download_manager()->Fetch(&download_metainfo);
    if (dl_retval != download::kFailOk) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
               dl_retval, download::Code2Ascii(dl_retval));
      goto fini;
    }
    meta_info = string(download_metainfo.destination_mem.data,
                       download_metainfo.destination_mem.pos);
  }

  is_garbage_collectable = ensemble.manifest->garbage_collectable();

  // Manifest available, now the spooler's hash algorithm can be determined
  // That doesn't actually matter because the replication does no re-hashing
  if (!preload_cache) {
    const upload::SpoolerDefinition
      spooler_definition(spooler_definition_str,
                         ensemble.manifest->GetHashAlgorithm());
    spooler = upload::Spooler::Construct(spooler_definition);
    assert(spooler);
    spooler->RegisterListener(&SpoolerOnUpload);
  }

  // Open the reflog for modification
  if (!preload_cache) {
    if (initial_snapshot) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
               repository_name.c_str());
      reflog = CreateEmptyReflog(*temp_dir, repository_name);
      if (reflog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
        goto fini;
      }
    } else {
      ObjectFetcher object_fetcher_stratum1(repository_name,
                                            *stratum1_url,
                                            *temp_dir,
                                            download_manager(),
                                            signature_manager());

      if (!reflog_hash.IsNull()) {
        reflog =
          FetchReflog(&object_fetcher_stratum1, repository_name, reflog_hash);
        assert(reflog != NULL);
      } else {
        LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
        if (spooler->Peek("/.cvmfsreflog")) {
          LogCvmfs(kLogCvmfs, kLogStderr,
                   "no reflog hash specified but reflog is present");
          goto fini;
        }
      }
    }

    if (reflog != NULL) {
      reflog->BeginTransaction();
      // On commit: use manifest's hash algorithm
      reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
    }
  }

  // Fetch tag list.
  // If we are just preloading the cache it is not strictly necessary to
  // download the entire tag list
  // TODO(molina): add user option to download tags when preloading the cache
  if (!ensemble.manifest->history().IsNull() && !preload_cache) {
    shash::Any history_hash = ensemble.manifest->history();
    const string history_url = *stratum0_url + "/data/"
                               + history_hash.MakePath();
    const string history_path = *temp_dir + "/" + history_hash.ToString();
    download::JobInfo download_history(&history_url, false, false,
                                       &history_path,
                                       &history_hash);
    dl_retval = download_manager()->Fetch(&download_history);
    if (dl_retval != download::kFailOk) {
      ReportDownloadError(download_history);
      goto fini;
    }
    const std::string history_db_path = history_path + ".uncompressed";
    retval = zlib::DecompressPath2Path(history_path, history_db_path);
    assert(retval);
    history::History *tag_db = history::SqliteHistory::Open(history_db_path);
    if (NULL == tag_db) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
               history_db_path.c_str());
      unlink(history_db_path.c_str());
      goto fini;
    }
    retval = tag_db->List(&historic_tags);
    delete tag_db;
    unlink(history_db_path.c_str());
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
               history_db_path.c_str());
      goto fini;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, "Found %u named snapshots",
             historic_tags.size());
    // TODO(jblomer): We should replicate the previous history dbs, too,
    // in order to avoid races on fail-over between non-synchronized stratum 1s
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
    Store(history_path, history_hash);
    WaitForStorage();
    unlink(history_path.c_str());
    if (reflog != NULL && !reflog->AddHistory(history_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
      goto fini;
    }
  }

  // Starting threads
  MakePipe(pipe_chunks);
  LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
  MainWorkerContext mwc;
  mwc.download_manager = download_manager();
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_create(&workers[i], NULL, MainWorker,
                                static_cast<void*>(&mwc));
    assert(retval == 0);
  }

  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
  retval = Pull(ensemble.manifest->catalog_hash(), "");
  pull_history = false;
  if (!historic_tags.empty()) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
  }
  for (TagVector::const_iterator i = historic_tags.begin(),
       iend = historic_tags.end();
       i != iend; ++i)
  {
    if (Peek(i->root_hash))
      continue;
    LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
             i->name.c_str());
    apply_timestamp_threshold = false;
    bool retval2 = Pull(i->root_hash, "");
    retval = retval && retval2;
  }

  // Stopping threads
  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
  for (unsigned i = 0; i < num_parallel; ++i) {
    ChunkJob terminate_workers;
    WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
  }
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_join(workers[i], NULL);
    assert(retval == 0);
  }
  ClosePipe(pipe_chunks);

  if (!retval)
    goto fini;

  // Upload manifest ensemble
  {
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
    WaitForStorage();

    if (!Peek(ensemble.manifest->certificate())) {
      StoreBuffer(ensemble.cert_buf,
                  ensemble.cert_size,
                  ensemble.manifest->certificate(), true);
    }
    if (reflog != NULL &&
        !reflog->AddCertificate(ensemble.manifest->certificate())) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
      goto fini;
    }
    if (!meta_info_hash.IsNull()) {
      const unsigned char *info = reinterpret_cast<const unsigned char *>(
        meta_info.data());
      StoreBuffer(info, meta_info.size(), meta_info_hash, true);
      if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
        goto fini;
      }
    }

    // Create alternative bootstrapping symlinks for VOMS secured repos
    if (ensemble.manifest->has_alt_catalog_path()) {
      const bool success =
        spooler->PlaceBootstrappingShortcut(ensemble.manifest->certificate()) &&
        spooler->PlaceBootstrappingShortcut(ensemble.manifest->catalog_hash())
        && (ensemble.manifest->history().IsNull() ||
            spooler->PlaceBootstrappingShortcut(ensemble.manifest->history()))
        && (meta_info_hash.IsNull() ||
            spooler->PlaceBootstrappingShortcut(meta_info_hash));

      if (!success) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "failed to place root catalog bootstrapping symlinks");
        return 1;
      }
    }

    // upload Reflog database
    if (!preload_cache && reflog != NULL) {
      reflog->CommitTransaction();
      reflog->DropDatabaseFileOwnership();
      string reflog_path = reflog->database_file();
      delete reflog;
      manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
      WaitForStorage();  // Reduce the duration of reflog w/o checksum
      spooler->UploadReflog(reflog_path);
      spooler->WaitForUpload();
      unlink(reflog_path.c_str());
      if (spooler->GetNumberOfErrors()) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
                 spooler->GetNumberOfErrors());
        goto fini;
      }
      assert(!reflog_chksum_path.empty());
      manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
    }

    if (preload_cache) {
      bool retval =
        ensemble.manifest->ExportBreadcrumb(*preload_cachedir, 0660);
      assert(retval);
    } else {
      // pkcs#7 structure contains content + certificate + signature
      // So there is no race with whitelist and pkcs7 signature being out of
      // sync
      if (ensemble.whitelist_pkcs7_buf) {
        StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
                    ".cvmfswhitelist.pkcs7", false);
      }
      StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
                  ".cvmfswhitelist", false);
      StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
                  ".cvmfspublished", false);
    }
    LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %u",
             ensemble.manifest->revision());
  }

  WaitForStorage();
  LogCvmfs(kLogCvmfs, kLogStdout, "Fetched %" PRId64 " new chunks out of %"
           PRId64 " processed chunks",
           atomic_read64(&overall_new), atomic_read64(&overall_chunks));
  result = 0;

 fini:
  if (fd_lockfile >= 0)
    UnlockFile(fd_lockfile);
  free(workers);
  delete spooler;
  delete pathfilter;
  return result;
}
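
/*
 * Illustrative sketch (not part of the original file): one way a caller might
 * assemble the ArgumentList consumed by CommandPull::Main().  The option
 * letters mirror the parsing code above; the repository name, URLs, paths,
 * the spooler definition string, and the SharedPtr construction are
 * placeholders/assumptions rather than verbatim API usage.
 *
 *   swissknife::ArgumentList args;
 *   args['m'] = SharedPtr<std::string>(new std::string("example.cern.ch"));
 *   args['u'] = SharedPtr<std::string>(
 *     new std::string("http://stratum0.example.org/cvmfs/example.cern.ch"));
 *   args['w'] = SharedPtr<std::string>(
 *     new std::string("http://stratum1.example.org/cvmfs/example.cern.ch"));
 *   args['r'] = SharedPtr<std::string>(
 *     new std::string("local,/srv/tmp,/srv/cvmfs/example.cern.ch"));  // spooler definition
 *   args['x'] = SharedPtr<std::string>(new std::string("/srv/tmp"));  // temp dir
 *   args['k'] = SharedPtr<std::string>(
 *     new std::string("/etc/cvmfs/keys/example.cern.ch.pub"));
 *   args['n'] = SharedPtr<std::string>(new std::string("4"));  // parallel workers
 *
 *   swissknife::CommandPull pull;
 *   int exit_code = pull.Main(args);
 */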

}  // namespace swissknife