GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_pull.cc
Date: 2026-06-28 02:36:10
Exec Total Coverage
Lines: 0 528 0.0%
Branches: 0 394 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Replicates a cvmfs repository. Uses the cvmfs intrinsic Merkle trees
5 * to calculate the difference set.
6 */
7
8 // NOLINTNEXTLINE
9 #define _FILE_OFFSET_BITS 64
10
11
12 #include "swissknife_pull.h"
13
14 #include <inttypes.h>
15 #include <pthread.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18
19 #include <cstdlib>
20 #include <cstring>
21 #include <string>
22 #include <vector>
23
24 #include "catalog.h"
25 #include "compression/compression.h"
26 #include "crypto/hash.h"
27 #include "history_sqlite.h"
28 #include "manifest.h"
29 #include "manifest_fetch.h"
30 #include "network/download.h"
31 #include "network/sink_path.h"
32 #include "network/sink_mem.h"
33 #include "object_fetcher.h"
34 #include "path_filters/inclusion_spec.h"
35 #include "path_filters/relaxed_path_filter.h"
36 #include "reflog.h"
37 #include "upload.h"
38 #include "util/atomic.h"
39 #include "util/exception.h"
40 #include "util/logging.h"
41 #include "util/posix.h"
42 #include "util/shared_ptr.h"
43 #include "util/smalloc.h"
44 #include "util/string.h"
45
46 using namespace std; // NOLINT
47
48 namespace swissknife {
49
50 namespace {
51
52 typedef HttpObjectFetcher<> ObjectFetcher;  // fetcher type instantiated in CommandPull::Main for the stratum 0 and stratum 1 URLs
53
54 /**
55 * This just stores an shash::Any in a predictable way to send it through a
56 * POSIX pipe.
57 */
58 class ChunkJob {
59 public:
60 ChunkJob()
61 : suffix(shash::kSuffixNone)
62 , hash_algorithm(shash::kAny)
63 , compression_alg(zlib::kZlibDefault) { }
64
65 ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
66 : suffix(hash.suffix)
67 , hash_algorithm(hash.algorithm)
68 , compression_alg(compression_alg) {
69 memcpy(digest, hash.digest, hash.GetDigestSize());
70 }
71
72 bool IsTerminateJob() const { return (hash_algorithm == shash::kAny); }
73
74 shash::Any hash() const {
75 assert(!IsTerminateJob());
76 return shash::Any(hash_algorithm, digest, suffix);
77 }
78
79 const shash::Suffix suffix;
80 const shash::Algorithms hash_algorithm;
81 const zlib::Algorithms compression_alg;
82 unsigned char digest[shash::kMaxDigestSize];
83 };
84
85 static void SpoolerOnUpload(const upload::SpoolerResult &result) {
86 unlink(result.local_path.c_str());
87 if (result.return_code != 0) {
88 PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
89 result.local_path.c_str(), result.content_hash.ToString().c_str());
90 }
91 }
92
93 SharedPtr<string> stratum0_url;  // replication source; set from option -u in Main
94 SharedPtr<string> stratum1_url;  // set from option -w; Main requires it unless preloading
95 SharedPtr<string> temp_dir;  // set from option -x; temporary files are created below it
96 unsigned num_parallel = 1;  // number of MainWorker threads (option -n)
97 bool pull_history = false;  // option -p: Main also follows the previous-revision chain
98 bool apply_timestamp_threshold = false;  // set by Pull once a catalog got past the threshold check; Main resets it per tag
99 uint64_t timestamp_threshold = 0;  // option -Z; compared to the root catalog's GetLastModified() in Pull
100 bool is_garbage_collectable = false;  // taken from the manifest in Main
101 bool initial_snapshot = false;  // option -i: Main creates an empty reflog
102 upload::Spooler *spooler = NULL;  // only constructed in Main when not preloading
103 int pipe_chunks[2];  // pipe carrying ChunkJob objects from Pull/Main to the workers
104 // required for concurrent reading
105 pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
106 unsigned retries = 3;  // option -a; passed to SetRetryParameters in Main
107 catalog::RelaxedPathFilter *pathfilter = NULL;  // option -d; deleted at the end of Main
108 catalog::InclusionSpec *inclusion_spec = NULL;  // option -E; deleted at the end of Main
109 atomic_int64 overall_chunks;  // incremented by a worker for every processed chunk job
110 atomic_int64 overall_new;  // incremented by a worker for every chunk it downloaded and stored
111 atomic_int64 chunk_queue;  // jobs written to the pipe that the workers have not finished yet
112 bool preload_cache = false;  // option -c: store into a local cache directory instead of the spooler
113 string *preload_cachedir = NULL;  // option -r when preload_cache is set
114 bool inspect_existing_catalogs = false;  // option -z: recurse into catalogs that already exist
115 manifest::Reflog *reflog = NULL;  // created or fetched in Main when not preloading
116
117 } // anonymous namespace
118
119
120 static std::string MakePath(const shash::Any &hash) {
121 return (preload_cache)
122 ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
123 : "data/" + hash.MakePath();
124 }
125
126
127 static bool Peek(const string &remote_path) {
128 return (preload_cache) ? FileExists(remote_path) : spooler->Peek(remote_path);
129 }
130
131 static bool Peek(const shash::Any &remote_hash) {
132 return Peek(MakePath(remote_hash));
133 }
134
135 static void ReportDownloadError(const download::JobInfo &download_job) {
136 const download::Failures error_code = download_job.error_code();
137 const int http_code = download_job.http_code();
138 const std::string url = *download_job.url();
139
140 LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
141 url.c_str(), error_code, download::Code2Ascii(error_code));
142
143 switch (error_code) {
144 case download::kFailProxyResolve:
145 case download::kFailHostResolve:
146 LogCvmfs(kLogCvmfs, kLogStderr,
147 "DNS lookup for Stratum 0 failed - "
148 "please check the network connection");
149 break;
150
151 case download::kFailHostHttp:
152 LogCvmfs(kLogCvmfs, kLogStderr,
153 "unexpected HTTP error code %d - "
154 "please check the stratum 0 health",
155 http_code);
156 break;
157
158 case download::kFailBadData:
159 LogCvmfs(kLogCvmfs, kLogStderr,
160 "downloaded corrupted data - "
161 "please check the stratum 0 health");
162 break;
163
164 default:
165 if (download::IsProxyTransferError(error_code)
166 || download::IsHostTransferError(error_code)) {
167 LogCvmfs(kLogCvmfs, kLogStderr,
168 "couldn't reach Stratum 0 - "
169 "please check the network connection");
170 } else {
171 LogCvmfs(kLogCvmfs, kLogStderr,
172 "unexpected error - feel free to file "
173 "a bug report");
174 }
175 }
176 }
177
178
179 static void Store(const string &local_path,
180 const string &remote_path,
181 const bool compressed_src) {
182 if (preload_cache) {
183 if (!compressed_src) {
184 const int retval = rename(local_path.c_str(), remote_path.c_str());
185 if (retval != 0) {
186 PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
187 remote_path.c_str());
188 }
189 } else {
190 // compressed input
191 string tmp_dest;
192 FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
193 if (fdest == NULL) {
194 PANIC(kLogStderr, "Failed to create temporary file '%s'",
195 remote_path.c_str());
196 }
197 int retval = zlib::DecompressPath2File(local_path, fdest);
198 if (!retval) {
199 PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
200 remote_path.c_str());
201 }
202 fclose(fdest);
203 retval = rename(tmp_dest.c_str(), remote_path.c_str());
204 assert(retval == 0);
205 unlink(local_path.c_str());
206 }
207 } else {
208 spooler->Upload(local_path, remote_path);
209 }
210 }
211
212 static void Store(const string &local_path,
213 const shash::Any &remote_hash,
214 const bool compressed_src = true) {
215 Store(local_path, MakePath(remote_hash), compressed_src);
216 }
217
218
219 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
220 const std::string &dest_path, const bool compress) {
221 string tmp_file;
222 FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
223 assert(ftmp);
224 int retval;
225 if (compress) {
226 shash::Any dummy(shash::kSha1); // hardcoded hash no problem, unused
227 retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
228 } else {
229 retval = CopyMem2File(buffer, size, ftmp);
230 }
231 assert(retval);
232 fclose(ftmp);
233 Store(tmp_file, dest_path, true);
234 }
235
236 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
237 const shash::Any &dest_hash, const bool compress) {
238 StoreBuffer(buffer, size, MakePath(dest_hash), compress);
239 }
240
241
242 static void WaitForStorage() {
243 if (!preload_cache)
244 spooler->WaitForUpload();
245 }
246
247
248 struct MainWorkerContext {  // argument block handed to every MainWorker thread
249 download::DownloadManager *download_manager;  // used by the workers to fetch chunks
250 };
251
252 static void *MainWorker(void *data) {
253 MainWorkerContext *mwc = static_cast<MainWorkerContext *>(data);
254 download::DownloadManager *download_manager = mwc->download_manager;
255
256 while (1) {
257 ChunkJob next_chunk;
258 {
259 const MutexLockGuard m(&lock_pipe);
260 ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
261 }
262 if (next_chunk.IsTerminateJob())
263 break;
264
265 const shash::Any chunk_hash = next_chunk.hash();
266 const zlib::Algorithms compression_alg = next_chunk.compression_alg;
267 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
268 chunk_hash.ToString().c_str());
269
270 if (!Peek(chunk_hash)) {
271 string tmp_file;
272 FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
273 const string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
274 cvmfs::FileSink filesink(fchunk);
275 download::JobInfo download_chunk(&url_chunk, false, false, &chunk_hash,
276 &filesink);
277
278 const download::Failures download_result = download_manager->Fetch(
279 &download_chunk);
280 if (download_result != download::kFailOk) {
281 ReportDownloadError(download_chunk);
282 PANIC(kLogStderr, "Download error");
283 }
284 fclose(fchunk);
285 Store(tmp_file, chunk_hash,
286 (compression_alg == zlib::kZlibDefault) ? true : false);
287 atomic_inc64(&overall_new);
288 }
289 if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
290 LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
291 atomic_dec64(&chunk_queue);
292 }
293 return NULL;
294 }
295
296
297 bool CommandPull::PullRecursion(catalog::Catalog *catalog,
298 const std::string &path) {
299 assert(catalog);
300
301
302 // Nested catalogs (in a nested code block because goto fail...)
303 {
304 const catalog::Catalog::NestedCatalogList
305 nested_catalogs = catalog->ListOwnNestedCatalogs();
306 for (catalog::Catalog::NestedCatalogList::const_iterator
307 i = nested_catalogs.begin(),
308 iEnd = nested_catalogs.end();
309 i != iEnd;
310 ++i) {
311 // Partial replication: prune entire subtrees that contain no included
312 // path. IsExcluded() is true exactly when the subtree rooted at this
313 // mountpoint is disjoint from the inclusion set, so neither this nested
314 // catalog nor any of its descendants need to be replicated. Ancestor
315 // catalogs on the way to an included subtree are *not* excluded and are
316 // therefore still pulled and recursed into.
317 if (inclusion_spec
318 && inclusion_spec->IsExcluded(i->mountpoint.ToString())) {
319 LogCvmfs(kLogCvmfs, kLogStdout,
320 "Pruning excluded subtree (catalog + objects) at %s",
321 i->mountpoint.c_str());
322 continue;
323 }
324 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
325 i->mountpoint.c_str());
326 shash::Any previous_catalog_hash; // expected to be null for subcatalog
327 const bool retval = Pull(i->hash, i->mountpoint.ToString(),
328 previous_catalog_hash);
329 if (!retval)
330 return false;
331 }
332 }
333
334 return true;
335 }
336
337 bool CommandPull::Pull(const shash::Any &catalog_hash,
338 const std::string &path,
339 shash::Any &previous_catalog,
340 bool is_historic_catalog) {
341 int retval;
342 download::Failures dl_retval;
343 assert(shash::kSuffixCatalog == catalog_hash.suffix);
344 previous_catalog.SetNull();
345
346 // Check if the catalog already exists
347 if (Peek(catalog_hash)) {
348 // Preload: dirtab changed
349 if (inspect_existing_catalogs) {
350 if (!preload_cache) {
351 PANIC(kLogStderr, "to be implemented: -t without -c");
352 }
353 catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
354 path, MakePath(catalog_hash), catalog_hash);
355 if (catalog == NULL) {
356 LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
357 catalog_hash.ToString().c_str());
358 return false;
359 }
360 const bool retval = PullRecursion(catalog, path);
361 delete catalog;
362 return retval;
363 }
364
365 LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
366 return true;
367 }
368
369 // Check if the catalog matches the pathfilter
370 if (path != "" && // necessary to load the root catalog
371 pathfilter && !pathfilter->IsMatching(path)) {
372 LogCvmfs(kLogCvmfs, kLogStdout,
373 " Catalog in '%s' does not match"
374 " the path specification",
375 path.c_str());
376 return true;
377 }
378
379 const int64_t gauge_chunks = atomic_read64(&overall_chunks);
380 const int64_t gauge_new = atomic_read64(&overall_new);
381
382 // Download and uncompress catalog
383 shash::Any chunk_hash;
384 zlib::Algorithms compression_alg;
385 catalog::Catalog *catalog = NULL;
386 string file_catalog;
387 string file_catalog_vanilla;
388 FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
389 &file_catalog);
390 if (!fcatalog) {
391 LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
392 return false;
393 }
394 fclose(fcatalog);
395 FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
396 &file_catalog_vanilla);
397 if (!fcatalog_vanilla) {
398 LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
399 unlink(file_catalog.c_str());
400 return false;
401 }
402 const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
403 cvmfs::FileSink filesink(fcatalog_vanilla);
404 download::JobInfo download_catalog(&url_catalog, false, false, &catalog_hash,
405 &filesink);
406 dl_retval = download_manager()->Fetch(&download_catalog);
407 fclose(fcatalog_vanilla);
408 if (dl_retval != download::kFailOk) {
409 const bool genuinely_missing = (dl_retval == download::kFailHostHttp)
410 && (download_catalog.http_code() == 404);
411 if (is_historic_catalog && is_garbage_collectable && genuinely_missing) {
412 LogCvmfs(kLogCvmfs, kLogStdout,
413 "skipping missing catalog %s - "
414 "probably swept by garbage collection",
415 catalog_hash.ToString().c_str());
416 goto pull_skip;
417 } else {
418 ReportDownloadError(download_catalog);
419 goto pull_cleanup;
420 }
421 }
422 retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
423 if (!retval) {
424 LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
425 file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
426 goto pull_cleanup;
427 }
428 if (path.empty() && reflog != NULL) {
429 if (!reflog->AddCatalog(catalog_hash)) {
430 LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
431 goto pull_cleanup;
432 }
433 }
434
435 catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
436 if (catalog == NULL) {
437 LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
438 catalog_hash.ToString().c_str());
439 goto pull_cleanup;
440 }
441
442 // Always pull the HEAD root catalog and nested catalogs
443 if (apply_timestamp_threshold && (path == "")
444 && (catalog->GetLastModified() < timestamp_threshold)) {
445 LogCvmfs(kLogCvmfs, kLogStdout,
446 " Pruning at root catalog from %s due to threshold at %s",
447 StringifyTime(catalog->GetLastModified(), false).c_str(),
448 StringifyTime(timestamp_threshold, false).c_str());
449 delete catalog;
450 goto pull_skip;
451 }
452 apply_timestamp_threshold = true;
453
454 // Traverse the chunks. In partial replication mode, excluded subtrees are
455 // pruned before reaching this point (see PullRecursion), so any catalog we
456 // pull here has its objects replicated. Ancestor catalogs on the way to an
457 // included subtree are replicated in full, which keeps navigation intact.
458 {
459 LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
460 " Processing chunks [%" PRIu64 " registered chunks]: ",
461 catalog->GetNumChunks());
462 retval = catalog->AllChunksBegin();
463 if (!retval) {
464 LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
465 goto pull_cleanup;
466 }
467 while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
468 ChunkJob next_chunk(chunk_hash, compression_alg);
469 WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
470 atomic_inc64(&chunk_queue);
471 }
472 catalog->AllChunksEnd();
473 while (atomic_read64(&chunk_queue) != 0) {
474 SafeSleepMs(100);
475 }
476 LogCvmfs(kLogCvmfs, kLogStdout,
477 " fetched %" PRId64 " new chunks out of "
478 "%" PRId64 " unique chunks",
479 atomic_read64(&overall_new) - gauge_new,
480 atomic_read64(&overall_chunks) - gauge_chunks);
481 }
482
483 retval = PullRecursion(catalog, path);
484 previous_catalog = catalog->GetPreviousRevision();
485 delete catalog;
486 unlink(file_catalog.c_str());
487
488
489 WaitForStorage();
490 if (!retval)
491 return false;
492 Store(file_catalog_vanilla, catalog_hash);
493 return true;
494
495 pull_cleanup:
496 delete catalog;
497 unlink(file_catalog.c_str());
498 unlink(file_catalog_vanilla.c_str());
499 return false;
500
501 pull_skip:
502 unlink(file_catalog.c_str());
503 unlink(file_catalog_vanilla.c_str());
504 return true;
505 }
506
507
508 int swissknife::CommandPull::Main(const swissknife::ArgumentList &args) {
509 int retval;
510 manifest::Failures m_retval;
511 download::Failures dl_retval;
512 unsigned timeout = 60;
513 const int fd_lockfile = -1;
514 string spooler_definition_str;
515 manifest::ManifestEnsemble ensemble;
516 shash::Any meta_info_hash;
517 string meta_info;
518 shash::Any previous_catalog_hash;
519 shash::Any current_catalog_hash;
520
521 // Option parsing
522 if (args.find('c') != args.end())
523 preload_cache = true;
524 if (args.find('l') != args.end()) {
525 const unsigned log_level = kLogLevel0
526 << String2Uint64(*args.find('l')->second);
527 if (log_level > kLogNone) {
528 LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
529 return 1;
530 }
531 SetLogVerbosity(static_cast<LogLevels>(log_level));
532 }
533 stratum0_url = args.find('u')->second;
534 temp_dir = args.find('x')->second;
535 if (preload_cache) {
536 preload_cachedir = new string(*args.find('r')->second);
537 } else {
538 spooler_definition_str = *args.find('r')->second;
539 }
540 string master_keys = *args.find('k')->second;
541 if (DirectoryExists(master_keys))
542 master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
543 const string repository_name = *args.find('m')->second;
544 if (args.find('n') != args.end())
545 num_parallel = String2Uint64(*args.find('n')->second);
546 if (args.find('t') != args.end())
547 timeout = String2Uint64(*args.find('t')->second);
548 if (args.find('a') != args.end())
549 retries = String2Uint64(*args.find('a')->second);
550 if (args.find('d') != args.end()) {
551 pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
552 assert(pathfilter->IsValid());
553 }
554 if (args.find('E') != args.end()) {
555 if (pathfilter != NULL) {
556 LogCvmfs(kLogCvmfs, kLogStderr,
557 "Options -d and -E are mutually exclusive");
558 return 1;
559 }
560 inclusion_spec =
561 catalog::InclusionSpec::Create(*args.find('E')->second);
562 if (inclusion_spec == NULL || !inclusion_spec->IsValid()) {
563 LogCvmfs(kLogCvmfs, kLogStderr,
564 "Failed to parse inclusion spec from '%s'",
565 args.find('E')->second->c_str());
566 return 1;
567 }
568 LogCvmfs(kLogCvmfs, kLogStdout,
569 "CernVM-FS: partial replication with inclusion spec (version %d)",
570 inclusion_spec->version());
571 }
572 if (args.find('p') != args.end())
573 pull_history = true;
574 if (args.find('z') != args.end())
575 inspect_existing_catalogs = true;
576 if (args.find('w') != args.end())
577 stratum1_url = args.find('w')->second;
578 if (args.find('i') != args.end())
579 initial_snapshot = true;
580 shash::Any reflog_hash;
581 string reflog_chksum_path;
582 if (args.find('R') != args.end()) {
583 reflog_chksum_path = *args.find('R')->second;
584 if (!initial_snapshot) {
585 if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
586 LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
587 return 1;
588 }
589 }
590 }
591 if (args.find('Z') != args.end()) {
592 timestamp_threshold = String2Int64(*args.find('Z')->second);
593 }
594
595 if (!preload_cache && stratum1_url.Get() == NULL) {
596 LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
597 return 1;
598 }
599
600 typedef std::vector<history::History::Tag> TagVector;
601 TagVector historic_tags;
602
603 LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
604 stratum0_url->c_str());
605
606 int result = 1;
607
608 // Initialization
609 atomic_init64(&overall_chunks);
610 atomic_init64(&overall_new);
611 atomic_init64(&chunk_queue);
612
613 const bool follow_redirects = false;
614 const unsigned max_pool_handles = num_parallel + 1;
615 const string proxy = (args.find('@') != args.end()) ? *args.find('@')->second
616 : "";
617
618 if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
619 return 1;
620 }
621
622 if (!this->InitSignatureManager(master_keys)) {
623 LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
624 return 1;
625 } else {
626 LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: using public key(s) %s",
627 JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
628 }
629
630 unsigned current_group;
631 vector<vector<download::DownloadManager::ProxyInfo> > proxies;
632 download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
633 if (proxies.size() > 0) {
634 string proxy_str = "\nWarning, replicating through proxies\n";
635 proxy_str += " Load-balance groups:\n";
636 for (unsigned i = 0; i < proxies.size(); ++i) {
637 vector<string> urls;
638 for (unsigned j = 0; j < proxies[i].size(); ++j) {
639 urls.push_back(proxies[i][j].url);
640 }
641 proxy_str += " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ")
642 + "\n";
643 }
644 proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] "
645 + proxies[current_group][0].url;
646 LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
647 }
648 download_manager()->SetTimeout(timeout, timeout);
649 download_manager()->SetRetryParameters(retries, 500, 2000);
650 download_manager()->Spawn();
651
652 // init the download helper
653 const ObjectFetcher object_fetcher(repository_name, *stratum0_url, *temp_dir,
654 download_manager(), signature_manager());
655
656 pthread_t *workers = reinterpret_cast<pthread_t *>(
657 smalloc(sizeof(pthread_t) * num_parallel));
658
659 // Check if we have a replica-ready server
660 const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
661 download::JobInfo download_sentinel(&url_sentinel, false);
662 retval = download_manager()->Fetch(&download_sentinel);
663 if (retval != download::kFailOk) {
664 if (download_sentinel.http_code() == 404) {
665 LogCvmfs(kLogCvmfs, kLogStderr,
666 "This is not a CernVM-FS server for replication");
667 } else {
668 LogCvmfs(kLogCvmfs, kLogStderr,
669 "Failed to contact stratum 0 server (%d - %s)", retval,
670 download::Code2Ascii(download_sentinel.error_code()));
671 }
672 goto fini;
673 }
674
675 m_retval = FetchRemoteManifestEnsemble(*stratum0_url, repository_name,
676 &ensemble);
677 if (m_retval != manifest::kFailOk) {
678 LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
679 m_retval, manifest::Code2Ascii(m_retval));
680 goto fini;
681 }
682
683 // Get meta info
684 meta_info_hash = ensemble.manifest->meta_info();
685 if (!meta_info_hash.IsNull()) {
686 meta_info_hash = ensemble.manifest->meta_info();
687 const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
688 cvmfs::MemSink metainfo_memsink;
689 download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
690 &metainfo_memsink);
691 dl_retval = download_manager()->Fetch(&download_metainfo);
692 if (dl_retval != download::kFailOk) {
693 LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
694 dl_retval, download::Code2Ascii(dl_retval));
695 goto fini;
696 }
697 meta_info = string(reinterpret_cast<char *>(metainfo_memsink.data()),
698 metainfo_memsink.pos());
699 }
700
701 is_garbage_collectable = ensemble.manifest->garbage_collectable();
702
703 // Manifest available, now the spooler's hash algorithm can be determined
704 // That doesn't actually matter because the replication does no re-hashing
705 if (!preload_cache) {
706 const upload::SpoolerDefinition spooler_definition(
707 spooler_definition_str, ensemble.manifest->GetHashAlgorithm());
708 spooler = upload::Spooler::Construct(spooler_definition);
709 assert(spooler);
710 spooler->RegisterListener(&SpoolerOnUpload);
711 }
712
713 // Open the reflog for modification
714 if (!preload_cache) {
715 if (initial_snapshot) {
716 LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
717 repository_name.c_str());
718 reflog = CreateEmptyReflog(*temp_dir, repository_name);
719 if (reflog == NULL) {
720 LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
721 goto fini;
722 }
723 } else {
724 ObjectFetcher object_fetcher_stratum1(repository_name,
725 *stratum1_url,
726 *temp_dir,
727 download_manager(),
728 signature_manager());
729
730 if (!reflog_hash.IsNull()) {
731 reflog = FetchReflog(&object_fetcher_stratum1, repository_name,
732 reflog_hash);
733 assert(reflog != NULL);
734 } else {
735 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
736 if (spooler->Peek(".cvmfsreflog")) {
737 LogCvmfs(kLogCvmfs, kLogStderr,
738 "no reflog hash specified but reflog is present");
739 goto fini;
740 }
741 }
742 }
743
744 if (reflog != NULL) {
745 reflog->BeginTransaction();
746 // On commit: use manifest's hash algorithm
747 reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
748 }
749 }
750
751 // Fetch tag list.
752 // If we are just preloading the cache it is not strictly necessarily to
753 // download the entire tag list
754 // TODO(molina): add user option to download tags when preloading the cache
755 if (!ensemble.manifest->history().IsNull() && !preload_cache) {
756 const shash::Any history_hash = ensemble.manifest->history();
757 const string history_url = *stratum0_url + "/data/"
758 + history_hash.MakePath();
759 const string history_path = *temp_dir + "/" + history_hash.ToString();
760
761 cvmfs::PathSink pathsink(history_path);
762 download::JobInfo download_history(&history_url, false, false,
763 &history_hash, &pathsink);
764 dl_retval = download_manager()->Fetch(&download_history);
765 if (dl_retval != download::kFailOk) {
766 ReportDownloadError(download_history);
767 goto fini;
768 }
769 const std::string history_db_path = history_path + ".uncompressed";
770 retval = zlib::DecompressPath2Path(history_path, history_db_path);
771 assert(retval);
772 history::History *tag_db = history::SqliteHistory::Open(history_db_path);
773 if (NULL == tag_db) {
774 LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
775 history_db_path.c_str());
776 unlink(history_db_path.c_str());
777 goto fini;
778 }
779 retval = tag_db->List(&historic_tags);
780 delete tag_db;
781 unlink(history_db_path.c_str());
782 if (!retval) {
783 LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
784 history_db_path.c_str());
785 goto fini;
786 }
787
788 LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
789 historic_tags.size());
790 // TODO(jblomer): We should repliacte the previous history dbs, too,
791 // in order to avoid races on fail-over between non-synchronized stratum 1s
792 LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
793 Store(history_path, history_hash);
794 WaitForStorage();
795 unlink(history_path.c_str());
796 if (reflog != NULL && !reflog->AddHistory(history_hash)) {
797 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
798 goto fini;
799 }
800 }
801
802 // Starting threads
803 MakePipe(pipe_chunks);
804 LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
805 MainWorkerContext mwc;
806 mwc.download_manager = download_manager();
807 for (unsigned i = 0; i < num_parallel; ++i) {
808 const int retval = pthread_create(&workers[i], NULL, MainWorker,
809 static_cast<void *>(&mwc));
810 assert(retval == 0);
811 }
812
813 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
814 current_catalog_hash = ensemble.manifest->catalog_hash();
815 retval = 1;
816 {
817 bool is_historic = false;
818 do {
819 retval = static_cast<int>(
820 Pull(current_catalog_hash, "", previous_catalog_hash, is_historic)
821 && (retval != 0));
822 if (pull_history && !previous_catalog_hash.IsNull()) {
823 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
824 previous_catalog_hash.ToString().c_str());
825 }
826 current_catalog_hash = previous_catalog_hash;
827 is_historic = true;
828 } while (pull_history && !previous_catalog_hash.IsNull());
829 }
830
831 if (!historic_tags.empty()) {
832 LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
833 }
834 for (TagVector::const_iterator i = historic_tags.begin(),
835 iend = historic_tags.end();
836 i != iend;
837 ++i) {
838 if (Peek(i->root_hash))
839 continue;
840 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
841 i->name.c_str());
842 apply_timestamp_threshold = false;
843
844 bool retval2 = true;
845 current_catalog_hash = i->root_hash;
846 do {
847 retval2 = Pull(current_catalog_hash, "", previous_catalog_hash,
848 true /* is_historic_catalog */)
849 && retval2;
850 if (pull_history && !previous_catalog_hash.IsNull()) {
851 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
852 previous_catalog_hash.ToString().c_str());
853 }
854 current_catalog_hash = previous_catalog_hash;
855 } while (pull_history && !previous_catalog_hash.IsNull());
856 retval = retval && retval2;
857 }
858
859 // Stopping threads
860 LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
861 for (unsigned i = 0; i < num_parallel; ++i) {
862 ChunkJob terminate_workers;
863 WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
864 }
865 for (unsigned i = 0; i < num_parallel; ++i) {
866 int retval = pthread_join(workers[i], NULL); // NOLINT (false positive)
867 assert(retval == 0);
868 }
869 ClosePipe(pipe_chunks);
870
871 if (!retval)
872 goto fini;
873
874 WaitForStorage();
875 if (!Peek(ensemble.manifest->catalog_hash())) {
876 LogCvmfs(kLogCvmfs, kLogStderr,
877 "root catalog %s is missing from the destination - refusing to "
878 "publish an incomplete revision",
879 ensemble.manifest->catalog_hash().ToString().c_str());
880 goto fini;
881 }
882
883 // Upload manifest ensemble
884 {
885 LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
886 WaitForStorage();
887
888 if (!Peek(ensemble.manifest->certificate())) {
889 StoreBuffer(ensemble.cert_buf, ensemble.cert_size,
890 ensemble.manifest->certificate(), true);
891 }
892 if (reflog != NULL
893 && !reflog->AddCertificate(ensemble.manifest->certificate())) {
894 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
895 goto fini;
896 }
897 if (!meta_info_hash.IsNull()) {
898 const unsigned char *info = reinterpret_cast<const unsigned char *>(
899 meta_info.data());
900 StoreBuffer(info, meta_info.size(), meta_info_hash, true);
901 if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
902 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
903 goto fini;
904 }
905 }
906
907 // Create alternative bootstrapping symlinks for VOMS secured repos
908 if (ensemble.manifest->has_alt_catalog_path()) {
909 const bool success = spooler->PlaceBootstrappingShortcut(
910 ensemble.manifest->certificate())
911 && spooler->PlaceBootstrappingShortcut(
912 ensemble.manifest->catalog_hash())
913 && (ensemble.manifest->history().IsNull()
914 || spooler->PlaceBootstrappingShortcut(
915 ensemble.manifest->history()))
916 && (meta_info_hash.IsNull()
917 || spooler->PlaceBootstrappingShortcut(
918 meta_info_hash));
919
920 if (!success) {
921 LogCvmfs(kLogCvmfs, kLogStderr,
922 "failed to place root catalog bootstrapping symlinks");
923 goto fini;
924 }
925 }
926
927 // upload Reflog database
928 if (!preload_cache && reflog != NULL) {
929 reflog->CommitTransaction();
930 reflog->DropDatabaseFileOwnership();
931 const string reflog_path = reflog->database_file();
932 delete reflog;
933 manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
934 WaitForStorage(); // Reduce the duration of reflog /wo checksum
935 spooler->UploadReflog(reflog_path);
936 spooler->WaitForUpload();
937 unlink(reflog_path.c_str());
938 if (spooler->GetNumberOfErrors()) {
939 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
940 spooler->GetNumberOfErrors());
941 goto fini;
942 }
943 assert(!reflog_chksum_path.empty());
944 manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
945 }
946
947 if (preload_cache) {
948 const bool retval = ensemble.manifest->ExportBreadcrumb(*preload_cachedir,
949 0660);
950 assert(retval);
951 } else {
952 // pkcs#7 structure contains content + certificate + signature
953 // So there is no race with whitelist and pkcs7 signature being out of
954 // sync
955 if (ensemble.whitelist_pkcs7_buf) {
956 StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
957 ".cvmfswhitelist.pkcs7", false);
958 }
959 StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
960 ".cvmfswhitelist", false);
961 StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
962 ".cvmfspublished", false);
963
964 // Upload the partial replication spec to the backend
965 if (inclusion_spec != NULL) {
966 const std::string &spec_content = inclusion_spec->content();
967 StoreBuffer(
968 reinterpret_cast<const unsigned char *>(spec_content.data()),
969 spec_content.size(),
970 ".cvmfs_partial_replication", false);
971 LogCvmfs(kLogCvmfs, kLogStdout,
972 "Uploaded partial replication spec to backend");
973 }
974 }
975 LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
976 ensemble.manifest->revision());
977 }
978
979 WaitForStorage();
980 LogCvmfs(kLogCvmfs, kLogStdout,
981 "Fetched %" PRId64 " new chunks out of %" PRId64 " processed chunks",
982 atomic_read64(&overall_new), atomic_read64(&overall_chunks));
983 result = 0;
984
985 fini:
986 if (fd_lockfile >= 0)
987 UnlockFile(fd_lockfile);
988 free(workers);
989 delete spooler;
990 delete pathfilter;
991 delete inclusion_spec;
992 inclusion_spec = NULL;
993 return result;
994 }
995
996 } // namespace swissknife
997