GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_pull.cc
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 0 494 0.0%
Branches: 0 350 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 *
4 * Replicates a cvmfs repository. Uses the cvmfs intrinsic Merkle trees
5 * to calculate the difference set.
6 */
7
8 // NOLINTNEXTLINE
9 #define _FILE_OFFSET_BITS 64
10 // NOLINTNEXTLINE
11 #define __STDC_FORMAT_MACROS
12
13
14 #include "swissknife_pull.h"
15
16 #include <inttypes.h>
17 #include <pthread.h>
18 #include <sys/stat.h>
19 #include <unistd.h>
20
21 #include <cstdlib>
22 #include <cstring>
23 #include <string>
24 #include <vector>
25
26 #include "catalog.h"
27 #include "compression/compression.h"
28 #include "crypto/hash.h"
29 #include "crypto/signature.h"
30 #include "history_sqlite.h"
31 #include "manifest.h"
32 #include "manifest_fetch.h"
33 #include "network/download.h"
34 #include "object_fetcher.h"
35 #include "path_filters/relaxed_path_filter.h"
36 #include "reflog.h"
37 #include "upload.h"
38 #include "util/atomic.h"
39 #include "util/concurrency.h"
40 #include "util/exception.h"
41 #include "util/logging.h"
42 #include "util/posix.h"
43 #include "util/shared_ptr.h"
44 #include "util/smalloc.h"
45 #include "util/string.h"
46
47 using namespace std; // NOLINT
48
49 namespace swissknife {
50
51 namespace {
52
53 typedef HttpObjectFetcher<> ObjectFetcher;
54
55 /**
56 * This just stores a shash::Any in a predictable way to send it through a
57 * POSIX pipe.
58 */
59 class ChunkJob {
60 public:
61 ChunkJob()
62 : suffix(shash::kSuffixNone)
63 , hash_algorithm(shash::kAny)
64 , compression_alg(zlib::kZlibDefault) { }
65
66 ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
67 : suffix(hash.suffix)
68 , hash_algorithm(hash.algorithm)
69 , compression_alg(compression_alg) {
70 memcpy(digest, hash.digest, hash.GetDigestSize());
71 }
72
73 bool IsTerminateJob() const { return (hash_algorithm == shash::kAny); }
74
75 shash::Any hash() const {
76 assert(!IsTerminateJob());
77 return shash::Any(hash_algorithm, digest, suffix);
78 }
79
80 const shash::Suffix suffix;
81 const shash::Algorithms hash_algorithm;
82 const zlib::Algorithms compression_alg;
83 unsigned char digest[shash::kMaxDigestSize];
84 };
85
86 static void SpoolerOnUpload(const upload::SpoolerResult &result) {
87 unlink(result.local_path.c_str());
88 if (result.return_code != 0) {
89 PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
90 result.local_path.c_str(), result.content_hash.ToString().c_str());
91 }
92 }
93
94 SharedPtr<string> stratum0_url;
95 SharedPtr<string> stratum1_url;
96 SharedPtr<string> temp_dir;
97 unsigned num_parallel = 1;
98 bool pull_history = false;
99 bool apply_timestamp_threshold = false;
100 uint64_t timestamp_threshold = 0;
101 bool is_garbage_collectable = false;
102 bool initial_snapshot = false;
103 upload::Spooler *spooler = NULL;
104 int pipe_chunks[2];
105 // required for concurrent reading
106 pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
107 unsigned retries = 3;
108 catalog::RelaxedPathFilter *pathfilter = NULL;
109 atomic_int64 overall_chunks;
110 atomic_int64 overall_new;
111 atomic_int64 chunk_queue;
112 bool preload_cache = false;
113 string *preload_cachedir = NULL;
114 bool inspect_existing_catalogs = false;
115 manifest::Reflog *reflog = NULL;
116
117 } // anonymous namespace
118
119
120 static std::string MakePath(const shash::Any &hash) {
121 return (preload_cache)
122 ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
123 : "data/" + hash.MakePath();
124 }
125
126
127 static bool Peek(const string &remote_path) {
128 return (preload_cache) ? FileExists(remote_path) : spooler->Peek(remote_path);
129 }
130
131 static bool Peek(const shash::Any &remote_hash) {
132 return Peek(MakePath(remote_hash));
133 }
134
135 static void ReportDownloadError(const download::JobInfo &download_job) {
136 const download::Failures error_code = download_job.error_code();
137 const int http_code = download_job.http_code();
138 const std::string url = *download_job.url();
139
140 LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
141 url.c_str(), error_code, download::Code2Ascii(error_code));
142
143 switch (error_code) {
144 case download::kFailProxyResolve:
145 case download::kFailHostResolve:
146 LogCvmfs(kLogCvmfs, kLogStderr,
147 "DNS lookup for Stratum 0 failed - "
148 "please check the network connection");
149 break;
150
151 case download::kFailHostHttp:
152 LogCvmfs(kLogCvmfs, kLogStderr,
153 "unexpected HTTP error code %d - "
154 "please check the stratum 0 health",
155 http_code);
156 break;
157
158 case download::kFailBadData:
159 LogCvmfs(kLogCvmfs, kLogStderr,
160 "downloaded corrupted data - "
161 "please check the stratum 0 health");
162 break;
163
164 default:
165 if (download::IsProxyTransferError(error_code)
166 || download::IsHostTransferError(error_code)) {
167 LogCvmfs(kLogCvmfs, kLogStderr,
168 "couldn't reach Stratum 0 - "
169 "please check the network connection");
170 } else {
171 LogCvmfs(kLogCvmfs, kLogStderr,
172 "unexpected error - feel free to file "
173 "a bug report");
174 }
175 }
176 }
177
178
179 static void Store(const string &local_path,
180 const string &remote_path,
181 const bool compressed_src) {
182 if (preload_cache) {
183 if (!compressed_src) {
184 const int retval = rename(local_path.c_str(), remote_path.c_str());
185 if (retval != 0) {
186 PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
187 remote_path.c_str());
188 }
189 } else {
190 // compressed input
191 string tmp_dest;
192 FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
193 if (fdest == NULL) {
194 PANIC(kLogStderr, "Failed to create temporary file '%s'",
195 remote_path.c_str());
196 }
197 int retval = zlib::DecompressPath2File(local_path, fdest);
198 if (!retval) {
199 PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
200 remote_path.c_str());
201 }
202 fclose(fdest);
203 retval = rename(tmp_dest.c_str(), remote_path.c_str());
204 assert(retval == 0);
205 unlink(local_path.c_str());
206 }
207 } else {
208 spooler->Upload(local_path, remote_path);
209 }
210 }
211
212 static void Store(const string &local_path,
213 const shash::Any &remote_hash,
214 const bool compressed_src = true) {
215 Store(local_path, MakePath(remote_hash), compressed_src);
216 }
217
218
219 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
220 const std::string &dest_path, const bool compress) {
221 string tmp_file;
222 FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
223 assert(ftmp);
224 int retval;
225 if (compress) {
226 shash::Any dummy(shash::kSha1); // hardcoded hash no problem, unused
227 retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
228 } else {
229 retval = CopyMem2File(buffer, size, ftmp);
230 }
231 assert(retval);
232 fclose(ftmp);
233 Store(tmp_file, dest_path, true);
234 }
235
236 static void StoreBuffer(const unsigned char *buffer, const unsigned size,
237 const shash::Any &dest_hash, const bool compress) {
238 StoreBuffer(buffer, size, MakePath(dest_hash), compress);
239 }
240
241
242 static void WaitForStorage() {
243 if (!preload_cache)
244 spooler->WaitForUpload();
245 }
246
247
248 struct MainWorkerContext {
249 download::DownloadManager *download_manager;
250 };
251
252 static void *MainWorker(void *data) {
253 MainWorkerContext *mwc = static_cast<MainWorkerContext *>(data);
254 download::DownloadManager *download_manager = mwc->download_manager;
255
256 while (1) {
257 ChunkJob next_chunk;
258 {
259 const MutexLockGuard m(&lock_pipe);
260 ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
261 }
262 if (next_chunk.IsTerminateJob())
263 break;
264
265 const shash::Any chunk_hash = next_chunk.hash();
266 const zlib::Algorithms compression_alg = next_chunk.compression_alg;
267 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
268 chunk_hash.ToString().c_str());
269
270 if (!Peek(chunk_hash)) {
271 string tmp_file;
272 FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
273 const string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
274 cvmfs::FileSink filesink(fchunk);
275 download::JobInfo download_chunk(&url_chunk, false, false, &chunk_hash,
276 &filesink);
277
278 const download::Failures download_result = download_manager->Fetch(
279 &download_chunk);
280 if (download_result != download::kFailOk) {
281 ReportDownloadError(download_chunk);
282 PANIC(kLogStderr, "Download error");
283 }
284 fclose(fchunk);
285 Store(tmp_file, chunk_hash,
286 (compression_alg == zlib::kZlibDefault) ? true : false);
287 atomic_inc64(&overall_new);
288 }
289 if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
290 LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
291 atomic_dec64(&chunk_queue);
292 }
293 return NULL;
294 }
295
296
297 bool CommandPull::PullRecursion(catalog::Catalog *catalog,
298 const std::string &path) {
299 assert(catalog);
300
301
302 // Nested catalogs (in a nested code block because goto fail...)
303 {
304 const catalog::Catalog::NestedCatalogList
305 nested_catalogs = catalog->ListOwnNestedCatalogs();
306 for (catalog::Catalog::NestedCatalogList::const_iterator
307 i = nested_catalogs.begin(),
308 iEnd = nested_catalogs.end();
309 i != iEnd;
310 ++i) {
311 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
312 i->mountpoint.c_str());
313 shash::Any previous_catalog_hash; // expected to be null for subcatalog
314 const bool retval = Pull(i->hash, i->mountpoint.ToString(),
315 previous_catalog_hash);
316 if (!retval)
317 return false;
318 }
319 }
320
321 return true;
322 }
323
324 bool CommandPull::Pull(const shash::Any &catalog_hash,
325 const std::string &path,
326 shash::Any &previous_catalog) {
327 int retval;
328 download::Failures dl_retval;
329 assert(shash::kSuffixCatalog == catalog_hash.suffix);
330 previous_catalog.SetNull();
331
332 // Check if the catalog already exists
333 if (Peek(catalog_hash)) {
334 // Preload: dirtab changed
335 if (inspect_existing_catalogs) {
336 if (!preload_cache) {
337 PANIC(kLogStderr, "to be implemented: -t without -c");
338 }
339 catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
340 path, MakePath(catalog_hash), catalog_hash);
341 if (catalog == NULL) {
342 LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
343 catalog_hash.ToString().c_str());
344 return false;
345 }
346 const bool retval = PullRecursion(catalog, path);
347 delete catalog;
348 return retval;
349 }
350
351 LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
352 return true;
353 }
354
355 // Check if the catalog matches the pathfilter
356 if (path != "" && // necessary to load the root catalog
357 pathfilter && !pathfilter->IsMatching(path)) {
358 LogCvmfs(kLogCvmfs, kLogStdout,
359 " Catalog in '%s' does not match"
360 " the path specification",
361 path.c_str());
362 return true;
363 }
364
365 const int64_t gauge_chunks = atomic_read64(&overall_chunks);
366 const int64_t gauge_new = atomic_read64(&overall_new);
367
368 // Download and uncompress catalog
369 shash::Any chunk_hash;
370 zlib::Algorithms compression_alg;
371 catalog::Catalog *catalog = NULL;
372 string file_catalog;
373 string file_catalog_vanilla;
374 FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
375 &file_catalog);
376 if (!fcatalog) {
377 LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
378 return false;
379 }
380 fclose(fcatalog);
381 FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
382 &file_catalog_vanilla);
383 if (!fcatalog_vanilla) {
384 LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
385 unlink(file_catalog.c_str());
386 return false;
387 }
388 const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
389 cvmfs::FileSink filesink(fcatalog_vanilla);
390 download::JobInfo download_catalog(&url_catalog, false, false, &catalog_hash,
391 &filesink);
392 dl_retval = download_manager()->Fetch(&download_catalog);
393 fclose(fcatalog_vanilla);
394 if (dl_retval != download::kFailOk) {
395 if (path == "" && is_garbage_collectable) {
396 LogCvmfs(kLogCvmfs, kLogStdout,
397 "skipping missing root catalog %s - "
398 "probably sweeped by garbage collection",
399 catalog_hash.ToString().c_str());
400 goto pull_skip;
401 } else {
402 ReportDownloadError(download_catalog);
403 goto pull_cleanup;
404 }
405 }
406 retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
407 if (!retval) {
408 LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
409 file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
410 goto pull_cleanup;
411 }
412 if (path.empty() && reflog != NULL) {
413 if (!reflog->AddCatalog(catalog_hash)) {
414 LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
415 goto pull_cleanup;
416 }
417 }
418
419 catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
420 if (catalog == NULL) {
421 LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
422 catalog_hash.ToString().c_str());
423 goto pull_cleanup;
424 }
425
426 // Always pull the HEAD root catalog and nested catalogs
427 if (apply_timestamp_threshold && (path == "")
428 && (catalog->GetLastModified() < timestamp_threshold)) {
429 LogCvmfs(kLogCvmfs, kLogStdout,
430 " Pruning at root catalog from %s due to threshold at %s",
431 StringifyTime(catalog->GetLastModified(), false).c_str(),
432 StringifyTime(timestamp_threshold, false).c_str());
433 delete catalog;
434 goto pull_skip;
435 }
436 apply_timestamp_threshold = true;
437
438 // Traverse the chunks
439 LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
440 " Processing chunks [%" PRIu64 " registered chunks]: ",
441 catalog->GetNumChunks());
442 retval = catalog->AllChunksBegin();
443 if (!retval) {
444 LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
445 goto pull_cleanup;
446 }
447 while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
448 ChunkJob next_chunk(chunk_hash, compression_alg);
449 WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
450 atomic_inc64(&chunk_queue);
451 }
452 catalog->AllChunksEnd();
453 while (atomic_read64(&chunk_queue) != 0) {
454 SafeSleepMs(100);
455 }
456 LogCvmfs(kLogCvmfs, kLogStdout,
457 " fetched %" PRId64 " new chunks out of "
458 "%" PRId64 " unique chunks",
459 atomic_read64(&overall_new) - gauge_new,
460 atomic_read64(&overall_chunks) - gauge_chunks);
461
462 retval = PullRecursion(catalog, path);
463 previous_catalog = catalog->GetPreviousRevision();
464 delete catalog;
465 unlink(file_catalog.c_str());
466
467
468 WaitForStorage();
469 if (!retval)
470 return false;
471 Store(file_catalog_vanilla, catalog_hash);
472 return true;
473
474 pull_cleanup:
475 delete catalog;
476 unlink(file_catalog.c_str());
477 unlink(file_catalog_vanilla.c_str());
478 return false;
479
480 pull_skip:
481 unlink(file_catalog.c_str());
482 unlink(file_catalog_vanilla.c_str());
483 return true;
484 }
485
486
487 int swissknife::CommandPull::Main(const swissknife::ArgumentList &args) {
488 int retval;
489 manifest::Failures m_retval;
490 download::Failures dl_retval;
491 unsigned timeout = 60;
492 const int fd_lockfile = -1;
493 string spooler_definition_str;
494 manifest::ManifestEnsemble ensemble;
495 shash::Any meta_info_hash;
496 string meta_info;
497 shash::Any previous_catalog_hash;
498 shash::Any current_catalog_hash;
499
500 // Option parsing
501 if (args.find('c') != args.end())
502 preload_cache = true;
503 if (args.find('l') != args.end()) {
504 const unsigned log_level = kLogLevel0
505 << String2Uint64(*args.find('l')->second);
506 if (log_level > kLogNone) {
507 LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
508 return 1;
509 }
510 SetLogVerbosity(static_cast<LogLevels>(log_level));
511 }
512 stratum0_url = args.find('u')->second;
513 temp_dir = args.find('x')->second;
514 if (preload_cache) {
515 preload_cachedir = new string(*args.find('r')->second);
516 } else {
517 spooler_definition_str = *args.find('r')->second;
518 }
519 string master_keys = *args.find('k')->second;
520 if (DirectoryExists(master_keys))
521 master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
522 const string repository_name = *args.find('m')->second;
523 if (args.find('n') != args.end())
524 num_parallel = String2Uint64(*args.find('n')->second);
525 if (args.find('t') != args.end())
526 timeout = String2Uint64(*args.find('t')->second);
527 if (args.find('a') != args.end())
528 retries = String2Uint64(*args.find('a')->second);
529 if (args.find('d') != args.end()) {
530 pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
531 assert(pathfilter->IsValid());
532 }
533 if (args.find('p') != args.end())
534 pull_history = true;
535 if (args.find('z') != args.end())
536 inspect_existing_catalogs = true;
537 if (args.find('w') != args.end())
538 stratum1_url = args.find('w')->second;
539 if (args.find('i') != args.end())
540 initial_snapshot = true;
541 shash::Any reflog_hash;
542 string reflog_chksum_path;
543 if (args.find('R') != args.end()) {
544 reflog_chksum_path = *args.find('R')->second;
545 if (!initial_snapshot) {
546 if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
547 LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
548 return 1;
549 }
550 }
551 }
552 if (args.find('Z') != args.end()) {
553 timestamp_threshold = String2Int64(*args.find('Z')->second);
554 }
555
556 if (!preload_cache && stratum1_url.Get() == NULL) {
557 LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
558 return 1;
559 }
560
561 typedef std::vector<history::History::Tag> TagVector;
562 TagVector historic_tags;
563
564 LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
565 stratum0_url->c_str());
566
567 int result = 1;
568
569 // Initialization
570 atomic_init64(&overall_chunks);
571 atomic_init64(&overall_new);
572 atomic_init64(&chunk_queue);
573
574 const bool follow_redirects = false;
575 const unsigned max_pool_handles = num_parallel + 1;
576 const string proxy = (args.find('@') != args.end()) ? *args.find('@')->second
577 : "";
578
579 if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
580 return 1;
581 }
582
583 if (!this->InitSignatureManager(master_keys)) {
584 LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
585 return 1;
586 } else {
587 LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: using public key(s) %s",
588 JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
589 }
590
591 unsigned current_group;
592 vector<vector<download::DownloadManager::ProxyInfo> > proxies;
593 download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
594 if (proxies.size() > 0) {
595 string proxy_str = "\nWarning, replicating through proxies\n";
596 proxy_str += " Load-balance groups:\n";
597 for (unsigned i = 0; i < proxies.size(); ++i) {
598 vector<string> urls;
599 for (unsigned j = 0; j < proxies[i].size(); ++j) {
600 urls.push_back(proxies[i][j].url);
601 }
602 proxy_str += " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ")
603 + "\n";
604 }
605 proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] "
606 + proxies[current_group][0].url;
607 LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
608 }
609 download_manager()->SetTimeout(timeout, timeout);
610 download_manager()->SetRetryParameters(retries, 500, 2000);
611 download_manager()->Spawn();
612
613 // init the download helper
614 const ObjectFetcher object_fetcher(repository_name, *stratum0_url, *temp_dir,
615 download_manager(), signature_manager());
616
617 pthread_t *workers = reinterpret_cast<pthread_t *>(
618 smalloc(sizeof(pthread_t) * num_parallel));
619
620 // Check if we have a replica-ready server
621 const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
622 download::JobInfo download_sentinel(&url_sentinel, false);
623 retval = download_manager()->Fetch(&download_sentinel);
624 if (retval != download::kFailOk) {
625 if (download_sentinel.http_code() == 404) {
626 LogCvmfs(kLogCvmfs, kLogStderr,
627 "This is not a CernVM-FS server for replication");
628 } else {
629 LogCvmfs(kLogCvmfs, kLogStderr,
630 "Failed to contact stratum 0 server (%d - %s)", retval,
631 download::Code2Ascii(download_sentinel.error_code()));
632 }
633 goto fini;
634 }
635
636 m_retval = FetchRemoteManifestEnsemble(*stratum0_url, repository_name,
637 &ensemble);
638 if (m_retval != manifest::kFailOk) {
639 LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
640 m_retval, manifest::Code2Ascii(m_retval));
641 goto fini;
642 }
643
644 // Get meta info
645 meta_info_hash = ensemble.manifest->meta_info();
646 if (!meta_info_hash.IsNull()) {
647 meta_info_hash = ensemble.manifest->meta_info();
648 const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
649 cvmfs::MemSink metainfo_memsink;
650 download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
651 &metainfo_memsink);
652 dl_retval = download_manager()->Fetch(&download_metainfo);
653 if (dl_retval != download::kFailOk) {
654 LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
655 dl_retval, download::Code2Ascii(dl_retval));
656 goto fini;
657 }
658 meta_info = string(reinterpret_cast<char *>(metainfo_memsink.data()),
659 metainfo_memsink.pos());
660 }
661
662 is_garbage_collectable = ensemble.manifest->garbage_collectable();
663
664 // Manifest available, now the spooler's hash algorithm can be determined
665 // That doesn't actually matter because the replication does no re-hashing
666 if (!preload_cache) {
667 const upload::SpoolerDefinition spooler_definition(
668 spooler_definition_str, ensemble.manifest->GetHashAlgorithm());
669 spooler = upload::Spooler::Construct(spooler_definition);
670 assert(spooler);
671 spooler->RegisterListener(&SpoolerOnUpload);
672 }
673
674 // Open the reflog for modification
675 if (!preload_cache) {
676 if (initial_snapshot) {
677 LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
678 repository_name.c_str());
679 reflog = CreateEmptyReflog(*temp_dir, repository_name);
680 if (reflog == NULL) {
681 LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
682 goto fini;
683 }
684 } else {
685 ObjectFetcher object_fetcher_stratum1(repository_name,
686 *stratum1_url,
687 *temp_dir,
688 download_manager(),
689 signature_manager());
690
691 if (!reflog_hash.IsNull()) {
692 reflog = FetchReflog(&object_fetcher_stratum1, repository_name,
693 reflog_hash);
694 assert(reflog != NULL);
695 } else {
696 LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
697 if (spooler->Peek(".cvmfsreflog")) {
698 LogCvmfs(kLogCvmfs, kLogStderr,
699 "no reflog hash specified but reflog is present");
700 goto fini;
701 }
702 }
703 }
704
705 if (reflog != NULL) {
706 reflog->BeginTransaction();
707 // On commit: use manifest's hash algorithm
708 reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
709 }
710 }
711
712 // Fetch tag list.
713 // If we are just preloading the cache it is not strictly necessary to
714 // download the entire tag list
715 // TODO(molina): add user option to download tags when preloading the cache
716 if (!ensemble.manifest->history().IsNull() && !preload_cache) {
717 const shash::Any history_hash = ensemble.manifest->history();
718 const string history_url = *stratum0_url + "/data/"
719 + history_hash.MakePath();
720 const string history_path = *temp_dir + "/" + history_hash.ToString();
721
722 cvmfs::PathSink pathsink(history_path);
723 download::JobInfo download_history(&history_url, false, false,
724 &history_hash, &pathsink);
725 dl_retval = download_manager()->Fetch(&download_history);
726 if (dl_retval != download::kFailOk) {
727 ReportDownloadError(download_history);
728 goto fini;
729 }
730 const std::string history_db_path = history_path + ".uncompressed";
731 retval = zlib::DecompressPath2Path(history_path, history_db_path);
732 assert(retval);
733 history::History *tag_db = history::SqliteHistory::Open(history_db_path);
734 if (NULL == tag_db) {
735 LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
736 history_db_path.c_str());
737 unlink(history_db_path.c_str());
738 goto fini;
739 }
740 retval = tag_db->List(&historic_tags);
741 delete tag_db;
742 unlink(history_db_path.c_str());
743 if (!retval) {
744 LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
745 history_db_path.c_str());
746 goto fini;
747 }
748
749 LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
750 historic_tags.size());
751 // TODO(jblomer): We should replicate the previous history dbs, too,
752 // in order to avoid races on fail-over between non-synchronized stratum 1s
753 LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
754 Store(history_path, history_hash);
755 WaitForStorage();
756 unlink(history_path.c_str());
757 if (reflog != NULL && !reflog->AddHistory(history_hash)) {
758 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
759 goto fini;
760 }
761 }
762
763 // Starting threads
764 MakePipe(pipe_chunks);
765 LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
766 MainWorkerContext mwc;
767 mwc.download_manager = download_manager();
768 for (unsigned i = 0; i < num_parallel; ++i) {
769 const int retval = pthread_create(&workers[i], NULL, MainWorker,
770 static_cast<void *>(&mwc));
771 assert(retval == 0);
772 }
773
774 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
775 current_catalog_hash = ensemble.manifest->catalog_hash();
776 do {
777 retval = Pull(current_catalog_hash, "", previous_catalog_hash);
778 if (pull_history && !previous_catalog_hash.IsNull()) {
779 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
780 previous_catalog_hash.ToString().c_str());
781 }
782 current_catalog_hash = previous_catalog_hash;
783 } while (pull_history && !previous_catalog_hash.IsNull());
784
785 if (!historic_tags.empty()) {
786 LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
787 }
788 for (TagVector::const_iterator i = historic_tags.begin(),
789 iend = historic_tags.end();
790 i != iend;
791 ++i) {
792 if (Peek(i->root_hash))
793 continue;
794 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
795 i->name.c_str());
796 apply_timestamp_threshold = false;
797
798 bool retval2;
799 current_catalog_hash = i->root_hash;
800 do {
801 retval2 = Pull(current_catalog_hash, "", previous_catalog_hash);
802 if (pull_history && !previous_catalog_hash.IsNull()) {
803 LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
804 previous_catalog_hash.ToString().c_str());
805 }
806 current_catalog_hash = previous_catalog_hash;
807 } while (pull_history && !previous_catalog_hash.IsNull());
808 retval = retval && retval2;
809 }
810
811 // Stopping threads
812 LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
813 for (unsigned i = 0; i < num_parallel; ++i) {
814 ChunkJob terminate_workers;
815 WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
816 }
817 for (unsigned i = 0; i < num_parallel; ++i) {
818 int retval = pthread_join(workers[i], NULL); // NOLINT (false positive)
819 assert(retval == 0);
820 }
821 ClosePipe(pipe_chunks);
822
823 if (!retval)
824 goto fini;
825
826 // Upload manifest ensemble
827 {
828 LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
829 WaitForStorage();
830
831 if (!Peek(ensemble.manifest->certificate())) {
832 StoreBuffer(ensemble.cert_buf, ensemble.cert_size,
833 ensemble.manifest->certificate(), true);
834 }
835 if (reflog != NULL
836 && !reflog->AddCertificate(ensemble.manifest->certificate())) {
837 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
838 goto fini;
839 }
840 if (!meta_info_hash.IsNull()) {
841 const unsigned char *info = reinterpret_cast<const unsigned char *>(
842 meta_info.data());
843 StoreBuffer(info, meta_info.size(), meta_info_hash, true);
844 if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
845 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
846 goto fini;
847 }
848 }
849
850 // Create alternative bootstrapping symlinks for VOMS secured repos
851 if (ensemble.manifest->has_alt_catalog_path()) {
852 const bool success = spooler->PlaceBootstrappingShortcut(
853 ensemble.manifest->certificate())
854 && spooler->PlaceBootstrappingShortcut(
855 ensemble.manifest->catalog_hash())
856 && (ensemble.manifest->history().IsNull()
857 || spooler->PlaceBootstrappingShortcut(
858 ensemble.manifest->history()))
859 && (meta_info_hash.IsNull()
860 || spooler->PlaceBootstrappingShortcut(
861 meta_info_hash));
862
863 if (!success) {
864 LogCvmfs(kLogCvmfs, kLogStderr,
865 "failed to place root catalog bootstrapping symlinks");
866 goto fini;
867 }
868 }
869
870 // upload Reflog database
871 if (!preload_cache && reflog != NULL) {
872 reflog->CommitTransaction();
873 reflog->DropDatabaseFileOwnership();
874 const string reflog_path = reflog->database_file();
875 delete reflog;
876 manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
877 WaitForStorage(); // Reduce the duration of reflog w/o checksum
878 spooler->UploadReflog(reflog_path);
879 spooler->WaitForUpload();
880 unlink(reflog_path.c_str());
881 if (spooler->GetNumberOfErrors()) {
882 LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
883 spooler->GetNumberOfErrors());
884 goto fini;
885 }
886 assert(!reflog_chksum_path.empty());
887 manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
888 }
889
890 if (preload_cache) {
891 const bool retval = ensemble.manifest->ExportBreadcrumb(*preload_cachedir,
892 0660);
893 assert(retval);
894 } else {
895 // pkcs#7 structure contains content + certificate + signature
896 // So there is no race with whitelist and pkcs7 signature being out of
897 // sync
898 if (ensemble.whitelist_pkcs7_buf) {
899 StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
900 ".cvmfswhitelist.pkcs7", false);
901 }
902 StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
903 ".cvmfswhitelist", false);
904 StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
905 ".cvmfspublished", false);
906 }
907 LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
908 ensemble.manifest->revision());
909 }
910
911 WaitForStorage();
912 LogCvmfs(kLogCvmfs, kLogStdout,
913 "Fetched %" PRId64 " new chunks out of %" PRId64 " processed chunks",
914 atomic_read64(&overall_new), atomic_read64(&overall_chunks));
915 result = 0;
916
917 fini:
918 if (fd_lockfile >= 0)
919 UnlockFile(fd_lockfile);
920 free(workers);
921 delete spooler;
922 delete pathfilter;
923 return result;
924 }
925
926 } // namespace swissknife
927