/**
 * This file is part of the CernVM File System.
 *
 * Replicates a cvmfs repository. Uses the cvmfs intrinsic Merkle trees
 * to calculate the difference set.
 */

// NOLINTNEXTLINE
#define _FILE_OFFSET_BITS 64
// NOLINTNEXTLINE
#define __STDC_FORMAT_MACROS


#include "swissknife_pull.h"

#include <inttypes.h>
#include <pthread.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

#include "catalog.h"
#include "compression/compression.h"
#include "crypto/hash.h"
#include "crypto/signature.h"
#include "history_sqlite.h"
#include "manifest.h"
#include "manifest_fetch.h"
#include "network/download.h"
#include "object_fetcher.h"
#include "path_filters/relaxed_path_filter.h"
#include "reflog.h"
#include "upload.h"
#include "util/atomic.h"
#include "util/concurrency.h"
#include "util/exception.h"
#include "util/logging.h"
#include "util/posix.h"
#include "util/shared_ptr.h"
#include "util/smalloc.h"
#include "util/string.h"

using namespace std;  // NOLINT

namespace swissknife {

namespace {

typedef HttpObjectFetcher<> ObjectFetcher;

/**
 * This just stores an shash::Any in a predictable way to send it through a
 * POSIX pipe.
 */
class ChunkJob {
 public:
  ChunkJob()
      : suffix(shash::kSuffixNone)
      , hash_algorithm(shash::kAny)
      , compression_alg(zlib::kZlibDefault) { }

  ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
      : suffix(hash.suffix)
      , hash_algorithm(hash.algorithm)
      , compression_alg(compression_alg) {
    memcpy(digest, hash.digest, hash.GetDigestSize());
  }

  bool IsTerminateJob() const { return (hash_algorithm == shash::kAny); }

  shash::Any hash() const {
    assert(!IsTerminateJob());
    return shash::Any(hash_algorithm, digest, suffix);
  }

  const shash::Suffix suffix;
  const shash::Algorithms hash_algorithm;
  const zlib::Algorithms compression_alg;
  unsigned char digest[shash::kMaxDigestSize];
};
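// A default-constructed ChunkJob carries hash_algorithm == shash::kAny and
// therefore serves as the termination sentinel: the main thread writes one such
// job per worker into the pipe and MainWorker() exits as soon as it reads one.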

static void SpoolerOnUpload(const upload::SpoolerResult &result) {
  unlink(result.local_path.c_str());
  if (result.return_code != 0) {
    PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
          result.local_path.c_str(), result.content_hash.ToString().c_str());
  }
}

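// File-local state shared between Main(), the helper functions below, and the
// worker threads.  Most of it is configured once during option parsing and
// initialization; the atomic counters and the chunk pipe are the pieces that
// the workers touch concurrently.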
SharedPtr<string> stratum0_url;
SharedPtr<string> stratum1_url;
SharedPtr<string> temp_dir;
unsigned num_parallel = 1;
bool pull_history = false;
bool apply_timestamp_threshold = false;
uint64_t timestamp_threshold = 0;
bool is_garbage_collectable = false;
bool initial_snapshot = false;
upload::Spooler *spooler = NULL;
int pipe_chunks[2];
// required for concurrent reading
pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
unsigned retries = 3;
catalog::RelaxedPathFilter *pathfilter = NULL;
atomic_int64 overall_chunks;
atomic_int64 overall_new;
atomic_int64 chunk_queue;
bool preload_cache = false;
string *preload_cachedir = NULL;
bool inspect_existing_catalogs = false;
manifest::Reflog *reflog = NULL;

}  // anonymous namespace


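// MakePath(), Peek(), Store(), and StoreBuffer() hide the two replication
// targets behind one interface: with -c (preload_cache) objects are written
// into a local cache directory, otherwise they are uploaded to the stratum 1
// backend storage through the spooler.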
static std::string MakePath(const shash::Any &hash) {
  return (preload_cache)
             ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
             : "data/" + hash.MakePath();
}


static bool Peek(const string &remote_path) {
  return (preload_cache) ? FileExists(remote_path) : spooler->Peek(remote_path);
}

static bool Peek(const shash::Any &remote_hash) {
  return Peek(MakePath(remote_hash));
}

static void ReportDownloadError(const download::JobInfo &download_job) {
  const download::Failures error_code = download_job.error_code();
  const int http_code = download_job.http_code();
  const std::string url = *download_job.url();

  LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
           url.c_str(), error_code, download::Code2Ascii(error_code));

  switch (error_code) {
    case download::kFailProxyResolve:
    case download::kFailHostResolve:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "DNS lookup for Stratum 0 failed - "
               "please check the network connection");
      break;

    case download::kFailHostHttp:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "unexpected HTTP error code %d - "
               "please check the stratum 0 health",
               http_code);
      break;

    case download::kFailBadData:
      LogCvmfs(kLogCvmfs, kLogStderr,
               "downloaded corrupted data - "
               "please check the stratum 0 health");
      break;

    default:
      if (download::IsProxyTransferError(error_code)
          || download::IsHostTransferError(error_code)) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "couldn't reach Stratum 0 - "
                 "please check the network connection");
      } else {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "unexpected error - feel free to file "
                 "a bug report");
      }
  }
}


static void Store(const string &local_path,
                  const string &remote_path,
                  const bool compressed_src) {
  if (preload_cache) {
    if (!compressed_src) {
      const int retval = rename(local_path.c_str(), remote_path.c_str());
      if (retval != 0) {
        PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
              remote_path.c_str());
      }
    } else {
      // compressed input
      string tmp_dest;
      FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
      if (fdest == NULL) {
        PANIC(kLogStderr, "Failed to create temporary file '%s'",
              remote_path.c_str());
      }
      int retval = zlib::DecompressPath2File(local_path, fdest);
      if (!retval) {
        PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
              remote_path.c_str());
      }
      fclose(fdest);
      retval = rename(tmp_dest.c_str(), remote_path.c_str());
      assert(retval == 0);
      unlink(local_path.c_str());
    }
  } else {
    spooler->Upload(local_path, remote_path);
  }
}

static void Store(const string &local_path,
                  const shash::Any &remote_hash,
                  const bool compressed_src = true) {
  Store(local_path, MakePath(remote_hash), compressed_src);
}


static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const std::string &dest_path, const bool compress) {
  string tmp_file;
  FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
  assert(ftmp);
  int retval;
  if (compress) {
    shash::Any dummy(shash::kSha1);  // hash algorithm is irrelevant, unused
    retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
  } else {
    retval = CopyMem2File(buffer, size, ftmp);
  }
  assert(retval);
  fclose(ftmp);
  Store(tmp_file, dest_path, true);
}

static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const shash::Any &dest_hash, const bool compress) {
  StoreBuffer(buffer, size, MakePath(dest_hash), compress);
}


static void WaitForStorage() {
  if (!preload_cache)
    spooler->WaitForUpload();
}


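// Worker thread: blocks on the read end of pipe_chunks (serialized by
// lock_pipe), downloads every chunk that is not yet present in the destination
// storage from stratum 0, stores it, and updates the overall_chunks,
// overall_new, and chunk_queue counters.  A terminate job (see ChunkJob) ends
// the loop.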
struct MainWorkerContext {
  download::DownloadManager *download_manager;
};

static void *MainWorker(void *data) {
  MainWorkerContext *mwc = static_cast<MainWorkerContext *>(data);
  download::DownloadManager *download_manager = mwc->download_manager;

  while (1) {
    ChunkJob next_chunk;
    {
      const MutexLockGuard m(&lock_pipe);
      ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
    }
    if (next_chunk.IsTerminateJob())
      break;

    const shash::Any chunk_hash = next_chunk.hash();
    const zlib::Algorithms compression_alg = next_chunk.compression_alg;
    LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
             chunk_hash.ToString().c_str());

    if (!Peek(chunk_hash)) {
      string tmp_file;
      FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
      const string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
      cvmfs::FileSink filesink(fchunk);
      download::JobInfo download_chunk(&url_chunk, false, false, &chunk_hash,
                                       &filesink);

      const download::Failures download_result = download_manager->Fetch(
          &download_chunk);
      if (download_result != download::kFailOk) {
        ReportDownloadError(download_chunk);
        PANIC(kLogStderr, "Download error");
      }
      fclose(fchunk);
      Store(tmp_file, chunk_hash,
            (compression_alg == zlib::kZlibDefault) ? true : false);
      atomic_inc64(&overall_new);
    }
    if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
      LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
    atomic_dec64(&chunk_queue);
  }
  return NULL;
}


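// Replicates all nested catalogs registered in the given catalog by invoking
// Pull() on each of them.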
bool CommandPull::PullRecursion(catalog::Catalog *catalog,
                                const std::string &path) {
  assert(catalog);


  // Nested catalogs (in a nested code block because goto fail...)
  {
    const catalog::Catalog::NestedCatalogList
        nested_catalogs = catalog->ListOwnNestedCatalogs();
    for (catalog::Catalog::NestedCatalogList::const_iterator
             i = nested_catalogs.begin(),
             iEnd = nested_catalogs.end();
         i != iEnd;
         ++i) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
               i->mountpoint.c_str());
      shash::Any previous_catalog_hash;  // expected to be null for subcatalog
      const bool retval = Pull(i->hash, i->mountpoint.ToString(),
                               previous_catalog_hash);
      if (!retval)
        return false;
    }
  }

  return true;
}

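// Replicates a single catalog: skips it if it is already present, otherwise
// downloads and opens it, feeds all registered chunks to the worker pipe,
// recurses into its nested catalogs, and finally stores the compressed catalog
// itself.  previous_catalog is set to the hash of the catalog's previous
// revision (or remains null) so that the caller can walk the revision history.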
bool CommandPull::Pull(const shash::Any &catalog_hash,
                       const std::string &path,
                       shash::Any &previous_catalog) {
  int retval;
  download::Failures dl_retval;
  assert(shash::kSuffixCatalog == catalog_hash.suffix);
  previous_catalog.SetNull();

  // Check if the catalog already exists
  if (Peek(catalog_hash)) {
    // Preload: dirtab changed
    if (inspect_existing_catalogs) {
      if (!preload_cache) {
        PANIC(kLogStderr, "to be implemented: -t without -c");
      }
      catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
          path, MakePath(catalog_hash), catalog_hash);
      if (catalog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
                 catalog_hash.ToString().c_str());
        return false;
      }
      const bool retval = PullRecursion(catalog, path);
      delete catalog;
      return retval;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
    return true;
  }

  // Check if the catalog matches the pathfilter
  if (path != "" &&  // necessary to load the root catalog
      pathfilter && !pathfilter->IsMatching(path)) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             " Catalog in '%s' does not match"
             " the path specification",
             path.c_str());
    return true;
  }

  const int64_t gauge_chunks = atomic_read64(&overall_chunks);
  const int64_t gauge_new = atomic_read64(&overall_new);

  // Download and uncompress catalog
  shash::Any chunk_hash;
  zlib::Algorithms compression_alg;
  catalog::Catalog *catalog = NULL;
  string file_catalog;
  string file_catalog_vanilla;
  FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                  &file_catalog);
  if (!fcatalog) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    return false;
  }
  fclose(fcatalog);
  FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                          &file_catalog_vanilla);
  if (!fcatalog_vanilla) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    unlink(file_catalog.c_str());
    return false;
  }
  const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
  cvmfs::FileSink filesink(fcatalog_vanilla);
  download::JobInfo download_catalog(&url_catalog, false, false, &catalog_hash,
                                     &filesink);
  dl_retval = download_manager()->Fetch(&download_catalog);
  fclose(fcatalog_vanilla);
  if (dl_retval != download::kFailOk) {
    if (path == "" && is_garbage_collectable) {
      LogCvmfs(kLogCvmfs, kLogStdout,
               "skipping missing root catalog %s - "
               "probably swept by garbage collection",
               catalog_hash.ToString().c_str());
      goto pull_skip;
    } else {
      ReportDownloadError(download_catalog);
      goto pull_cleanup;
    }
  }
  retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
             file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }
  if (path.empty() && reflog != NULL) {
    if (!reflog->AddCatalog(catalog_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
      goto pull_cleanup;
    }
  }

  catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
  if (catalog == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
             catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }

  // Always pull the HEAD root catalog and nested catalogs
  if (apply_timestamp_threshold && (path == "")
      && (catalog->GetLastModified() < timestamp_threshold)) {
    LogCvmfs(kLogCvmfs, kLogStdout,
             " Pruning at root catalog from %s due to threshold at %s",
             StringifyTime(catalog->GetLastModified(), false).c_str(),
             StringifyTime(timestamp_threshold, false).c_str());
    delete catalog;
    goto pull_skip;
  }
  apply_timestamp_threshold = true;

  // Traverse the chunks
  LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
           " Processing chunks [%" PRIu64 " registered chunks]: ",
           catalog->GetNumChunks());
  retval = catalog->AllChunksBegin();
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
    goto pull_cleanup;
  }
  while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
    ChunkJob next_chunk(chunk_hash, compression_alg);
    WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
    atomic_inc64(&chunk_queue);
  }
  catalog->AllChunksEnd();
  while (atomic_read64(&chunk_queue) != 0) {
    SafeSleepMs(100);
  }
  LogCvmfs(kLogCvmfs, kLogStdout,
           " fetched %" PRId64 " new chunks out of "
           "%" PRId64 " unique chunks",
           atomic_read64(&overall_new) - gauge_new,
           atomic_read64(&overall_chunks) - gauge_chunks);

  retval = PullRecursion(catalog, path);
  previous_catalog = catalog->GetPreviousRevision();
  delete catalog;
  unlink(file_catalog.c_str());


  WaitForStorage();
  if (!retval)
    return false;
  Store(file_catalog_vanilla, catalog_hash);
  return true;

 pull_cleanup:
  delete catalog;
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return false;

 pull_skip:
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return true;
}


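// Entry point of the pull (replication) command: parses the command line,
// fetches the signed repository manifest from stratum 0, spawns the worker
// threads, replicates the trunk catalog (with -p also its previous revisions)
// and the tagged snapshots, and finally uploads certificate, whitelist,
// reflog, and manifest.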
int swissknife::CommandPull::Main(const swissknife::ArgumentList &args) {
  int retval;
  manifest::Failures m_retval;
  download::Failures dl_retval;
  unsigned timeout = 60;
  const int fd_lockfile = -1;
  string spooler_definition_str;
  manifest::ManifestEnsemble ensemble;
  shash::Any meta_info_hash;
  string meta_info;
  shash::Any previous_catalog_hash;
  shash::Any current_catalog_hash;

  // Option parsing
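  // (each one-letter switch feeds one of the file-local settings declared in
  // the anonymous namespace above, e.g. -c preload a client cache, -n number
  // of parallel workers, -w stratum 1 URL, -p follow previous catalog
  // revisions)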
  if (args.find('c') != args.end())
    preload_cache = true;
  if (args.find('l') != args.end()) {
    const unsigned log_level = kLogLevel0
                               << String2Uint64(*args.find('l')->second);
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  stratum0_url = args.find('u')->second;
  temp_dir = args.find('x')->second;
  if (preload_cache) {
    preload_cachedir = new string(*args.find('r')->second);
  } else {
    spooler_definition_str = *args.find('r')->second;
  }
  string master_keys = *args.find('k')->second;
  if (DirectoryExists(master_keys))
    master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
  const string repository_name = *args.find('m')->second;
  if (args.find('n') != args.end())
    num_parallel = String2Uint64(*args.find('n')->second);
  if (args.find('t') != args.end())
    timeout = String2Uint64(*args.find('t')->second);
  if (args.find('a') != args.end())
    retries = String2Uint64(*args.find('a')->second);
  if (args.find('d') != args.end()) {
    pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
    assert(pathfilter->IsValid());
  }
  if (args.find('p') != args.end())
    pull_history = true;
  if (args.find('z') != args.end())
    inspect_existing_catalogs = true;
  if (args.find('w') != args.end())
    stratum1_url = args.find('w')->second;
  if (args.find('i') != args.end())
    initial_snapshot = true;
  shash::Any reflog_hash;
  string reflog_chksum_path;
  if (args.find('R') != args.end()) {
    reflog_chksum_path = *args.find('R')->second;
    if (!initial_snapshot) {
      if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
        return 1;
      }
    }
  }
  if (args.find('Z') != args.end()) {
    timestamp_threshold = String2Int64(*args.find('Z')->second);
  }

  if (!preload_cache && stratum1_url.Get() == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
    return 1;
  }

  typedef std::vector<history::History::Tag> TagVector;
  TagVector historic_tags;

  LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
           stratum0_url->c_str());

  int result = 1;

  // Initialization
  atomic_init64(&overall_chunks);
  atomic_init64(&overall_new);
  atomic_init64(&chunk_queue);

  const bool follow_redirects = false;
  const unsigned max_pool_handles = num_parallel + 1;
  const string proxy = (args.find('@') != args.end()) ? *args.find('@')->second
                                                      : "";

  if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
    return 1;
  }

  if (!this->InitSignatureManager(master_keys)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
    return 1;
  } else {
    LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: using public key(s) %s",
             JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
  }

  unsigned current_group;
  vector<vector<download::DownloadManager::ProxyInfo> > proxies;
  download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
  if (proxies.size() > 0) {
    string proxy_str = "\nWarning, replicating through proxies\n";
    proxy_str += " Load-balance groups:\n";
    for (unsigned i = 0; i < proxies.size(); ++i) {
      vector<string> urls;
      for (unsigned j = 0; j < proxies[i].size(); ++j) {
        urls.push_back(proxies[i][j].url);
      }
      proxy_str += " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ")
                   + "\n";
    }
    proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] "
                 + proxies[current_group][0].url;
    LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
  }
  download_manager()->SetTimeout(timeout, timeout);
  download_manager()->SetRetryParameters(retries, 500, 2000);
  download_manager()->Spawn();

  // init the download helper
  const ObjectFetcher object_fetcher(repository_name, *stratum0_url, *temp_dir,
                                     download_manager(), signature_manager());

  pthread_t *workers = reinterpret_cast<pthread_t *>(
      smalloc(sizeof(pthread_t) * num_parallel));

  // Check if we have a replica-ready server
  const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
  download::JobInfo download_sentinel(&url_sentinel, false);
  retval = download_manager()->Fetch(&download_sentinel);
  if (retval != download::kFailOk) {
    if (download_sentinel.http_code() == 404) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "This is not a CernVM-FS server for replication");
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Failed to contact stratum 0 server (%d - %s)", retval,
               download::Code2Ascii(download_sentinel.error_code()));
    }
    goto fini;
  }

  m_retval = FetchRemoteManifestEnsemble(*stratum0_url, repository_name,
                                         &ensemble);
  if (m_retval != manifest::kFailOk) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
             m_retval, manifest::Code2Ascii(m_retval));
    goto fini;
  }

  // Get meta info
  meta_info_hash = ensemble.manifest->meta_info();
  if (!meta_info_hash.IsNull()) {
    meta_info_hash = ensemble.manifest->meta_info();
    const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
    cvmfs::MemSink metainfo_memsink;
    download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
                                        &metainfo_memsink);
    dl_retval = download_manager()->Fetch(&download_metainfo);
    if (dl_retval != download::kFailOk) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
               dl_retval, download::Code2Ascii(dl_retval));
      goto fini;
    }
    meta_info = string(reinterpret_cast<char *>(metainfo_memsink.data()),
                       metainfo_memsink.pos());
  }

  is_garbage_collectable = ensemble.manifest->garbage_collectable();

  // Manifest available, now the spooler's hash algorithm can be determined
  // That doesn't actually matter because the replication does no re-hashing
  if (!preload_cache) {
    const upload::SpoolerDefinition spooler_definition(
        spooler_definition_str, ensemble.manifest->GetHashAlgorithm());
    spooler = upload::Spooler::Construct(spooler_definition);
    assert(spooler);
    spooler->RegisterListener(&SpoolerOnUpload);
  }

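  // The reflog (reference log) records the root objects known to this replica
  // (catalogs, certificate, history, meta info); garbage collection relies on
  // it to decide which objects are still referenced.  It is only maintained
  // when uploading to a stratum 1, not when preloading a cache.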
  // Open the reflog for modification
  if (!preload_cache) {
    if (initial_snapshot) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
               repository_name.c_str());
      reflog = CreateEmptyReflog(*temp_dir, repository_name);
      if (reflog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
        goto fini;
      }
    } else {
      ObjectFetcher object_fetcher_stratum1(repository_name,
                                            *stratum1_url,
                                            *temp_dir,
                                            download_manager(),
                                            signature_manager());

      if (!reflog_hash.IsNull()) {
        reflog = FetchReflog(&object_fetcher_stratum1, repository_name,
                             reflog_hash);
        assert(reflog != NULL);
      } else {
        LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
        if (spooler->Peek(".cvmfsreflog")) {
          LogCvmfs(kLogCvmfs, kLogStderr,
                   "no reflog hash specified but reflog is present");
          goto fini;
        }
      }
    }

    if (reflog != NULL) {
      reflog->BeginTransaction();
      // On commit: use manifest's hash algorithm
      reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
    }
  }

  // Fetch tag list.
  // If we are just preloading the cache it is not strictly necessary to
  // download the entire tag list
  // TODO(molina): add user option to download tags when preloading the cache
  if (!ensemble.manifest->history().IsNull() && !preload_cache) {
    const shash::Any history_hash = ensemble.manifest->history();
    const string history_url = *stratum0_url + "/data/"
                               + history_hash.MakePath();
    const string history_path = *temp_dir + "/" + history_hash.ToString();

    cvmfs::PathSink pathsink(history_path);
    download::JobInfo download_history(&history_url, false, false,
                                       &history_hash, &pathsink);
    dl_retval = download_manager()->Fetch(&download_history);
    if (dl_retval != download::kFailOk) {
      ReportDownloadError(download_history);
      goto fini;
    }
    const std::string history_db_path = history_path + ".uncompressed";
    retval = zlib::DecompressPath2Path(history_path, history_db_path);
    assert(retval);
    history::History *tag_db = history::SqliteHistory::Open(history_db_path);
    if (NULL == tag_db) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
               history_db_path.c_str());
      unlink(history_db_path.c_str());
      goto fini;
    }
    retval = tag_db->List(&historic_tags);
    delete tag_db;
    unlink(history_db_path.c_str());
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
               history_db_path.c_str());
      goto fini;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
             historic_tags.size());
    // TODO(jblomer): We should replicate the previous history dbs, too,
    // in order to avoid races on fail-over between non-synchronized stratum 1s
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
    Store(history_path, history_hash);
    WaitForStorage();
    unlink(history_path.c_str());
    if (reflog != NULL && !reflog->AddHistory(history_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
      goto fini;
    }
  }

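  // Pull() acts as the producer: it writes one ChunkJob per registered chunk
  // into pipe_chunks, and the workers started here drain the pipe concurrently.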
  // Starting threads
  MakePipe(pipe_chunks);
  LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
  MainWorkerContext mwc;
  mwc.download_manager = download_manager();
  for (unsigned i = 0; i < num_parallel; ++i) {
    const int retval = pthread_create(&workers[i], NULL, MainWorker,
                                      static_cast<void *>(&mwc));
    assert(retval == 0);
  }

  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
  current_catalog_hash = ensemble.manifest->catalog_hash();
  do {
    retval = Pull(current_catalog_hash, "", previous_catalog_hash);
    if (pull_history && !previous_catalog_hash.IsNull()) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
               previous_catalog_hash.ToString().c_str());
    }
    current_catalog_hash = previous_catalog_hash;
  } while (pull_history && !previous_catalog_hash.IsNull());

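  // Named snapshots (tags) whose root catalogs are not yet present in the
  // replica are pulled as well; the timestamp threshold is not applied to them.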
  if (!historic_tags.empty()) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
  }
  for (TagVector::const_iterator i = historic_tags.begin(),
                                 iend = historic_tags.end();
       i != iend;
       ++i) {
    if (Peek(i->root_hash))
      continue;
    LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
             i->name.c_str());
    apply_timestamp_threshold = false;

    bool retval2;
    current_catalog_hash = i->root_hash;
    do {
      retval2 = Pull(current_catalog_hash, "", previous_catalog_hash);
      if (pull_history && !previous_catalog_hash.IsNull()) {
        LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
                 previous_catalog_hash.ToString().c_str());
      }
      current_catalog_hash = previous_catalog_hash;
    } while (pull_history && !previous_catalog_hash.IsNull());
    retval = retval && retval2;
  }

  // Stopping threads
  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
  for (unsigned i = 0; i < num_parallel; ++i) {
    ChunkJob terminate_workers;
    WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
  }
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_join(workers[i], NULL);  // NOLINT (false positive)
    assert(retval == 0);
  }
  ClosePipe(pipe_chunks);

  if (!retval)
    goto fini;

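  // The manifest ensemble is stored last, after all replicated data objects,
  // so that the published .cvmfspublished does not point to objects that are
  // still missing on the replica.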
  // Upload manifest ensemble
  {
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
    WaitForStorage();

    if (!Peek(ensemble.manifest->certificate())) {
      StoreBuffer(ensemble.cert_buf, ensemble.cert_size,
                  ensemble.manifest->certificate(), true);
    }
    if (reflog != NULL
        && !reflog->AddCertificate(ensemble.manifest->certificate())) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
      goto fini;
    }
    if (!meta_info_hash.IsNull()) {
      const unsigned char *info = reinterpret_cast<const unsigned char *>(
          meta_info.data());
      StoreBuffer(info, meta_info.size(), meta_info_hash, true);
      if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
        goto fini;
      }
    }

    // Create alternative bootstrapping symlinks for VOMS secured repos
    if (ensemble.manifest->has_alt_catalog_path()) {
      const bool success = spooler->PlaceBootstrappingShortcut(
                               ensemble.manifest->certificate())
                           && spooler->PlaceBootstrappingShortcut(
                               ensemble.manifest->catalog_hash())
                           && (ensemble.manifest->history().IsNull()
                               || spooler->PlaceBootstrappingShortcut(
                                   ensemble.manifest->history()))
                           && (meta_info_hash.IsNull()
                               || spooler->PlaceBootstrappingShortcut(
                                   meta_info_hash));

      if (!success) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "failed to place root catalog bootstrapping symlinks");
        goto fini;
      }
    }

    // upload Reflog database
    if (!preload_cache && reflog != NULL) {
      reflog->CommitTransaction();
      reflog->DropDatabaseFileOwnership();
      const string reflog_path = reflog->database_file();
      delete reflog;
      manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
      WaitForStorage();  // Reduce the duration of reflog w/o checksum
      spooler->UploadReflog(reflog_path);
      spooler->WaitForUpload();
      unlink(reflog_path.c_str());
      if (spooler->GetNumberOfErrors()) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
                 spooler->GetNumberOfErrors());
        goto fini;
      }
      assert(!reflog_chksum_path.empty());
      manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
    }

    if (preload_cache) {
      const bool retval = ensemble.manifest->ExportBreadcrumb(*preload_cachedir,
                                                              0660);
      assert(retval);
    } else {
      // pkcs#7 structure contains content + certificate + signature
      // So there is no race with whitelist and pkcs7 signature being out of
      // sync
      if (ensemble.whitelist_pkcs7_buf) {
        StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
                    ".cvmfswhitelist.pkcs7", false);
      }
      StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
                  ".cvmfswhitelist", false);
      StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
                  ".cvmfspublished", false);
    }
    LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
             ensemble.manifest->revision());
  }

  WaitForStorage();
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Fetched %" PRId64 " new chunks out of %" PRId64 " processed chunks",
           atomic_read64(&overall_new), atomic_read64(&overall_chunks));
  result = 0;

 fini:
  if (fd_lockfile >= 0)
    UnlockFile(fd_lockfile);
  free(workers);
  delete spooler;
  delete pathfilter;
  return result;
}

}  // namespace swissknife