/**
 * This file is part of the CernVM File System.
 *
 * Replicates a cvmfs repository. Uses the cvmfs intrinsic Merkle trees
 * to calculate the difference set.
 */

// NOLINTNEXTLINE
#define _FILE_OFFSET_BITS 64
// NOLINTNEXTLINE
#define __STDC_FORMAT_MACROS


#include "swissknife_pull.h"

#include <inttypes.h>
#include <pthread.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

#include "catalog.h"
#include "compression/compression.h"
#include "crypto/hash.h"
#include "crypto/signature.h"
#include "history_sqlite.h"
#include "manifest.h"
#include "manifest_fetch.h"
#include "network/download.h"
#include "object_fetcher.h"
#include "path_filters/relaxed_path_filter.h"
#include "reflog.h"
#include "upload.h"
#include "util/atomic.h"
#include "util/concurrency.h"
#include "util/exception.h"
#include "util/logging.h"
#include "util/posix.h"
#include "util/shared_ptr.h"
#include "util/smalloc.h"
#include "util/string.h"

using namespace std;  // NOLINT

namespace swissknife {

namespace {

typedef HttpObjectFetcher<> ObjectFetcher;

/**
 * This just stores an shash::Any in a predictable way to send it through a
 * POSIX pipe.
 */
class ChunkJob {
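  // The job object has a fixed size and contains no pointers, so a worker
  // can read one from the chunk pipe with a single ReadPipe() of
  // sizeof(ChunkJob) bytes.  A default-constructed job (hash_algorithm ==
  // shash::kAny) doubles as the termination sentinel; Main() writes one per
  // worker before joining the worker threads.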
 public:
  ChunkJob()
    : suffix(shash::kSuffixNone)
    , hash_algorithm(shash::kAny)
    , compression_alg(zlib::kZlibDefault) {}

  ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
    : suffix(hash.suffix)
    , hash_algorithm(hash.algorithm)
    , compression_alg(compression_alg)
  {
    memcpy(digest, hash.digest, hash.GetDigestSize());
  }

  bool IsTerminateJob() const {
    return (hash_algorithm == shash::kAny);
  }

  shash::Any hash() const {
    assert(!IsTerminateJob());
    return shash::Any(hash_algorithm,
                      digest,
                      suffix);
  }

  const shash::Suffix suffix;
  const shash::Algorithms hash_algorithm;
  const zlib::Algorithms compression_alg;
  unsigned char digest[shash::kMaxDigestSize];
};

static void SpoolerOnUpload(const upload::SpoolerResult &result) {
  unlink(result.local_path.c_str());
  if (result.return_code != 0) {
    PANIC(kLogStderr, "spooler failure %d (%s, hash: %s)", result.return_code,
          result.local_path.c_str(), result.content_hash.ToString().c_str());
  }
}

SharedPtr<string> stratum0_url;
SharedPtr<string> stratum1_url;
SharedPtr<string> temp_dir;
unsigned num_parallel = 1;
bool pull_history = false;
bool apply_timestamp_threshold = false;
uint64_t timestamp_threshold = 0;
bool is_garbage_collectable = false;
bool initial_snapshot = false;
upload::Spooler *spooler = NULL;
int pipe_chunks[2];
// required for concurrent reading
pthread_mutex_t lock_pipe = PTHREAD_MUTEX_INITIALIZER;
unsigned retries = 3;
catalog::RelaxedPathFilter *pathfilter = NULL;
atomic_int64 overall_chunks;
atomic_int64 overall_new;
atomic_int64 chunk_queue;
bool preload_cache = false;
string *preload_cachedir = NULL;
bool inspect_existing_catalogs = false;
manifest::Reflog *reflog = NULL;

}  // anonymous namespace


static std::string MakePath(const shash::Any &hash) {
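  // Preloaded caches hold plain, decompressed files without the hash
  // suffix; replication addresses the backend's content-addressed data/
  // tree (see Store() below for the matching decompress-vs-upload logic).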
  return (preload_cache)
         ? *preload_cachedir + "/" + hash.MakePathWithoutSuffix()
         : "data/" + hash.MakePath();
}


static bool Peek(const string &remote_path) {
  return (preload_cache) ? FileExists(remote_path)
                         : spooler->Peek(remote_path);
}

static bool Peek(const shash::Any &remote_hash) {
  return Peek(MakePath(remote_hash));
}

static void ReportDownloadError(const download::JobInfo &download_job) {
  const download::Failures error_code = download_job.error_code();
  const int http_code = download_job.http_code();
  const std::string url = *download_job.url();

  LogCvmfs(kLogCvmfs, kLogStderr, "failed to download %s (%d - %s)",
           url.c_str(), error_code, download::Code2Ascii(error_code));

  switch (error_code) {
    case download::kFailProxyResolve:
    case download::kFailHostResolve:
      LogCvmfs(kLogCvmfs, kLogStderr, "DNS lookup for Stratum 0 failed - "
               "please check the network connection");
      break;

    case download::kFailHostHttp:
      LogCvmfs(kLogCvmfs, kLogStderr, "unexpected HTTP error code %d - "
               "please check the stratum 0 health", http_code);
      break;

    case download::kFailBadData:
      LogCvmfs(kLogCvmfs, kLogStderr, "downloaded corrupted data - "
               "please check the stratum 0 health");
      break;

    default:
      if (download::IsProxyTransferError(error_code) ||
          download::IsHostTransferError(error_code)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "couldn't reach Stratum 0 - "
                 "please check the network connection");
      } else {
        LogCvmfs(kLogCvmfs, kLogStderr, "unexpected error - feel free to file "
                 "a bug report");
      }
  }
}


static void Store(
  const string &local_path,
  const string &remote_path,
  const bool compressed_src)
{
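  // Preload mode: move the file into the cache directory, decompressing it
  // first if the source is compressed.  Replication mode: hand the file to
  // the spooler, which uploads it to the stratum 1 backend unaltered.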
  if (preload_cache) {
    if (!compressed_src) {
      int retval = rename(local_path.c_str(), remote_path.c_str());
      if (retval != 0) {
        PANIC(kLogStderr, "Failed to move '%s' to '%s'", local_path.c_str(),
              remote_path.c_str());
      }
    } else {
      // compressed input
      string tmp_dest;
      FILE *fdest = CreateTempFile(remote_path, 0660, "w", &tmp_dest);
      if (fdest == NULL) {
        PANIC(kLogStderr, "Failed to create temporary file '%s'",
              remote_path.c_str());
      }
      int retval = zlib::DecompressPath2File(local_path, fdest);
      if (!retval) {
        PANIC(kLogStderr, "Failed to preload %s to %s", local_path.c_str(),
              remote_path.c_str());
      }
      fclose(fdest);
      retval = rename(tmp_dest.c_str(), remote_path.c_str());
      assert(retval == 0);
      unlink(local_path.c_str());
    }
  } else {
    spooler->Upload(local_path, remote_path);
  }
}

static void Store(
  const string &local_path,
  const shash::Any &remote_hash,
  const bool compressed_src = true)
{
  Store(local_path, MakePath(remote_hash), compressed_src);
}


static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const std::string &dest_path, const bool compress) {
  string tmp_file;
  FILE *ftmp = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w", &tmp_file);
  assert(ftmp);
  int retval;
  if (compress) {
    shash::Any dummy(shash::kSha1);  // hardcoded hash no problem, unused
    retval = zlib::CompressMem2File(buffer, size, ftmp, &dummy);
  } else {
    retval = CopyMem2File(buffer, size, ftmp);
  }
  assert(retval);
  fclose(ftmp);
  Store(tmp_file, dest_path, true);
}

static void StoreBuffer(const unsigned char *buffer, const unsigned size,
                        const shash::Any &dest_hash, const bool compress) {
  StoreBuffer(buffer, size, MakePath(dest_hash), compress);
}


static void WaitForStorage() {
  if (!preload_cache) spooler->WaitForUpload();
}


struct MainWorkerContext {
  download::DownloadManager *download_manager;
};

static void *MainWorker(void *data) {
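  // Each worker pulls fixed-size ChunkJob records from the shared pipe;
  // reads are serialized with lock_pipe so a record is never torn between
  // two readers.  Chunks the backend already has are only counted; missing
  // ones are fetched from the stratum 0 and stored.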
  MainWorkerContext *mwc = static_cast<MainWorkerContext*>(data);
  download::DownloadManager *download_manager = mwc->download_manager;

  while (1) {
    ChunkJob next_chunk;
    {
      MutexLockGuard m(&lock_pipe);
      ReadPipe(pipe_chunks[0], &next_chunk, sizeof(next_chunk));
    }
    if (next_chunk.IsTerminateJob())
      break;

    shash::Any chunk_hash = next_chunk.hash();
    zlib::Algorithms compression_alg = next_chunk.compression_alg;
    LogCvmfs(kLogCvmfs, kLogVerboseMsg, "processing chunk %s",
             chunk_hash.ToString().c_str());

    if (!Peek(chunk_hash)) {
      string tmp_file;
      FILE *fchunk = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                    &tmp_file);
      string url_chunk = *stratum0_url + "/data/" + chunk_hash.MakePath();
      cvmfs::FileSink filesink(fchunk);
      download::JobInfo download_chunk(&url_chunk, false, false,
                                       &chunk_hash, &filesink);

      const download::Failures download_result =
        download_manager->Fetch(&download_chunk);
      if (download_result != download::kFailOk) {
        ReportDownloadError(download_chunk);
        PANIC(kLogStderr, "Download error");
      }
      fclose(fchunk);
      Store(tmp_file, chunk_hash,
            (compression_alg == zlib::kZlibDefault) ? true : false);
      atomic_inc64(&overall_new);
    }
    if (atomic_xadd64(&overall_chunks, 1) % 1000 == 0)
      LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, ".");
    atomic_dec64(&chunk_queue);
  }
  return NULL;
}


bool CommandPull::PullRecursion(catalog::Catalog *catalog,
                                const std::string &path) {
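  // Walks the chain of previous revisions (with -p) and then replicates
  // every nested catalog of the given catalog.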
  assert(catalog);

  // Previous catalogs
  if (pull_history) {
    shash::Any previous_catalog = catalog->GetPreviousRevision();
    if (previous_catalog.IsNull()) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Start of catalog, no more history");
    } else {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from historic catalog %s",
               previous_catalog.ToString().c_str());
      bool retval = Pull(previous_catalog, path);
      if (!retval)
        return false;
    }
  }

  // Nested catalogs (in a nested code block because goto fail...)
  {
    const catalog::Catalog::NestedCatalogList nested_catalogs =
      catalog->ListOwnNestedCatalogs();
    for (catalog::Catalog::NestedCatalogList::const_iterator i =
         nested_catalogs.begin(), iEnd = nested_catalogs.end();
         i != iEnd; ++i)
    {
      LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from catalog at %s",
               i->mountpoint.c_str());
      bool retval = Pull(i->hash, i->mountpoint.ToString());
      if (!retval)
        return false;
    }
  }

  return true;
}

bool CommandPull::Pull(const shash::Any &catalog_hash,
                       const std::string &path) {
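  // Replicates a single catalog: skip it if the backend already holds it,
  // otherwise download and attach it, queue all registered chunks for the
  // worker threads, and recurse into history and nested catalogs.  The
  // compressed catalog itself is stored only after its chunks were uploaded,
  // so a partially replicated catalog never becomes visible.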
  int retval;
  download::Failures dl_retval;
  assert(shash::kSuffixCatalog == catalog_hash.suffix);

  // Check if the catalog already exists
  if (Peek(catalog_hash)) {
    // Preload: dirtab changed
    if (inspect_existing_catalogs) {
      if (!preload_cache) {
        PANIC(kLogStderr, "to be implemented: -t without -c");
      }
      catalog::Catalog *catalog = catalog::Catalog::AttachFreely(
        path, MakePath(catalog_hash), catalog_hash);
      if (catalog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
                 catalog_hash.ToString().c_str());
        return false;
      }
      bool retval = PullRecursion(catalog, path);
      delete catalog;
      return retval;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, " Catalog up to date");
    return true;
  }

  // Check if the catalog matches the pathfilter
  if (path != "" &&  // necessary to load the root catalog
      pathfilter &&
      !pathfilter->IsMatching(path)) {
    LogCvmfs(kLogCvmfs, kLogStdout, " Catalog in '%s' does not match"
             " the path specification", path.c_str());
    return true;
  }

  int64_t gauge_chunks = atomic_read64(&overall_chunks);
  int64_t gauge_new = atomic_read64(&overall_new);

  // Download and uncompress catalog
  shash::Any chunk_hash;
  zlib::Algorithms compression_alg;
  catalog::Catalog *catalog = NULL;
  string file_catalog;
  string file_catalog_vanilla;
  FILE *fcatalog = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                  &file_catalog);
  if (!fcatalog) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    return false;
  }
  fclose(fcatalog);
  FILE *fcatalog_vanilla = CreateTempFile(*temp_dir + "/cvmfs", 0600, "w",
                                          &file_catalog_vanilla);
  if (!fcatalog_vanilla) {
    LogCvmfs(kLogCvmfs, kLogStderr, "I/O error");
    unlink(file_catalog.c_str());
    return false;
  }
  const string url_catalog = *stratum0_url + "/data/" + catalog_hash.MakePath();
  cvmfs::FileSink filesink(fcatalog_vanilla);
  download::JobInfo download_catalog(&url_catalog, false, false,
                                     &catalog_hash, &filesink);
  dl_retval = download_manager()->Fetch(&download_catalog);
  fclose(fcatalog_vanilla);
  if (dl_retval != download::kFailOk) {
    if (path == "" && is_garbage_collectable) {
      LogCvmfs(kLogCvmfs, kLogStdout, "skipping missing root catalog %s - "
               "probably swept by garbage collection",
               catalog_hash.ToString().c_str());
      goto pull_skip;
    } else {
      ReportDownloadError(download_catalog);
      goto pull_cleanup;
    }
  }
  retval = zlib::DecompressPath2Path(file_catalog_vanilla, file_catalog);
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "decompression failure (file %s, hash %s)",
             file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }
  if (path.empty() && reflog != NULL) {
    if (!reflog->AddCatalog(catalog_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to add catalog to Reflog.");
      goto pull_cleanup;
    }
  }

  catalog = catalog::Catalog::AttachFreely(path, file_catalog, catalog_hash);
  if (catalog == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to attach catalog %s",
             catalog_hash.ToString().c_str());
    goto pull_cleanup;
  }

  // Always pull the HEAD root catalog and nested catalogs
  if (apply_timestamp_threshold && (path == "") &&
      (catalog->GetLastModified() < timestamp_threshold))
  {
    LogCvmfs(kLogCvmfs, kLogStdout,
             " Pruning at root catalog from %s due to threshold at %s",
             StringifyTime(catalog->GetLastModified(), false).c_str(),
             StringifyTime(timestamp_threshold, false).c_str());
    delete catalog;
    goto pull_skip;
  }
  apply_timestamp_threshold = true;

  // Traverse the chunks
  LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak,
           " Processing chunks [%" PRIu64 " registered chunks]: ",
           catalog->GetNumChunks());
  retval = catalog->AllChunksBegin();
  if (!retval) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to gather chunks");
    goto pull_cleanup;
  }
  while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
    ChunkJob next_chunk(chunk_hash, compression_alg);
    WritePipe(pipe_chunks[1], &next_chunk, sizeof(next_chunk));
    atomic_inc64(&chunk_queue);
  }
  catalog->AllChunksEnd();
  while (atomic_read64(&chunk_queue) != 0) {
    SafeSleepMs(100);
  }
  LogCvmfs(kLogCvmfs, kLogStdout, " fetched %" PRId64 " new chunks out of "
           "%" PRId64 " unique chunks",
           atomic_read64(&overall_new)-gauge_new,
           atomic_read64(&overall_chunks)-gauge_chunks);

  retval = PullRecursion(catalog, path);

  delete catalog;
  unlink(file_catalog.c_str());
  WaitForStorage();
  if (!retval)
    return false;
  Store(file_catalog_vanilla, catalog_hash);
  return true;

 pull_cleanup:
  delete catalog;
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return false;

 pull_skip:
  unlink(file_catalog.c_str());
  unlink(file_catalog_vanilla.c_str());
  return true;
}


int swissknife::CommandPull::Main(const swissknife::ArgumentList &args) {
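  // Overall replication sequence: parse options, set up download and
  // signature managers, fetch and verify the manifest ensemble, then pull
  // the trunk catalog plus tagged snapshots with a pool of worker threads,
  // and finally publish history, reflog, whitelist, and manifest.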
  int retval;
  manifest::Failures m_retval;
  download::Failures dl_retval;
  unsigned timeout = 60;
  int fd_lockfile = -1;
  string spooler_definition_str;
  manifest::ManifestEnsemble ensemble;
  shash::Any meta_info_hash;
  string meta_info;

  // Option parsing
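  //   -c  preload a client cache directory instead of a stratum 1 backend
  //   -l  log level, -u  stratum 0 url, -x  temp directory
  //   -r  spooler definition (or cache directory with -c), -k  public keys
  //   -m  fully qualified repository name, -n  number of worker threads
  //   -t  download timeout, -a  download retries, -d  path filter spec
  //   -p  pull previous revisions, -z  inspect existing catalogs
  //   -w  stratum 1 url, -i  initial snapshot, -R  reflog checksum path
  //   -Z  timestamp threshold, -@  proxy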
  if (args.find('c') != args.end())
    preload_cache = true;
  if (args.find('l') != args.end()) {
    unsigned log_level =
      kLogLevel0 << String2Uint64(*args.find('l')->second);
    if (log_level > kLogNone) {
      LogCvmfs(kLogCvmfs, kLogStderr, "invalid log level");
      return 1;
    }
    SetLogVerbosity(static_cast<LogLevels>(log_level));
  }
  stratum0_url = args.find('u')->second;
  temp_dir = args.find('x')->second;
  if (preload_cache) {
    preload_cachedir = new string(*args.find('r')->second);
  } else {
    spooler_definition_str = *args.find('r')->second;
  }
  string master_keys = *args.find('k')->second;
  if (DirectoryExists(master_keys))
    master_keys = JoinStrings(FindFilesBySuffix(master_keys, ".pub"), ":");
  const string repository_name = *args.find('m')->second;
  if (args.find('n') != args.end())
    num_parallel = String2Uint64(*args.find('n')->second);
  if (args.find('t') != args.end())
    timeout = String2Uint64(*args.find('t')->second);
  if (args.find('a') != args.end())
    retries = String2Uint64(*args.find('a')->second);
  if (args.find('d') != args.end()) {
    pathfilter = catalog::RelaxedPathFilter::Create(*args.find('d')->second);
    assert(pathfilter->IsValid());
  }
  if (args.find('p') != args.end())
    pull_history = true;
  if (args.find('z') != args.end())
    inspect_existing_catalogs = true;
  if (args.find('w') != args.end())
    stratum1_url = args.find('w')->second;
  if (args.find('i') != args.end())
    initial_snapshot = true;
  shash::Any reflog_hash;
  string reflog_chksum_path;
  if (args.find('R') != args.end()) {
    reflog_chksum_path = *args.find('R')->second;
    if (!initial_snapshot) {
      if (!manifest::Reflog::ReadChecksum(reflog_chksum_path, &reflog_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Could not read reflog checksum");
        return 1;
      }
    }
  }
  if (args.find('Z') != args.end()) {
    timestamp_threshold = String2Int64(*args.find('Z')->second);
  }

  if (!preload_cache && stratum1_url.Get() == NULL) {
    LogCvmfs(kLogCvmfs, kLogStderr, "need -w <stratum 1 URL>");
    return 1;
  }

  typedef std::vector<history::History::Tag> TagVector;
  TagVector historic_tags;

  LogCvmfs(kLogCvmfs, kLogStdout, "CernVM-FS: replicating from %s",
           stratum0_url->c_str());

  int result = 1;

  // Initialization
  atomic_init64(&overall_chunks);
  atomic_init64(&overall_new);
  atomic_init64(&chunk_queue);

  const bool follow_redirects = false;
  const unsigned max_pool_handles = num_parallel+1;
  const string proxy =
    (args.find('@') != args.end()) ? *args.find('@')->second : "";

  if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
    return 1;
  }

  if (!this->InitSignatureManager(master_keys)) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to initialize CVMFS signatures");
    return 1;
  } else {
    LogCvmfs(kLogCvmfs, kLogStdout,
             "CernVM-FS: using public key(s) %s",
             JoinStrings(SplitString(master_keys, ':'), ", ").c_str());
  }

  unsigned current_group;
  vector< vector<download::DownloadManager::ProxyInfo> > proxies;
  download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
  if (proxies.size() > 0) {
    string proxy_str = "\nWarning, replicating through proxies\n";
    proxy_str += " Load-balance groups:\n";
    for (unsigned i = 0; i < proxies.size(); ++i) {
      vector<string> urls;
      for (unsigned j = 0; j < proxies[i].size(); ++j) {
        urls.push_back(proxies[i][j].url);
      }
      proxy_str +=
        " [" + StringifyInt(i) + "] " + JoinStrings(urls, ", ") + "\n";
    }
    proxy_str += " Active proxy: [" + StringifyInt(current_group) + "] " +
                 proxies[current_group][0].url;
    LogCvmfs(kLogCvmfs, kLogStdout, "%s\n", proxy_str.c_str());
  }
  download_manager()->SetTimeout(timeout, timeout);
  download_manager()->SetRetryParameters(retries, 500, 2000);
  download_manager()->Spawn();

  // init the download helper
  ObjectFetcher object_fetcher(repository_name,
                               *stratum0_url,
                               *temp_dir,
                               download_manager(),
                               signature_manager());

  pthread_t *workers =
    reinterpret_cast<pthread_t *>(smalloc(sizeof(pthread_t) * num_parallel));

  // Check if we have a replica-ready server
  const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
  download::JobInfo download_sentinel(&url_sentinel, false);
  retval = download_manager()->Fetch(&download_sentinel);
  if (retval != download::kFailOk) {
    if (download_sentinel.http_code() == 404) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "This is not a CernVM-FS server for replication");
    } else {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Failed to contact stratum 0 server (%d - %s)",
               retval, download::Code2Ascii(download_sentinel.error_code()));
    }
    goto fini;
  }
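
  // The ensemble bundles the signed manifest with its certificate and
  // whitelist buffers, so that the signature can be verified here and the
  // files can be re-published on this stratum 1 afterwards.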
  m_retval = FetchRemoteManifestEnsemble(*stratum0_url,
                                         repository_name,
                                         &ensemble);
  if (m_retval != manifest::kFailOk) {
    LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch manifest (%d - %s)",
             m_retval, manifest::Code2Ascii(m_retval));
    goto fini;
  }

  // Get meta info
  meta_info_hash = ensemble.manifest->meta_info();
  if (!meta_info_hash.IsNull()) {
    meta_info_hash = ensemble.manifest->meta_info();
    const string url = *stratum0_url + "/data/" + meta_info_hash.MakePath();
    cvmfs::MemSink metainfo_memsink;
    download::JobInfo download_metainfo(&url, true, false, &meta_info_hash,
                                        &metainfo_memsink);
    dl_retval = download_manager()->Fetch(&download_metainfo);
    if (dl_retval != download::kFailOk) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to fetch meta info (%d - %s)",
               dl_retval, download::Code2Ascii(dl_retval));
      goto fini;
    }
    meta_info = string(reinterpret_cast<char*>(metainfo_memsink.data()),
                       metainfo_memsink.pos());
  }

  is_garbage_collectable = ensemble.manifest->garbage_collectable();

  // Manifest available, now the spooler's hash algorithm can be determined
  // That doesn't actually matter because the replication does no re-hashing
  if (!preload_cache) {
    const upload::SpoolerDefinition
        spooler_definition(spooler_definition_str,
                           ensemble.manifest->GetHashAlgorithm());
    spooler = upload::Spooler::Construct(spooler_definition);
    assert(spooler);
    spooler->RegisterListener(&SpoolerOnUpload);
  }

  // Open the reflog for modification
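  // (the reflog records the root objects this replica has stored, i.e.
  // catalogs, certificates, history and meta info objects; garbage
  // collection relies on it to enumerate references)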
  if (!preload_cache) {
    if (initial_snapshot) {
      LogCvmfs(kLogCvmfs, kLogStdout, "Creating an empty Reflog for '%s'",
               repository_name.c_str());
      reflog = CreateEmptyReflog(*temp_dir, repository_name);
      if (reflog == NULL) {
        LogCvmfs(kLogCvmfs, kLogStderr, "failed to create initial Reflog");
        goto fini;
      }
    } else {
      ObjectFetcher object_fetcher_stratum1(repository_name,
                                            *stratum1_url,
                                            *temp_dir,
                                            download_manager(),
                                            signature_manager());

      if (!reflog_hash.IsNull()) {
        reflog =
          FetchReflog(&object_fetcher_stratum1, repository_name, reflog_hash);
        assert(reflog != NULL);
      } else {
        LogCvmfs(kLogCvmfs, kLogVerboseMsg, "no reflog (ignoring)");
        if (spooler->Peek(".cvmfsreflog")) {
          LogCvmfs(kLogCvmfs, kLogStderr,
                   "no reflog hash specified but reflog is present");
          goto fini;
        }
      }
    }

    if (reflog != NULL) {
      reflog->BeginTransaction();
      // On commit: use manifest's hash algorithm
      reflog_hash.algorithm = ensemble.manifest->GetHashAlgorithm();
    }
  }

  // Fetch tag list.
  // If we are just preloading the cache it is not strictly necessary to
  // download the entire tag list
  // TODO(molina): add user option to download tags when preloading the cache
  if (!ensemble.manifest->history().IsNull() && !preload_cache) {
    shash::Any history_hash = ensemble.manifest->history();
    const string history_url = *stratum0_url + "/data/"
                               + history_hash.MakePath();
    const string history_path = *temp_dir + "/" + history_hash.ToString();

    cvmfs::PathSink pathsink(history_path);
    download::JobInfo download_history(&history_url, false, false,
                                       &history_hash, &pathsink);
    dl_retval = download_manager()->Fetch(&download_history);
    if (dl_retval != download::kFailOk) {
      ReportDownloadError(download_history);
      goto fini;
    }
    const std::string history_db_path = history_path + ".uncompressed";
    retval = zlib::DecompressPath2Path(history_path, history_db_path);
    assert(retval);
    history::History *tag_db = history::SqliteHistory::Open(history_db_path);
    if (NULL == tag_db) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to open history database (%s)",
               history_db_path.c_str());
      unlink(history_db_path.c_str());
      goto fini;
    }
    retval = tag_db->List(&historic_tags);
    delete tag_db;
    unlink(history_db_path.c_str());
    if (!retval) {
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read history database (%s)",
               history_db_path.c_str());
      goto fini;
    }

    LogCvmfs(kLogCvmfs, kLogStdout, "Found %lu named snapshots",
             historic_tags.size());
    // TODO(jblomer): We should replicate the previous history dbs, too,
    // in order to avoid races on fail-over between non-synchronized stratum 1s
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading history database");
    Store(history_path, history_hash);
    WaitForStorage();
    unlink(history_path.c_str());
    if (reflog != NULL && !reflog->AddHistory(history_hash)) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add history to Reflog.");
      goto fini;
    }
  }

  // Starting threads
  MakePipe(pipe_chunks);
  LogCvmfs(kLogCvmfs, kLogStdout, "Starting %u workers", num_parallel);
  MainWorkerContext mwc;
  mwc.download_manager = download_manager();
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_create(&workers[i], NULL, MainWorker,
                                static_cast<void*>(&mwc));
    assert(retval == 0);
  }

  LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from trunk catalog at /");
  retval = Pull(ensemble.manifest->catalog_hash(), "");
  pull_history = false;
  if (!historic_tags.empty()) {
    LogCvmfs(kLogCvmfs, kLogStdout, "Checking tagged snapshots...");
  }
  for (TagVector::const_iterator i = historic_tags.begin(),
       iend = historic_tags.end();
       i != iend; ++i)
  {
    if (Peek(i->root_hash))
      continue;
    LogCvmfs(kLogCvmfs, kLogStdout, "Replicating from %s repository tag",
             i->name.c_str());
    apply_timestamp_threshold = false;
    bool retval2 = Pull(i->root_hash, "");
    retval = retval && retval2;
  }

  // Stopping threads
  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping %u workers", num_parallel);
  for (unsigned i = 0; i < num_parallel; ++i) {
    ChunkJob terminate_workers;
    WritePipe(pipe_chunks[1], &terminate_workers, sizeof(terminate_workers));
  }
  for (unsigned i = 0; i < num_parallel; ++i) {
    int retval = pthread_join(workers[i], NULL);  // NOLINT (false positive)
    assert(retval == 0);
  }
  ClosePipe(pipe_chunks);

  if (!retval)
    goto fini;

  // Upload manifest ensemble
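  // (done last: every referenced object is already in place at this point,
  // so a client that sees the new manifest never hits a dangling reference)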
  {
    LogCvmfs(kLogCvmfs, kLogStdout, "Uploading manifest ensemble");
    WaitForStorage();

    if (!Peek(ensemble.manifest->certificate())) {
      StoreBuffer(ensemble.cert_buf,
                  ensemble.cert_size,
                  ensemble.manifest->certificate(), true);
    }
    if (reflog != NULL &&
        !reflog->AddCertificate(ensemble.manifest->certificate())) {
      LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add certificate to Reflog.");
      goto fini;
    }
    if (!meta_info_hash.IsNull()) {
      const unsigned char *info = reinterpret_cast<const unsigned char *>(
        meta_info.data());
      StoreBuffer(info, meta_info.size(), meta_info_hash, true);
      if (reflog != NULL && !reflog->AddMetainfo(meta_info_hash)) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to add metainfo to Reflog.");
        goto fini;
      }
    }

    // Create alternative bootstrapping symlinks for VOMS secured repos
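    // (the shortcuts expose certificate, root catalog, history, and meta
    // info under alternative top-level paths, intended to let clients
    // bootstrap a repository whose data/ tree requires authorization)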
    if (ensemble.manifest->has_alt_catalog_path()) {
      const bool success =
        spooler->PlaceBootstrappingShortcut(ensemble.manifest->certificate())
        && spooler->PlaceBootstrappingShortcut(
               ensemble.manifest->catalog_hash())
        && (ensemble.manifest->history().IsNull() ||
            spooler->PlaceBootstrappingShortcut(ensemble.manifest->history()))
        && (meta_info_hash.IsNull() ||
            spooler->PlaceBootstrappingShortcut(meta_info_hash));

      if (!success) {
        LogCvmfs(kLogCvmfs, kLogStderr,
                 "failed to place root catalog bootstrapping symlinks");
        goto fini;
      }
    }

    // upload Reflog database
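    // (the reflog is uploaded unsigned; its hash is tracked out of band in
    // the local checksum file given by -R and rewritten below after a
    // successful upload)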
    if (!preload_cache && reflog != NULL) {
      reflog->CommitTransaction();
      reflog->DropDatabaseFileOwnership();
      string reflog_path = reflog->database_file();
      delete reflog;
      manifest::Reflog::HashDatabase(reflog_path, &reflog_hash);
      WaitForStorage();  // Reduce the duration of reflog w/o checksum
      spooler->UploadReflog(reflog_path);
      spooler->WaitForUpload();
      unlink(reflog_path.c_str());
      if (spooler->GetNumberOfErrors()) {
        LogCvmfs(kLogCvmfs, kLogStderr, "Failed to upload Reflog (errors: %d)",
                 spooler->GetNumberOfErrors());
        goto fini;
      }
      assert(!reflog_chksum_path.empty());
      manifest::Reflog::WriteChecksum(reflog_chksum_path, reflog_hash);
    }

    if (preload_cache) {
      bool retval =
        ensemble.manifest->ExportBreadcrumb(*preload_cachedir, 0660);
      assert(retval);
    } else {
      // pkcs#7 structure contains content + certificate + signature
      // So there is no race with whitelist and pkcs7 signature being out of
      // sync
      if (ensemble.whitelist_pkcs7_buf) {
        StoreBuffer(ensemble.whitelist_pkcs7_buf, ensemble.whitelist_pkcs7_size,
                    ".cvmfswhitelist.pkcs7", false);
      }
      StoreBuffer(ensemble.whitelist_buf, ensemble.whitelist_size,
                  ".cvmfswhitelist", false);
      StoreBuffer(ensemble.raw_manifest_buf, ensemble.raw_manifest_size,
                  ".cvmfspublished", false);
    }
    LogCvmfs(kLogCvmfs, kLogStdout, "Serving revision %" PRIu64,
             ensemble.manifest->revision());
  }

  WaitForStorage();
  LogCvmfs(kLogCvmfs, kLogStdout, "Fetched %" PRId64 " new chunks out of %"
           PRId64 " processed chunks",
           atomic_read64(&overall_new), atomic_read64(&overall_chunks));
  result = 0;

 fini:
  if (fd_lockfile >= 0)
    UnlockFile(fd_lockfile);
  free(workers);
  delete spooler;
  delete pathfilter;
  return result;
}

}  // namespace swissknife