GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_pull.cc
Date: 2026-05-19 11:45:12
Exec Total Coverage
Lines: 0 494 0.0%
Branches: 0 350 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Replicates a cvmfs repository. Uses the cvmfs intrinsic Merkle trees
5 * to calculate the difference set.
6 */
7
8 // NOLINTNEXTLINE
9 #define _FILE_OFFSET_BITS 64
10
11
12 #include "swissknife_pull.h"
13
14 #include <inttypes.h>
15 #include <pthread.h>
16 #include <sys/stat.h>
17 #include <unistd.h>
18
19 #include <cstdlib>
20 #include <cstring>
21 #include <string>
22 #include <vector>
23
24 #include "catalog.h"
25 #include "compression/compression.h"
26 #include "crypto/hash.h"
27 #include "history_sqlite.h"
28 #include "manifest.h"
29 #include "manifest_fetch.h"
30 #include "network/download.h"
31 #include "network/sink_path.h"
32 #include "network/sink_mem.h"
33 #include "object_fetcher.h"
34 #include "path_filters/relaxed_path_filter.h"
35 #include "reflog.h"
36 #include "upload.h"
37 #include "util/atomic.h"
38 #include "util/exception.h"
39 #include "util/logging.h"
40 #include "util/posix.h"
41 #include "util/shared_ptr.h"
42 #include "util/smalloc.h"
43 #include "util/string.h"
44
45 using namespace std; // NOLINT
46
47 namespace swissknife {
48
49 namespace {
50
// Object fetcher type used in this file: HttpObjectFetcher instantiated with
// its default template arguments.
51 typedef HttpObjectFetcher<> ObjectFetcher;
52
53 /**
54 * This just stores an shash::Any in a predictable way to send it through a
55 * POSIX pipe.
56 */
57 class ChunkJob {
58 public:
59 ChunkJob()
60 : suffix(shash::kSuffixNone)
61 , hash_algorithm(shash::kAny)
62 , compression_alg(zlib::kZlibDefault) { }
63
64 ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
65 : suffix(hash.suffix)
66 , hash_algorithm(hash.algorithm)
67 , compression_alg(compression_alg) {
68 memcpy(digest, hash.digest, hash.GetDigestSize());
69 }
70
71 bool IsTerminateJob() const { return (hash_algorithm == shash::kAny); }
72
73 shash::Any hash() const {
74 assert(!IsTerminateJob());
75 return shash::Any(hash_algorithm, digest, suffix);
76 }
77
78 const shash::Suffix suffix;
79 const shash::Algorithms hash_algorithm;
80 const zlib::Algorithms compression_alg;
81 unsigned char digest[shash::kMaxDigestSize];
82 };
83
84 static void SpoolerOnUpload(const upload::SpoolerResult &result) {
85 unlink(result.local_path.c_str());
86 if (result.return_code != 0) {
87 PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
88 result.local_path.c_str(), result.content_hash.ToString().c_str());
89 }
90 }
91
92 SharedPtr<string> stratum0_url;
93 SharedPtr<string> stratum1_url;
94 SharedPtr<string> temp_dir;
95 unsigned num_parallel = 1;
96 bool pull_history = false;
97 bool apply_timestamp_threshold = false;
98 uint64_t timestamp_threshold = 0;
99 bool is_garbage_collectable = false;
100 bool initial_snapshot = false;
101 upload::Spooler *spooler = NULL;
102 int pipe_chunks[2];
103 // required for concurrent reading
104 pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
105 unsigned retries = 3;
106 catalog::RelaxedPathFilter *pathfilter = NULL;
107 atomic_int64 overall_chunks;
108 atomic_int64 overall_new;
109 atomic_int64 chunk_queue;
110 bool preload_cache = false;
111 string *preload_cachedir = NULL;
112 bool inspect_existing_catalogs = false;
113 manifest::Reflog *reflog = NULL;
114
115 } // anonymous namespace
116
117
118 static std::string MakePath(const shash::Any &hash) {
119 return (preload_cache)
120 ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
121 : "data/" + hash.MakePath();
122 }
123
124
125 static bool Peek(const string &remote_path) {
126 return (preload_cache) ? FileExists(remote_path) : spooler->Peek(remote_path);
127 }
128
129 static bool Peek(const shash::Any &remote_hash) {
130 return Peek(MakePath(remote_hash));
131 }
132
133 static void ReportDownloadError(const download::JobInfo &download_job) {
134 const download::Failures error_code = download_job.error_code();
135 const int http_code = download_job.http_code();
136 const std::string url = *download_job.url();
137
138 LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
139 url.c_str(), error_code, download::Code2Ascii(error_code));
140
141 switch (error_code) {
142 case download::kFailProxyResolve:
143 case download::kFailHostResolve:
144 LogCvmfs(kLogCvmfs, kLogStderr,
145 "DNS lookup for Stratum 0 failed - "
146 "please check the network connection");
147 break;
148
149 case download::kFailHostHttp:
150 LogCvmfs(kLogCvmfs, kLogStderr,
151 "unexpected HTTP error code %d - "
152 "please check the stratum 0 health",
153 http_code);
154 break;
155
156 case download::kFailBadData:
157 LogCvmfs(kLogCvmfs, kLogStderr,
158 "downloaded corrupted data - "
159 "please check the stratum 0 health");
160 break;
161
162 default:
163 if (download::IsProxyTransferError(error_code)
164 || download::IsHostTransferError(error_code)) {
165 LogCvmfs(kLogCvmfs, kLogStderr,
166 "couldn't reach Stratum 0 - "
167 "please check the network connection");
168 } else {
169 LogCvmfs(kLogCvmfs, kLogStderr,
170 "unexpected error - feel free to file "
171 "a bug report");
172 }
173 }
174 }
175
176
177 static void Store(const string &local_path,
178 const string &remote_path,
179 const bool compressed_src) {
180 if (preload_cache) {
181 if (!compressed_src) {
182 const int retval = rename(local_path.c_str(), remote_path.c_str());
183 if (retval != 0) {
184 PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
185 remote_path.c_str());
186 }
187 } else {
188 // compressed input
189 string tmp_dest;
190 FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
191 if (fdest == NULL) {
192 PANIC(kLogStderr, "Failed to create temporary file '%s'",
193 remote_path.c_str());
194 }
195 int retval = zlib::DecompressPath2File(local_path, fdest);
196 if (!retval) {
197 PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
198 remote_path.c_str());
199 }
200 fclose(fdest);
201 retval = rename(tmp_dest.c_str(), remote_path.c_str());
202 assert(retval == 0);
203 unlink(local_path.c_str());
204 }
205 } else {
206 spooler->Upload(local_path, remote_path);
207 }
208 }
209
210 static void Store(const string &local_path,
211 const shash::Any &remote_hash,
212 const bool compressed_src = true) {
213 Store(local_path, MakePath(remote_hash), compressed_src);
214 }
215
216
217 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
218 const std::string &dest_path, const bool compress) {
219 string tmp_file;
220 FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
221 assert(ftmp);
222 int retval;
223 if (compress) {
224 shash::Any dummy(shash::kSha1); // hardcoded hash no problem, unused
225 retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
226 } else {
227 retval = CopyMem2File(buffer, size, ftmp);
228 }
229 assert(retval);
230 fclose(ftmp);
231 Store(tmp_file, dest_path, true);
232 }
233
234 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
235 const shash::Any &dest_hash, const bool compress) {
236 StoreBuffer(buffer, size, MakePath(dest_hash), compress);
237 }
238
239
240 static void WaitForStorage() {
241 if (!preload_cache)
242 spooler->WaitForUpload();
243 }
244
245
// Argument passed to every MainWorker thread; carries the download manager
// the workers fetch chunks with.
// NOTE: Main() contains `goto fini` statements that jump past the declaration
// of its MainWorkerContext variable, so this struct has to stay free of
// constructors and member initializers.
246 struct MainWorkerContext {
247 download::DownloadManager *download_manager;
248 };
249
250 static void *MainWorker(void *data) {
251 MainWorkerContext *mwc = static_cast<MainWorkerContext *>(data);
252 download::DownloadManager *download_manager = mwc->download_manager;
253
254 while (1) {
255 ChunkJob next_chunk;
256 {
257 const MutexLockGuard m(&lock_pipe);
258 ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
259 }
260 if (next_chunk.IsTerminateJob())
261 break;
262
263 const shash::Any chunk_hash = next_chunk.hash();
264 const zlib::Algorithms compression_alg = next_chunk.compression_alg;
265 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
266 chunk_hash.ToString().c_str());
267
268 if (!Peek(chunk_hash)) {
269 string tmp_file;
270 FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
271 const string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
272 cvmfs::FileSink filesink(fchunk);
273 download::JobInfo download_chunk(&url_chunk, false, false, &chunk_hash,
274 &filesink);
275
276 const download::Failures download_result = download_manager->Fetch(
277 &download_chunk);
278 if (download_result != download::kFailOk) {
279 ReportDownloadError(download_chunk);
280 PANIC(kLogStderr, "Download error");
281 }
282 fclose(fchunk);
283 Store(tmp_file, chunk_hash,
284 (compression_alg == zlib::kZlibDefault) ? true : false);
285 atomic_inc64(&overall_new);
286 }
287 if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
288 LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
289 atomic_dec64(&chunk_queue);
290 }
291 return NULL;
292 }
293
294
295 bool CommandPull::PullRecursion(catalog::Catalog *catalog,
296 const std::string &path) {
297 assert(catalog);
298
299
300 // Nested catalogs (in a nested code block because goto fail...)
301 {
302 const catalog::Catalog::NestedCatalogList
303 nested_catalogs = catalog->ListOwnNestedCatalogs();
304 for (catalog::Catalog::NestedCatalogList::const_iterator
305 i = nested_catalogs.begin(),
306 iEnd = nested_catalogs.end();
307 i != iEnd;
308 ++i) {
309 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
310 i->mountpoint.c_str());
311 shash::Any previous_catalog_hash; // expected to be null for subcatalog
312 const bool retval = Pull(i->hash, i->mountpoint.ToString(),
313 previous_catalog_hash);
314 if (!retval)
315 return false;
316 }
317 }
318
319 return true;
320 }
321
322 bool CommandPull::Pull(const shash::Any &catalog_hash,
323 const std::string &path,
324 shash::Any &previous_catalog) {
325 int retval;
326 download::Failures dl_retval;
327 assert(shash::kSuffixCatalog == catalog_hash.suffix);
328 previous_catalog.SetNull();
329
330 // Check if the catalog already exists
331 if (Peek(catalog_hash)) {
332 // Preload: dirtab changed
333 if (inspect_existing_catalogs) {
334 if (!preload_cache) {
335 PANIC(kLogStderr, "to be implemented: -t without -c");
336 }
337 catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
338 path, MakePath(catalog_hash), catalog_hash);
339 if (catalog == NULL) {
340 LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
341 catalog_hash.ToString().c_str());
342 return false;
343 }
344 const bool retval = PullRecursion(catalog, path);
345 delete catalog;
346 return retval;
347 }
348
349 LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
350 return true;
351 }
352
353 // Check if the catalog matches the pathfilter
354 if (path != "" && // necessary to load the root catalog
355 pathfilter && !pathfilter->IsMatching(path)) {
356 LogCvmfs(kLogCvmfs, kLogStdout,
357 " Catalog in '%s' does not match"
358 " the path specification",
359 path.c_str());
360 return true;
361 }
362
363 const int64_t gauge_chunks = atomic_read64(&overall_chunks);
364 const int64_t gauge_new = atomic_read64(&overall_new);
365
366 // Download and uncompress catalog
367 shash::Any chunk_hash;
368 zlib::Algorithms compression_alg;
369 catalog::Catalog *catalog = NULL;
370 string file_catalog;
371 string file_catalog_vanilla;
372 FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
373 &file_catalog);
374 if (!fcatalog) {
375 LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
376 return false;
377 }
378 fclose(fcatalog);
379 FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
380 &file_catalog_vanilla);
381 if (!fcatalog_vanilla) {
382 LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
383 unlink(file_catalog.c_str());
384 return false;
385 }
386 const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
387 cvmfs::FileSink filesink(fcatalog_vanilla);
388 download::JobInfo download_catalog(&url_catalog, false, false, &catalog_hash,
389 &filesink);
390 dl_retval = download_manager()->Fetch(&download_catalog);
391 fclose(fcatalog_vanilla);
392 if (dl_retval != download::kFailOk) {
393 if (path == "" && is_garbage_collectable) {
394 LogCvmfs(kLogCvmfs, kLogStdout,
395 "skipping missing root catalog %s - "
396 "probably sweeped by garbage collection",
397 catalog_hash.ToString().c_str());
398 goto pull_skip;
399 } else {
400 ReportDownloadError(download_catalog);
401 goto pull_cleanup;
402 }
403 }
404 retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
405 if (!retval) {
406 LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
407 file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
408 goto pull_cleanup;
409 }
410 if (path.empty() && reflog != NULL) {
411 if (!reflog->AddCatalog(catalog_hash)) {
412 LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
413 goto pull_cleanup;
414 }
415 }
416
417 catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
418 if (catalog == NULL) {
419 LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
420 catalog_hash.ToString().c_str());
421 goto pull_cleanup;
422 }
423
424 // Always pull the HEAD root catalog and nested catalogs
425 if (apply_timestamp_threshold && (path == "")
426 && (catalog->GetLastModified() < timestamp_threshold)) {
427 LogCvmfs(kLogCvmfs, kLogStdout,
428 " Pruning at root catalog from %s due to threshold at %s",
429 StringifyTime(catalog->GetLastModified(), false).c_str(),
430 StringifyTime(timestamp_threshold, false).c_str());
431 delete catalog;
432 goto pull_skip;
433 }
434 apply_timestamp_threshold = true;
435
436 // Traverse the chunks
437 LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
438 " Processing chunks [%" PRIu64 " registered chunks]: ",
439 catalog->GetNumChunks());
440 retval = catalog->AllChunksBegin();
441 if (!retval) {
442 LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
443 goto pull_cleanup;
444 }
445 while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
446 ChunkJob next_chunk(chunk_hash, compression_alg);
447 WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
448 atomic_inc64(&chunk_queue);
449 }
450 catalog->AllChunksEnd();
451 while (atomic_read64(&chunk_queue) != 0) {
452 SafeSleepMs(100);
453 }
454 LogCvmfs(kLogCvmfs, kLogStdout,
455 " fetched %" PRId64 " new chunks out of "
456 "%" PRId64 " unique chunks",
457 atomic_read64(&overall_new) - gauge_new,
458 atomic_read64(&overall_chunks) - gauge_chunks);
459
460 retval = PullRecursion(catalog, path);
461 previous_catalog = catalog->GetPreviousRevision();
462 delete catalog;
463 unlink(file_catalog.c_str());
464
465
466 WaitForStorage();
467 if (!retval)
468 return false;
469 Store(file_catalog_vanilla, catalog_hash);
470 return true;
471
472 pull_cleanup:
473 delete catalog;
474 unlink(file_catalog.c_str());
475 unlink(file_catalog_vanilla.c_str());
476 return false;
477
478 pull_skip:
479 unlink(file_catalog.c_str());
480 unlink(file_catalog_vanilla.c_str());
481 return true;
482 }
483
484
485 int swissknife::CommandPull::Main(const swissknife::ArgumentList &args) {
486 int retval;
487 manifest::Failures m_retval;
488 download::Failures dl_retval;
489 unsigned timeout = 60;
490 const int fd_lockfile = -1;
491 string spooler_definition_str;
492 manifest::ManifestEnsemble ensemble;
493 shash::Any meta_info_hash;
494 string meta_info;
495 shash::Any previous_catalog_hash;
496 shash::Any current_catalog_hash;
497
498 // Option parsing
499 if (args.find('c') != args.end())
500 preload_cache = true;
501 if (args.find('l') != args.end()) {
502 const unsigned log_level = kLogLevel0
503 << String2Uint64(*args.find('l')->second);
504 if (log_level > kLogNone) {
505 LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
506 return 1;
507 }
508 SetLogVerbosity(static_cast<LogLevels>(log_level));
509 }
510 stratum0_url = args.find('u')->second;
511 temp_dir = args.find('x')->second;
512 if (preload_cache) {
513 preload_cachedir = new string(*args.find('r')->second);
514 } else {
515 spooler_definition_str = *args.find('r')->second;
516 }
517 string master_keys = *args.find('k')->second;
518 if (DirectoryExists(master_keys))
519 master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
520 const string repository_name = *args.find('m')->second;
521 if (args.find('n') != args.end())
522 num_parallel = String2Uint64(*args.find('n')->second);
523 if (args.find('t') != args.end())
524 timeout = String2Uint64(*args.find('t')->second);
525 if (args.find('a') != args.end())
526 retries = String2Uint64(*args.find('a')->second);
527 if (args.find('d') != args.end()) {
528 pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
529 assert(pathfilter->IsValid());
530 }
531 if (args.find('p') != args.end())
532 pull_history = true;
533 if (args.find('z') != args.end())
534 inspect_existing_catalogs = true;
535 if (args.find('w') != args.end())
536 stratum1_url = args.find('w')->second;
537 if (args.find('i') != args.end())
538 initial_snapshot = true;
539 shash::Any reflog_hash;
540 string reflog_chksum_path;
541 if (args.find('R') != args.end()) {
542 reflog_chksum_path = *args.find('R')->second;
543 if (!initial_snapshot) {
544 if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
545 LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
546 return 1;
547 }
548 }
549 }
550 if (args.find('Z') != args.end()) {
551 timestamp_threshold = String2Int64(*args.find('Z')->second);
552 }
553
554 if (!preload_cache && stratum1_url.Get() == NULL) {
555 LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
556 return 1;
557 }
558
559 typedef std::vector<history::History::Tag> TagVector;
560 TagVector historic_tags;
561
562 LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
563 stratum0_url->c_str());
564
565 int result = 1;
566
567 // Initialization
568 atomic_init64(&overall_chunks);
569 atomic_init64(&overall_new);
570 atomic_init64(&chunk_queue);
571
572 const bool follow_redirects = false;
573 const unsigned max_pool_handles = num_parallel + 1;
574 const string proxy = (args.find('@') != args.end()) ? *args.find('@')->second
575 : "";
576
577 if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
578 return 1;
579 }
580
581 if (!this->InitSignatureManager(master_keys)) {
582 LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
583 return 1;
584 } else {
585 LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: using public key(s) %s",
586 JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
587 }
588
589 unsigned current_group;
590 vector<vector<download::DownloadManager::ProxyInfo> > proxies;
591 download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
592 if (proxies.size() > 0) {
593 string proxy_str = "\nWarning, replicating through proxies\n";
594 proxy_str += " Load-balance groups:\n";
595 for (unsigned i = 0; i < proxies.size(); ++i) {
596 vector<string> urls;
597 for (unsigned j = 0; j < proxies[i].size(); ++j) {
598 urls.push_back(proxies[i][j].url);
599 }
600 proxy_str += " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ")
601 + "\n";
602 }
603 proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] "
604 + proxies[current_group][0].url;
605 LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
606 }
607 download_manager()->SetTimeout(timeout, timeout);
608 download_manager()->SetRetryParameters(retries, 500, 2000);
609 download_manager()->Spawn();
610
611 // init the download helper
612 const ObjectFetcher object_fetcher(repository_name, *stratum0_url, *temp_dir,
613 download_manager(), signature_manager());
614
615 pthread_t *workers = reinterpret_cast<pthread_t *>(
616 smalloc(sizeof(pthread_t) * num_parallel));
617
618 // Check if we have a replica-ready server
619 const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
620 download::JobInfo download_sentinel(&url_sentinel, false);
621 retval = download_manager()->Fetch(&download_sentinel);
622 if (retval != download::kFailOk) {
623 if (download_sentinel.http_code() == 404) {
624 LogCvmfs(kLogCvmfs, kLogStderr,
625 "This is not a CernVM-FS server for replication");
626 } else {
627 LogCvmfs(kLogCvmfs, kLogStderr,
628 "Failed to contact stratum 0 server (%d - %s)", retval,
629 download::Code2Ascii(download_sentinel.error_code()));
630 }
631 goto fini;
632 }
633
634 m_retval = FetchRemoteManifestEnsemble(*stratum0_url, repository_name,
635 &ensemble);
636 if (m_retval != manifest::kFailOk) {
637 LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
638 m_retval, manifest::Code2Ascii(m_retval));
639 goto fini;
640 }
641
642 // Get meta info
643 meta_info_hash = ensemble.manifest->meta_info();
644 if (!meta_info_hash.IsNull()) {
645 meta_info_hash = ensemble.manifest->meta_info();
646 const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
647 cvmfs::MemSink metainfo_memsink;
648 download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
649 &metainfo_memsink);
650 dl_retval = download_manager()->Fetch(&download_metainfo);
651 if (dl_retval != download::kFailOk) {
652 LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
653 dl_retval, download::Code2Ascii(dl_retval));
654 goto fini;
655 }
656 meta_info = string(reinterpret_cast<char *>(metainfo_memsink.data()),
657 metainfo_memsink.pos());
658 }
659
660 is_garbage_collectable = ensemble.manifest->garbage_collectable();
661
662 // Manifest available, now the spooler's hash algorithm can be determined
663 // That doesn't actually matter because the replication does no re-hashing
664 if (!preload_cache) {
665 const upload::SpoolerDefinition spooler_definition(
666 spooler_definition_str, ensemble.manifest->GetHashAlgorithm());
667 spooler = upload::Spooler::Construct(spooler_definition);
668 assert(spooler);
669 spooler->RegisterListener(&SpoolerOnUpload);
670 }
671
672 // Open the reflog for modification
673 if (!preload_cache) {
674 if (initial_snapshot) {
675 LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
676 repository_name.c_str());
677 reflog = CreateEmptyReflog(*temp_dir, repository_name);
678 if (reflog == NULL) {
679 LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
680 goto fini;
681 }
682 } else {
683 ObjectFetcher object_fetcher_stratum1(repository_name,
684 *stratum1_url,
685 *temp_dir,
686 download_manager(),
687 signature_manager());
688
689 if (!reflog_hash.IsNull()) {
690 reflog = FetchReflog(&object_fetcher_stratum1, repository_name,
691 reflog_hash);
692 assert(reflog != NULL);
693 } else {
694 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
695 if (spooler->Peek(".cvmfsreflog")) {
696 LogCvmfs(kLogCvmfs, kLogStderr,
697 "no reflog hash specified but reflog is present");
698 goto fini;
699 }
700 }
701 }
702
703 if (reflog != NULL) {
704 reflog->BeginTransaction();
705 // On commit: use manifest's hash algorithm
706 reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
707 }
708 }
709
710 // Fetch tag list.
711 // If we are just preloading the cache it is not strictly necessarily to
712 // download the entire tag list
713 // TODO(molina): add user option to download tags when preloading the cache
714 if (!ensemble.manifest->history().IsNull() && !preload_cache) {
715 const shash::Any history_hash = ensemble.manifest->history();
716 const string history_url = *stratum0_url + "/data/"
717 + history_hash.MakePath();
718 const string history_path = *temp_dir + "/" + history_hash.ToString();
719
720 cvmfs::PathSink pathsink(history_path);
721 download::JobInfo download_history(&history_url, false, false,
722 &history_hash, &pathsink);
723 dl_retval = download_manager()->Fetch(&download_history);
724 if (dl_retval != download::kFailOk) {
725 ReportDownloadError(download_history);
726 goto fini;
727 }
728 const std::string history_db_path = history_path + ".uncompressed";
729 retval = zlib::DecompressPath2Path(history_path, history_db_path);
730 assert(retval);
731 history::History *tag_db = history::SqliteHistory::Open(history_db_path);
732 if (NULL == tag_db) {
733 LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
734 history_db_path.c_str());
735 unlink(history_db_path.c_str());
736 goto fini;
737 }
738 retval = tag_db->List(&historic_tags);
739 delete tag_db;
740 unlink(history_db_path.c_str());
741 if (!retval) {
742 LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
743 history_db_path.c_str());
744 goto fini;
745 }
746
747 LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
748 historic_tags.size());
749 // TODO(jblomer): We should repliacte the previous history dbs, too,
750 // in order to avoid races on fail-over between non-synchronized stratum 1s
751 LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
752 Store(history_path, history_hash);
753 WaitForStorage();
754 unlink(history_path.c_str());
755 if (reflog != NULL && !reflog->AddHistory(history_hash)) {
756 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
757 goto fini;
758 }
759 }
760
761 // Starting threads
762 MakePipe(pipe_chunks);
763 LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
764 MainWorkerContext mwc;
765 mwc.download_manager = download_manager();
766 for (unsigned i = 0; i < num_parallel; ++i) {
767 const int retval = pthread_create(&workers[i], NULL, MainWorker,
768 static_cast<void *>(&mwc));
769 assert(retval == 0);
770 }
771
772 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
773 current_catalog_hash = ensemble.manifest->catalog_hash();
774 do {
775 retval = Pull(current_catalog_hash, "", previous_catalog_hash);
776 if (pull_history && !previous_catalog_hash.IsNull()) {
777 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
778 previous_catalog_hash.ToString().c_str());
779 }
780 current_catalog_hash = previous_catalog_hash;
781 } while (pull_history && !previous_catalog_hash.IsNull());
782
783 if (!historic_tags.empty()) {
784 LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
785 }
786 for (TagVector::const_iterator i = historic_tags.begin(),
787 iend = historic_tags.end();
788 i != iend;
789 ++i) {
790 if (Peek(i->root_hash))
791 continue;
792 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
793 i->name.c_str());
794 apply_timestamp_threshold = false;
795
796 bool retval2;
797 current_catalog_hash = i->root_hash;
798 do {
799 retval2 = Pull(current_catalog_hash, "", previous_catalog_hash);
800 if (pull_history && !previous_catalog_hash.IsNull()) {
801 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
802 previous_catalog_hash.ToString().c_str());
803 }
804 current_catalog_hash = previous_catalog_hash;
805 } while (pull_history && !previous_catalog_hash.IsNull());
806 retval = retval && retval2;
807 }
808
809 // Stopping threads
810 LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
811 for (unsigned i = 0; i < num_parallel; ++i) {
812 ChunkJob terminate_workers;
813 WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
814 }
815 for (unsigned i = 0; i < num_parallel; ++i) {
816 int retval = pthread_join(workers[i], NULL); // NOLINT (false positive)
817 assert(retval == 0);
818 }
819 ClosePipe(pipe_chunks);
820
821 if (!retval)
822 goto fini;
823
824 // Upload manifest ensemble
825 {
826 LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
827 WaitForStorage();
828
829 if (!Peek(ensemble.manifest->certificate())) {
830 StoreBuffer(ensemble.cert_buf, ensemble.cert_size,
831 ensemble.manifest->certificate(), true);
832 }
833 if (reflog != NULL
834 && !reflog->AddCertificate(ensemble.manifest->certificate())) {
835 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
836 goto fini;
837 }
838 if (!meta_info_hash.IsNull()) {
839 const unsigned char *info = reinterpret_cast<const unsigned char *>(
840 meta_info.data());
841 StoreBuffer(info, meta_info.size(), meta_info_hash, true);
842 if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
843 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
844 goto fini;
845 }
846 }
847
848 // Create alternative bootstrapping symlinks for VOMS secured repos
849 if (ensemble.manifest->has_alt_catalog_path()) {
850 const bool success = spooler->PlaceBootstrappingShortcut(
851 ensemble.manifest->certificate())
852 && spooler->PlaceBootstrappingShortcut(
853 ensemble.manifest->catalog_hash())
854 && (ensemble.manifest->history().IsNull()
855 || spooler->PlaceBootstrappingShortcut(
856 ensemble.manifest->history()))
857 && (meta_info_hash.IsNull()
858 || spooler->PlaceBootstrappingShortcut(
859 meta_info_hash));
860
861 if (!success) {
862 LogCvmfs(kLogCvmfs, kLogStderr,
863 "failed to place root catalog bootstrapping symlinks");
864 goto fini;
865 }
866 }
867
868 // upload Reflog database
869 if (!preload_cache && reflog != NULL) {
870 reflog->CommitTransaction();
871 reflog->DropDatabaseFileOwnership();
872 const string reflog_path = reflog->database_file();
873 delete reflog;
874 manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
875 WaitForStorage(); // Reduce the duration of reflog /wo checksum
876 spooler->UploadReflog(reflog_path);
877 spooler->WaitForUpload();
878 unlink(reflog_path.c_str());
879 if (spooler->GetNumberOfErrors()) {
880 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
881 spooler->GetNumberOfErrors());
882 goto fini;
883 }
884 assert(!reflog_chksum_path.empty());
885 manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
886 }
887
888 if (preload_cache) {
889 const bool retval = ensemble.manifest->ExportBreadcrumb(*preload_cachedir,
890 0660);
891 assert(retval);
892 } else {
893 // pkcs#7 structure contains content + certificate + signature
894 // So there is no race with whitelist and pkcs7 signature being out of
895 // sync
896 if (ensemble.whitelist_pkcs7_buf) {
897 StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
898 ".cvmfswhitelist.pkcs7", false);
899 }
900 StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
901 ".cvmfswhitelist", false);
902 StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
903 ".cvmfspublished", false);
904 }
905 LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
906 ensemble.manifest->revision());
907 }
908
909 WaitForStorage();
910 LogCvmfs(kLogCvmfs, kLogStdout,
911 "Fetched %" PRId64 " new chunks out of %" PRId64 " processed chunks",
912 atomic_read64(&overall_new), atomic_read64(&overall_chunks));
913 result = 0;
914
915 fini:
916 if (fd_lockfile >= 0)
917 UnlockFile(fd_lockfile);
918 free(workers);
919 delete spooler;
920 delete pathfilter;
921 return result;
922 }
923
924 } // namespace swissknife
925