GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_pull.cc
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 0 487 0.0%
Branches: 0 334 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Replicates a cvmfs repository. Uses the cvmfs intrinsic Merkle trees
5 * to calculate the difference set.
6 */
7
8 // NOLINTNEXTLINE
9 #define _FILE_OFFSET_BITS 64
10 // NOLINTNEXTLINE
11 #define __STDC_FORMAT_MACROS
12
13 #include "cvmfs_config.h"
14 #include "swissknife_pull.h"
15
16 #include <inttypes.h>
17 #include <pthread.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20
21 #include <cstdlib>
22 #include <cstring>
23 #include <string>
24 #include <vector>
25
26 #include "catalog.h"
27 #include "compression.h"
28 #include "crypto/hash.h"
29 #include "crypto/signature.h"
30 #include "history_sqlite.h"
31 #include "manifest.h"
32 #include "manifest_fetch.h"
33 #include "network/download.h"
34 #include "object_fetcher.h"
35 #include "path_filters/relaxed_path_filter.h"
36 #include "reflog.h"
37 #include "upload.h"
38 #include "util/atomic.h"
39 #include "util/concurrency.h"
40 #include "util/exception.h"
41 #include "util/logging.h"
42 #include "util/posix.h"
43 #include "util/shared_ptr.h"
44 #include "util/smalloc.h"
45 #include "util/string.h"
46
47 using namespace std; // NOLINT
48
49 namespace swissknife {
50
51 namespace {
52
// Object fetcher type used by this command to access the stratum 0 and, for
// reflog retrieval, the stratum 1 (see CommandPull::Main)
typedef HttpObjectFetcher<> ObjectFetcher;
54
55 /**
56 * This just stores an shash::Any in a predictable way to send it through a
57 * POSIX pipe.
58 */
59 class ChunkJob {
60 public:
61 ChunkJob()
62 : suffix(shash::kSuffixNone)
63 , hash_algorithm(shash::kAny)
64 , compression_alg(zlib::kZlibDefault) {}
65
66 ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
67 : suffix(hash.suffix)
68 , hash_algorithm(hash.algorithm)
69 , compression_alg(compression_alg)
70 {
71 memcpy(digest, hash.digest, hash.GetDigestSize());
72 }
73
74 bool IsTerminateJob() const {
75 return (hash_algorithm == shash::kAny);
76 }
77
78 shash::Any hash() const {
79 assert(!IsTerminateJob());
80 return shash::Any(hash_algorithm,
81 digest,
82 suffix);
83 }
84
85 const shash::Suffix suffix;
86 const shash::Algorithms hash_algorithm;
87 const zlib::Algorithms compression_alg;
88 unsigned char digest[shash::kMaxDigestSize];
89 };
90
91 static void SpoolerOnUpload(const upload::SpoolerResult &result) {
92 unlink(result.local_path.c_str());
93 if (result.return_code != 0) {
94 PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
95 result.local_path.c_str(), result.content_hash.ToString().c_str());
96 }
97 }
98
// Command state shared by the functions of this file; set up in
// CommandPull::Main() (argument letters given in parentheses).
SharedPtr<string> stratum0_url;  // URL of the source repository ('u')
SharedPtr<string> stratum1_url;  // stratum 1 URL ('w'), used to fetch the reflog
SharedPtr<string> temp_dir;      // directory for temporary files ('x')
unsigned num_parallel = 1;       // number of MainWorker threads ('n')
bool pull_history = false;       // also follow previous catalog revisions ('p')
// When set, Pull() skips root catalogs older than timestamp_threshold.
// Pull() sets it after the first catalog; Main() clears it before each tag.
bool apply_timestamp_threshold = false;
uint64_t timestamp_threshold = 0;  // ('Z')
// Taken from the manifest; lets Pull() tolerate missing root catalogs
bool is_garbage_collectable = false;
bool initial_snapshot = false;   // start with an empty reflog ('i')
upload::Spooler *spooler = NULL;  // destination storage; NULL in preload mode
// Chunk job queue: Pull() writes to [1], the MainWorker threads read from [0]
int pipe_chunks[2];
// required for concurrent reading
pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
unsigned retries = 3;            // download retries ('a')
catalog::RelaxedPathFilter *pathfilter = NULL;  // optional path filter ('d')
atomic_int64 overall_chunks;     // chunk jobs processed by the workers
atomic_int64 overall_new;        // chunks that were downloaded and stored
atomic_int64 chunk_queue;        // chunk jobs enqueued but not yet processed
bool preload_cache = false;      // write into a local cache directory ('c')
string *preload_cachedir = NULL;  // that cache directory ('r' with 'c')
bool inspect_existing_catalogs = false;  // re-traverse present catalogs ('z')
manifest::Reflog *reflog = NULL;  // reflog being updated, NULL if none
121
122 } // anonymous namespace
123
124
125 static std::string MakePath(const shash::Any &hash) {
126 return (preload_cache)
127 ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
128 : "data/" + hash.MakePath();
129 }
130
131
132 static bool Peek(const string &remote_path) {
133 return (preload_cache) ? FileExists(remote_path)
134 : spooler->Peek(remote_path);
135 }
136
137 static bool Peek(const shash::Any &remote_hash) {
138 return Peek(MakePath(remote_hash));
139 }
140
141 static void ReportDownloadError(const download::JobInfo &download_job) {
142 const download::Failures error_code = download_job.error_code();
143 const int http_code = download_job.http_code();
144 const std::string url = *download_job.url();
145
146 LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
147 url.c_str(), error_code, download::Code2Ascii(error_code));
148
149 switch (error_code) {
150 case download::kFailProxyResolve:
151 case download::kFailHostResolve:
152 LogCvmfs(kLogCvmfs, kLogStderr, "DNS lookup for Stratum 0 failed - "
153 "please check the network connection");
154 break;
155
156 case download::kFailHostHttp:
157 LogCvmfs(kLogCvmfs, kLogStderr, "unexpected HTTP error code %d - "
158 "please check the stratum 0 health", http_code);
159 break;
160
161 case download::kFailBadData:
162 LogCvmfs(kLogCvmfs, kLogStderr, "downloaded corrupted data - "
163 "please check the stratum 0 health");
164 break;
165
166 default:
167 if (download::IsProxyTransferError(error_code) ||
168 download::IsHostTransferError(error_code)) {
169 LogCvmfs(kLogCvmfs, kLogStderr, "couldn't reach Stratum 0 - "
170 "please check the network connection");
171 } else {
172 LogCvmfs(kLogCvmfs, kLogStderr, "unexpected error - feel free to file "
173 "a bug report");
174 }
175 }
176 }
177
178
179 static void Store(
180 const string &local_path,
181 const string &remote_path,
182 const bool compressed_src)
183 {
184 if (preload_cache) {
185 if (!compressed_src) {
186 int retval = rename(local_path.c_str(), remote_path.c_str());
187 if (retval != 0) {
188 PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
189 remote_path.c_str());
190 }
191 } else {
192 // compressed input
193 string tmp_dest;
194 FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
195 if (fdest == NULL) {
196 PANIC(kLogStderr, "Failed to create temporary file '%s'",
197 remote_path.c_str());
198 }
199 int retval = zlib::DecompressPath2File(local_path, fdest);
200 if (!retval) {
201 PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
202 remote_path.c_str());
203 }
204 fclose(fdest);
205 retval = rename(tmp_dest.c_str(), remote_path.c_str());
206 assert(retval == 0);
207 unlink(local_path.c_str());
208 }
209 } else {
210 spooler->Upload(local_path, remote_path);
211 }
212 }
213
214 static void Store(
215 const string &local_path,
216 const shash::Any &remote_hash,
217 const bool compressed_src = true)
218 {
219 Store(local_path, MakePath(remote_hash), compressed_src);
220 }
221
222
223 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
224 const std::string &dest_path, const bool compress) {
225 string tmp_file;
226 FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
227 assert(ftmp);
228 int retval;
229 if (compress) {
230 shash::Any dummy(shash::kSha1); // hardcoded hash no problem, unused
231 retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
232 } else {
233 retval = CopyMem2File(buffer, size, ftmp);
234 }
235 assert(retval);
236 fclose(ftmp);
237 Store(tmp_file, dest_path, true);
238 }
239
240 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
241 const shash::Any &dest_hash, const bool compress) {
242 StoreBuffer(buffer, size, MakePath(dest_hash), compress);
243 }
244
245
246 static void WaitForStorage() {
247 if (!preload_cache) spooler->WaitForUpload();
248 }
249
250
/**
 * Argument handed to every MainWorker thread (shared by all of them).
 */
struct MainWorkerContext {
  // Download manager the worker uses to fetch chunks from the stratum 0
  download::DownloadManager *download_manager;
};
254
255 static void *MainWorker(void *data) {
256 MainWorkerContext *mwc = static_cast<MainWorkerContext*>(data);
257 download::DownloadManager *download_manager = mwc->download_manager;
258
259 while (1) {
260 ChunkJob next_chunk;
261 {
262 MutexLockGuard m(&lock_pipe);
263 ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
264 }
265 if (next_chunk.IsTerminateJob())
266 break;
267
268 shash::Any chunk_hash = next_chunk.hash();
269 zlib::Algorithms compression_alg = next_chunk.compression_alg;
270 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
271 chunk_hash.ToString().c_str());
272
273 if (!Peek(chunk_hash)) {
274 string tmp_file;
275 FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
276 &tmp_file);
277 string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
278 cvmfs::FileSink filesink(fchunk);
279 download::JobInfo download_chunk(&url_chunk, false, false,
280 &chunk_hash, &filesink);
281
282 const download::Failures download_result =
283 download_manager->Fetch(&download_chunk);
284 if (download_result != download::kFailOk) {
285 ReportDownloadError(download_chunk);
286 PANIC(kLogStderr, "Download error");
287 }
288 fclose(fchunk);
289 Store(tmp_file, chunk_hash,
290 (compression_alg == zlib::kZlibDefault) ? true : false);
291 atomic_inc64(&overall_new);
292 }
293 if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
294 LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
295 atomic_dec64(&chunk_queue);
296 }
297 return NULL;
298 }
299
300
301 bool CommandPull::PullRecursion(catalog::Catalog *catalog,
302 const std::string &path) {
303 assert(catalog);
304
305 // Previous catalogs
306 if (pull_history) {
307 shash::Any previous_catalog = catalog->GetPreviousRevision();
308 if (previous_catalog.IsNull()) {
309 LogCvmfs(kLogCvmfs, kLogStdout, "Start of catalog, no more history");
310 } else {
311 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
312 previous_catalog.ToString().c_str());
313 bool retval = Pull(previous_catalog, path);
314 if (!retval)
315 return false;
316 }
317 }
318
319 // Nested catalogs (in a nested code block because goto fail...)
320 {
321 const catalog::Catalog::NestedCatalogList nested_catalogs =
322 catalog->ListOwnNestedCatalogs();
323 for (catalog::Catalog::NestedCatalogList::const_iterator i =
324 nested_catalogs.begin(), iEnd = nested_catalogs.end();
325 i != iEnd; ++i)
326 {
327 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
328 i->mountpoint.c_str());
329 bool retval = Pull(i->hash, i->mountpoint.ToString());
330 if (!retval)
331 return false;
332 }
333 }
334
335 return true;
336 }
337
338 bool CommandPull::Pull(const shash::Any &catalog_hash,
339 const std::string &path) {
340 int retval;
341 download::Failures dl_retval;
342 assert(shash::kSuffixCatalog == catalog_hash.suffix);
343
344 // Check if the catalog already exists
345 if (Peek(catalog_hash)) {
346 // Preload: dirtab changed
347 if (inspect_existing_catalogs) {
348 if (!preload_cache) {
349 PANIC(kLogStderr, "to be implemented: -t without -c");
350 }
351 catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
352 path, MakePath(catalog_hash), catalog_hash);
353 if (catalog == NULL) {
354 LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
355 catalog_hash.ToString().c_str());
356 return false;
357 }
358 bool retval = PullRecursion(catalog, path);
359 delete catalog;
360 return retval;
361 }
362
363 LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
364 return true;
365 }
366
367 // Check if the catalog matches the pathfilter
368 if (path != "" && // necessary to load the root catalog
369 pathfilter &&
370 !pathfilter->IsMatching(path)) {
371 LogCvmfs(kLogCvmfs, kLogStdout, " Catalog in '%s' does not match"
372 " the path specification", path.c_str());
373 return true;
374 }
375
376 int64_t gauge_chunks = atomic_read64(&overall_chunks);
377 int64_t gauge_new = atomic_read64(&overall_new);
378
379 // Download and uncompress catalog
380 shash::Any chunk_hash;
381 zlib::Algorithms compression_alg;
382 catalog::Catalog *catalog = NULL;
383 string file_catalog;
384 string file_catalog_vanilla;
385 FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
386 &file_catalog);
387 if (!fcatalog) {
388 LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
389 return false;
390 }
391 fclose(fcatalog);
392 FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
393 &file_catalog_vanilla);
394 if (!fcatalog_vanilla) {
395 LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
396 unlink(file_catalog.c_str());
397 return false;
398 }
399 const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
400 cvmfs::FileSink filesink(fcatalog_vanilla);
401 download::JobInfo download_catalog(&url_catalog, false, false,
402 &catalog_hash, &filesink);
403 dl_retval = download_manager()->Fetch(&download_catalog);
404 fclose(fcatalog_vanilla);
405 if (dl_retval != download::kFailOk) {
406 if (path == "" && is_garbage_collectable) {
407 LogCvmfs(kLogCvmfs, kLogStdout, "skipping missing root catalog %s - "
408 "probably sweeped by garbage collection",
409 catalog_hash.ToString().c_str());
410 goto pull_skip;
411 } else {
412 ReportDownloadError(download_catalog);
413 goto pull_cleanup;
414 }
415 }
416 retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
417 if (!retval) {
418 LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
419 file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
420 goto pull_cleanup;
421 }
422 if (path.empty() && reflog != NULL) {
423 if (!reflog->AddCatalog(catalog_hash)) {
424 LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
425 goto pull_cleanup;
426 }
427 }
428
429 catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
430 if (catalog == NULL) {
431 LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
432 catalog_hash.ToString().c_str());
433 goto pull_cleanup;
434 }
435
436 // Always pull the HEAD root catalog and nested catalogs
437 if (apply_timestamp_threshold && (path == "") &&
438 (catalog->GetLastModified() < timestamp_threshold))
439 {
440 LogCvmfs(kLogCvmfs, kLogStdout,
441 " Pruning at root catalog from %s due to threshold at %s",
442 StringifyTime(catalog->GetLastModified(), false).c_str(),
443 StringifyTime(timestamp_threshold, false).c_str());
444 delete catalog;
445 goto pull_skip;
446 }
447 apply_timestamp_threshold = true;
448
449 // Traverse the chunks
450 LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
451 " Processing chunks [%" PRIu64 " registered chunks]: ",
452 catalog->GetNumChunks());
453 retval = catalog->AllChunksBegin();
454 if (!retval) {
455 LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
456 goto pull_cleanup;
457 }
458 while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
459 ChunkJob next_chunk(chunk_hash, compression_alg);
460 WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
461 atomic_inc64(&chunk_queue);
462 }
463 catalog->AllChunksEnd();
464 while (atomic_read64(&chunk_queue) != 0) {
465 SafeSleepMs(100);
466 }
467 LogCvmfs(kLogCvmfs, kLogStdout, " fetched %" PRId64 " new chunks out of "
468 "%" PRId64 " unique chunks",
469 atomic_read64(&overall_new)-gauge_new,
470 atomic_read64(&overall_chunks)-gauge_chunks);
471
472 retval = PullRecursion(catalog, path);
473
474 delete catalog;
475 unlink(file_catalog.c_str());
476 WaitForStorage();
477 if (!retval)
478 return false;
479 Store(file_catalog_vanilla, catalog_hash);
480 return true;
481
482 pull_cleanup:
483 delete catalog;
484 unlink(file_catalog.c_str());
485 unlink(file_catalog_vanilla.c_str());
486 return false;
487
488 pull_skip:
489 unlink(file_catalog.c_str());
490 unlink(file_catalog_vanilla.c_str());
491 return true;
492 }
493
494
495 int swissknife::CommandPull::Main(const swissknife::ArgumentList &args) {
496 int retval;
497 manifest::Failures m_retval;
498 download::Failures dl_retval;
499 unsigned timeout = 60;
500 int fd_lockfile = -1;
501 string spooler_definition_str;
502 manifest::ManifestEnsemble ensemble;
503 shash::Any meta_info_hash;
504 string meta_info;
505
506 // Option parsing
507 if (args.find('c') != args.end())
508 preload_cache = true;
509 if (args.find('l') != args.end()) {
510 unsigned log_level =
511 kLogLevel0 << String2Uint64(*args.find('l')->second);
512 if (log_level > kLogNone) {
513 LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
514 return 1;
515 }
516 SetLogVerbosity(static_cast<LogLevels>(log_level));
517 }
518 stratum0_url = args.find('u')->second;
519 temp_dir = args.find('x')->second;
520 if (preload_cache) {
521 preload_cachedir = new string(*args.find('r')->second);
522 } else {
523 spooler_definition_str = *args.find('r')->second;
524 }
525 string master_keys = *args.find('k')->second;
526 if (DirectoryExists(master_keys))
527 master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
528 const string repository_name = *args.find('m')->second;
529 if (args.find('n') != args.end())
530 num_parallel = String2Uint64(*args.find('n')->second);
531 if (args.find('t') != args.end())
532 timeout = String2Uint64(*args.find('t')->second);
533 if (args.find('a') != args.end())
534 retries = String2Uint64(*args.find('a')->second);
535 if (args.find('d') != args.end()) {
536 pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
537 assert(pathfilter->IsValid());
538 }
539 if (args.find('p') != args.end())
540 pull_history = true;
541 if (args.find('z') != args.end())
542 inspect_existing_catalogs = true;
543 if (args.find('w') != args.end())
544 stratum1_url = args.find('w')->second;
545 if (args.find('i') != args.end())
546 initial_snapshot = true;
547 shash::Any reflog_hash;
548 string reflog_chksum_path;
549 if (args.find('R') != args.end()) {
550 reflog_chksum_path = *args.find('R')->second;
551 if (!initial_snapshot) {
552 if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
553 LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
554 return 1;
555 }
556 }
557 }
558 if (args.find('Z') != args.end()) {
559 timestamp_threshold = String2Int64(*args.find('Z')->second);
560 }
561
562 if (!preload_cache && stratum1_url.Get() == NULL) {
563 LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
564 return 1;
565 }
566
567 typedef std::vector<history::History::Tag> TagVector;
568 TagVector historic_tags;
569
570 LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
571 stratum0_url->c_str());
572
573 int result = 1;
574
575 // Initialization
576 atomic_init64(&overall_chunks);
577 atomic_init64(&overall_new);
578 atomic_init64(&chunk_queue);
579
580 const bool follow_redirects = false;
581 const unsigned max_pool_handles = num_parallel+1;
582 const string proxy =
583 (args.find('@') != args.end()) ? *args.find('@')->second : "";
584
585 if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
586 return 1;
587 }
588
589 if (!this->InitVerifyingSignatureManager(master_keys)) {
590 LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
591 return 1;
592 } else {
593 LogCvmfs(kLogCvmfs, kLogStdout,
594 "CernVM-FS: using public key(s) %s",
595 JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
596 }
597
598 unsigned current_group;
599 vector< vector<download::DownloadManager::ProxyInfo> > proxies;
600 download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
601 if (proxies.size() > 0) {
602 string proxy_str = "\nWarning, replicating through proxies\n";
603 proxy_str += " Load-balance groups:\n";
604 for (unsigned i = 0; i < proxies.size(); ++i) {
605 vector<string> urls;
606 for (unsigned j = 0; j < proxies[i].size(); ++j) {
607 urls.push_back(proxies[i][j].url);
608 }
609 proxy_str +=
610 " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ") + "\n";
611 }
612 proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] " +
613 proxies[current_group][0].url;
614 LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
615 }
616 download_manager()->SetTimeout(timeout, timeout);
617 download_manager()->SetRetryParameters(retries, 500, 2000);
618 download_manager()->Spawn();
619
620 // init the download helper
621 ObjectFetcher object_fetcher(repository_name,
622 *stratum0_url,
623 *temp_dir,
624 download_manager(),
625 signature_manager());
626
627 pthread_t *workers =
628 reinterpret_cast<pthread_t *>(smalloc(sizeof(pthread_t) * num_parallel));
629
630 // Check if we have a replica-ready server
631 const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
632 download::JobInfo download_sentinel(&url_sentinel, false);
633 retval = download_manager()->Fetch(&download_sentinel);
634 if (retval != download::kFailOk) {
635 if (download_sentinel.http_code() == 404) {
636 LogCvmfs(kLogCvmfs, kLogStderr,
637 "This is not a CernVM-FS server for replication");
638 } else {
639 LogCvmfs(kLogCvmfs, kLogStderr,
640 "Failed to contact stratum 0 server (%d - %s)",
641 retval, download::Code2Ascii(download_sentinel.error_code()));
642 }
643 goto fini;
644 }
645
646 m_retval = FetchRemoteManifestEnsemble(*stratum0_url,
647 repository_name,
648 &ensemble);
649 if (m_retval != manifest::kFailOk) {
650 LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
651 m_retval, manifest::Code2Ascii(m_retval));
652 goto fini;
653 }
654
655 // Get meta info
656 meta_info_hash = ensemble.manifest->meta_info();
657 if (!meta_info_hash.IsNull()) {
658 meta_info_hash = ensemble.manifest->meta_info();
659 const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
660 cvmfs::MemSink metainfo_memsink;
661 download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
662 &metainfo_memsink);
663 dl_retval = download_manager()->Fetch(&download_metainfo);
664 if (dl_retval != download::kFailOk) {
665 LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
666 dl_retval, download::Code2Ascii(dl_retval));
667 goto fini;
668 }
669 meta_info = string(reinterpret_cast<char*>(metainfo_memsink.data()),
670 metainfo_memsink.pos());
671 }
672
673 is_garbage_collectable = ensemble.manifest->garbage_collectable();
674
675 // Manifest available, now the spooler's hash algorithm can be determined
676 // That doesn't actually matter because the replication does no re-hashing
677 if (!preload_cache) {
678 const upload::SpoolerDefinition
679 spooler_definition(spooler_definition_str,
680 ensemble.manifest->GetHashAlgorithm());
681 spooler = upload::Spooler::Construct(spooler_definition);
682 assert(spooler);
683 spooler->RegisterListener(&SpoolerOnUpload);
684 }
685
686 // Open the reflog for modification
687 if (!preload_cache) {
688 if (initial_snapshot) {
689 LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
690 repository_name.c_str());
691 reflog = CreateEmptyReflog(*temp_dir, repository_name);
692 if (reflog == NULL) {
693 LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
694 goto fini;
695 }
696 } else {
697 ObjectFetcher object_fetcher_stratum1(repository_name,
698 *stratum1_url,
699 *temp_dir,
700 download_manager(),
701 signature_manager());
702
703 if (!reflog_hash.IsNull()) {
704 reflog =
705 FetchReflog(&object_fetcher_stratum1, repository_name, reflog_hash);
706 assert(reflog != NULL);
707 } else {
708 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
709 if (spooler->Peek(".cvmfsreflog")) {
710 LogCvmfs(kLogCvmfs, kLogStderr,
711 "no reflog hash specified but reflog is present");
712 goto fini;
713 }
714 }
715 }
716
717 if (reflog != NULL) {
718 reflog->BeginTransaction();
719 // On commit: use manifest's hash algorithm
720 reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
721 }
722 }
723
724 // Fetch tag list.
725 // If we are just preloading the cache it is not strictly necessarily to
726 // download the entire tag list
727 // TODO(molina): add user option to download tags when preloading the cache
728 if (!ensemble.manifest->history().IsNull() && !preload_cache) {
729 shash::Any history_hash = ensemble.manifest->history();
730 const string history_url = *stratum0_url + "/data/"
731 + history_hash.MakePath();
732 const string history_path = *temp_dir + "/" + history_hash.ToString();
733
734 cvmfs::PathSink pathsink(history_path);
735 download::JobInfo download_history(&history_url, false, false,
736 &history_hash, &pathsink);
737 dl_retval = download_manager()->Fetch(&download_history);
738 if (dl_retval != download::kFailOk) {
739 ReportDownloadError(download_history);
740 goto fini;
741 }
742 const std::string history_db_path = history_path + ".uncompressed";
743 retval = zlib::DecompressPath2Path(history_path, history_db_path);
744 assert(retval);
745 history::History *tag_db = history::SqliteHistory::Open(history_db_path);
746 if (NULL == tag_db) {
747 LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
748 history_db_path.c_str());
749 unlink(history_db_path.c_str());
750 goto fini;
751 }
752 retval = tag_db->List(&historic_tags);
753 delete tag_db;
754 unlink(history_db_path.c_str());
755 if (!retval) {
756 LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
757 history_db_path.c_str());
758 goto fini;
759 }
760
761 LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
762 historic_tags.size());
763 // TODO(jblomer): We should repliacte the previous history dbs, too,
764 // in order to avoid races on fail-over between non-synchronized stratum 1s
765 LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
766 Store(history_path, history_hash);
767 WaitForStorage();
768 unlink(history_path.c_str());
769 if (reflog != NULL && !reflog->AddHistory(history_hash)) {
770 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
771 goto fini;
772 }
773 }
774
775 // Starting threads
776 MakePipe(pipe_chunks);
777 LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
778 MainWorkerContext mwc;
779 mwc.download_manager = download_manager();
780 for (unsigned i = 0; i < num_parallel; ++i) {
781 int retval = pthread_create(&workers[i], NULL, MainWorker,
782 static_cast<void*>(&mwc));
783 assert(retval == 0);
784 }
785
786 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
787 retval = Pull(ensemble.manifest->catalog_hash(), "");
788 pull_history = false;
789 if (!historic_tags.empty()) {
790 LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
791 }
792 for (TagVector::const_iterator i = historic_tags.begin(),
793 iend = historic_tags.end();
794 i != iend; ++i)
795 {
796 if (Peek(i->root_hash))
797 continue;
798 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
799 i->name.c_str());
800 apply_timestamp_threshold = false;
801 bool retval2 = Pull(i->root_hash, "");
802 retval = retval && retval2;
803 }
804
805 // Stopping threads
806 LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
807 for (unsigned i = 0; i < num_parallel; ++i) {
808 ChunkJob terminate_workers;
809 WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
810 }
811 for (unsigned i = 0; i < num_parallel; ++i) {
812 int retval = pthread_join(workers[i], NULL); // NOLINT (false positive)
813 assert(retval == 0);
814 }
815 ClosePipe(pipe_chunks);
816
817 if (!retval)
818 goto fini;
819
820 // Upload manifest ensemble
821 {
822 LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
823 WaitForStorage();
824
825 if (!Peek(ensemble.manifest->certificate())) {
826 StoreBuffer(ensemble.cert_buf,
827 ensemble.cert_size,
828 ensemble.manifest->certificate(), true);
829 }
830 if (reflog != NULL &&
831 !reflog->AddCertificate(ensemble.manifest->certificate())) {
832 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
833 goto fini;
834 }
835 if (!meta_info_hash.IsNull()) {
836 const unsigned char *info = reinterpret_cast<const unsigned char *>(
837 meta_info.data());
838 StoreBuffer(info, meta_info.size(), meta_info_hash, true);
839 if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
840 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
841 goto fini;
842 }
843 }
844
845 // Create alternative bootstrapping symlinks for VOMS secured repos
846 if (ensemble.manifest->has_alt_catalog_path()) {
847 const bool success =
848 spooler->PlaceBootstrappingShortcut(ensemble.manifest->certificate()) &&
849 spooler->PlaceBootstrappingShortcut(ensemble.manifest->catalog_hash())
850 && (ensemble.manifest->history().IsNull() ||
851 spooler->PlaceBootstrappingShortcut(ensemble.manifest->history()))
852 && (meta_info_hash.IsNull() ||
853 spooler->PlaceBootstrappingShortcut(meta_info_hash));
854
855 if (!success) {
856 LogCvmfs(kLogCvmfs, kLogStderr,
857 "failed to place root catalog bootstrapping symlinks");
858 goto fini;
859 }
860 }
861
862 // upload Reflog database
863 if (!preload_cache && reflog != NULL) {
864 reflog->CommitTransaction();
865 reflog->DropDatabaseFileOwnership();
866 string reflog_path = reflog->database_file();
867 delete reflog;
868 manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
869 WaitForStorage(); // Reduce the duration of reflog /wo checksum
870 spooler->UploadReflog(reflog_path);
871 spooler->WaitForUpload();
872 unlink(reflog_path.c_str());
873 if (spooler->GetNumberOfErrors()) {
874 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
875 spooler->GetNumberOfErrors());
876 goto fini;
877 }
878 assert(!reflog_chksum_path.empty());
879 manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
880 }
881
882 if (preload_cache) {
883 bool retval =
884 ensemble.manifest->ExportBreadcrumb(*preload_cachedir, 0660);
885 assert(retval);
886 } else {
887 // pkcs#7 structure contains content + certificate + signature
888 // So there is no race with whitelist and pkcs7 signature being out of
889 // sync
890 if (ensemble.whitelist_pkcs7_buf) {
891 StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
892 ".cvmfswhitelist.pkcs7", false);
893 }
894 StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
895 ".cvmfswhitelist", false);
896 StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
897 ".cvmfspublished", false);
898 }
899 LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
900 ensemble.manifest->revision());
901 }
902
903 WaitForStorage();
904 LogCvmfs(kLogCvmfs, kLogStdout, "Fetched %" PRId64 " new chunks out of %"
905 PRId64 " processed chunks",
906 atomic_read64(&overall_new), atomic_read64(&overall_chunks));
907 result = 0;
908
909 fini:
910 if (fd_lockfile >= 0)
911 UnlockFile(fd_lockfile);
912 free(workers);
913 delete spooler;
914 delete pathfilter;
915 return result;
916 }
917
918 } // namespace swissknife
919