9 #define _FILE_OFFSET_BITS 64
11 #define __STDC_FORMAT_MACROS
13 #include "cvmfs_config.h"
49 namespace swissknife {
63 , hash_algorithm(shash::
kAny)
69 , compression_alg(compression_alg)
132 static bool Peek(
const string &remote_path) {
143 const int http_code = download_job.
http_code();
144 const std::string url = *download_job.
url();
149 switch (error_code) {
153 "please check the network connection");
158 "please check the stratum 0 health", http_code);
163 "please check the stratum 0 health");
170 "please check the network connection");
180 const string &local_path,
181 const string &remote_path,
182 const bool compressed_src)
185 if (!compressed_src) {
186 int retval = rename(local_path.c_str(), remote_path.c_str());
189 remote_path.c_str());
197 remote_path.c_str());
202 remote_path.c_str());
205 retval = rename(tmp_dest.c_str(), remote_path.c_str());
207 unlink(local_path.c_str());
210 spooler->Upload(local_path, remote_path);
215 const string &local_path,
217 const bool compressed_src =
true)
224 const std::string &dest_path,
const bool compress) {
237 Store(tmp_file, dest_path,
true);
241 const shash::Any &dest_hash,
const bool compress) {
265 if (next_chunk.IsTerminateJob())
273 if (!
Peek(chunk_hash)) {
280 &chunk_hash, &filesink);
283 download_manager->
Fetch(&download_chunk);
289 Store(tmp_file, chunk_hash,
302 const std::string &path) {
308 if (previous_catalog.
IsNull()) {
312 previous_catalog.
ToString().c_str());
313 bool retval = Pull(previous_catalog, path);
323 for (catalog::Catalog::NestedCatalogList::const_iterator i =
324 nested_catalogs.begin(), iEnd = nested_catalogs.end();
328 i->mountpoint.c_str());
329 bool retval = Pull(i->hash, i->mountpoint.ToString());
339 const std::string &path) {
345 if (
Peek(catalog_hash)) {
352 path,
MakePath(catalog_hash), catalog_hash);
353 if (catalog == NULL) {
358 bool retval = PullRecursion(catalog, path);
372 " the path specification", path.c_str());
384 string file_catalog_vanilla;
393 &file_catalog_vanilla);
394 if (!fcatalog_vanilla) {
396 unlink(file_catalog.c_str());
402 &catalog_hash, &filesink);
403 dl_retval = download_manager()->Fetch(&download_catalog);
404 fclose(fcatalog_vanilla);
408 "probably sweeped by garbage collection",
419 file_catalog_vanilla.c_str(), catalog_hash.
ToString().c_str());
422 if (path.empty() &&
reflog != NULL) {
430 if (catalog == NULL) {
441 " Pruning at root catalog from %s due to threshold at %s",
451 " Processing chunks [%" PRIu64
" registered chunks]: ",
458 while (catalog->
AllChunksNext(&chunk_hash, &compression_alg)) {
459 ChunkJob next_chunk(chunk_hash, compression_alg);
468 "%" PRId64
" unique chunks",
472 retval = PullRecursion(catalog, path);
475 unlink(file_catalog.c_str());
479 Store(file_catalog_vanilla, catalog_hash);
484 unlink(file_catalog.c_str());
485 unlink(file_catalog_vanilla.c_str());
489 unlink(file_catalog.c_str());
490 unlink(file_catalog_vanilla.c_str());
499 unsigned timeout = 60;
500 int fd_lockfile = -1;
501 string spooler_definition_str;
507 if (args.find(
'c') != args.end())
509 if (args.find(
'l') != args.end()) {
523 spooler_definition_str = *args.find(
'r')->second;
525 string master_keys = *args.find(
'k')->second;
528 const string repository_name = *args.find(
'm')->second;
529 string trusted_certs;
530 if (args.find(
'y') != args.end())
531 trusted_certs = *args.find(
'y')->second;
532 if (args.find(
'n') != args.end())
534 if (args.find(
't') != args.end())
536 if (args.find(
'a') != args.end())
538 if (args.find(
'd') != args.end()) {
542 if (args.find(
'p') != args.end())
544 if (args.find(
'z') != args.end())
546 if (args.find(
'w') != args.end())
548 if (args.find(
'i') != args.end())
551 string reflog_chksum_path;
552 if (args.find(
'R') != args.end()) {
553 reflog_chksum_path = *args.find(
'R')->second;
561 if (args.find(
'Z') != args.end()) {
570 typedef std::vector<history::History::Tag> TagVector;
571 TagVector historic_tags;
583 const bool follow_redirects =
false;
586 (args.find(
'@') != args.end()) ? *args.find(
'@')->second :
"";
588 if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
592 if (!this->InitVerifyingSignatureManager(master_keys, trusted_certs)) {
597 "CernVM-FS: using public key(s) %s",
599 if (!trusted_certs.empty()) {
601 "CernVM-FS: using trusted certificates in %s",
606 unsigned current_group;
607 vector< vector<download::DownloadManager::ProxyInfo> > proxies;
608 download_manager()->GetProxyInfo(&proxies, ¤t_group, NULL);
609 if (proxies.size() > 0) {
610 string proxy_str =
"\nWarning, replicating through proxies\n";
611 proxy_str +=
" Load-balance groups:\n";
612 for (
unsigned i = 0; i < proxies.size(); ++i) {
614 for (
unsigned j = 0; j < proxies[i].size(); ++j) {
615 urls.push_back(proxies[i][j].url);
620 proxy_str +=
" Active proxy: [" +
StringifyInt(current_group) +
"] " +
621 proxies[current_group][0].url;
624 download_manager()->SetTimeout(timeout, timeout);
625 download_manager()->SetRetryParameters(
retries, 500, 2000);
626 download_manager()->Spawn();
633 signature_manager());
636 reinterpret_cast<pthread_t *
>(smalloc(
sizeof(pthread_t) *
num_parallel));
639 const string url_sentinel = *
stratum0_url +
"/.cvmfs_master_replica";
641 retval = download_manager()->Fetch(&download_sentinel);
643 if (download_sentinel.
http_code() == 404) {
645 "This is not a CernVM-FS server for replication");
648 "Failed to contact stratum 0 server (%d - %s)",
665 if (!meta_info_hash.
IsNull()) {
671 dl_retval = download_manager()->Fetch(&download_metainfo);
677 meta_info = string(reinterpret_cast<char*>(metainfo_memsink.
data()),
678 metainfo_memsink.
pos());
687 spooler_definition(spooler_definition_str,
689 spooler = upload::Spooler::Construct(spooler_definition);
698 repository_name.c_str());
700 if (reflog == NULL) {
709 signature_manager());
711 if (!reflog_hash.
IsNull()) {
713 FetchReflog(&object_fetcher_stratum1, repository_name, reflog_hash);
717 if (
spooler->Peek(
".cvmfsreflog")) {
719 "no reflog hash specified but reflog is present");
744 &history_hash, &pathsink);
745 dl_retval = download_manager()->Fetch(&download_history);
750 const std::string history_db_path = history_path +
".uncompressed";
754 if (NULL == tag_db) {
756 history_db_path.c_str());
757 unlink(history_db_path.c_str());
760 retval = tag_db->
List(&historic_tags);
762 unlink(history_db_path.c_str());
765 history_db_path.c_str());
770 historic_tags.size());
774 Store(history_path, history_hash);
776 unlink(history_path.c_str());
789 int retval = pthread_create(&workers[i], NULL,
MainWorker,
790 static_cast<void*>(&mwc));
797 if (!historic_tags.empty()) {
800 for (TagVector::const_iterator i = historic_tags.begin(),
801 iend = historic_tags.end();
804 if (
Peek(i->root_hash))
809 bool retval2 = Pull(i->root_hash,
"");
810 retval = retval && retval2;
816 ChunkJob terminate_workers;
820 int retval = pthread_join(workers[i], NULL);
843 if (!meta_info_hash.
IsNull()) {
844 const unsigned char *info =
reinterpret_cast<const unsigned char *
>(
846 StoreBuffer(info, meta_info.size(), meta_info_hash,
true);
860 && (meta_info_hash.
IsNull() ||
861 spooler->PlaceBootstrappingShortcut(meta_info_hash));
865 "failed to place root catalog bootstrapping symlinks");
878 spooler->UploadReflog(reflog_path);
880 unlink(reflog_path.c_str());
881 if (
spooler->GetNumberOfErrors()) {
886 assert(!reflog_chksum_path.empty());
900 ".cvmfswhitelist.pkcs7",
false);
903 ".cvmfswhitelist",
false);
905 ".cvmfspublished",
false);
913 PRId64
" processed chunks",
918 if (fd_lockfile >= 0)
int return_code
the return value of the spooler operation
#define LogCvmfs(source, mask,...)
void SetLogVerbosity(const LogLevels max_level)
std::string database_file() const
bool is_garbage_collectable
static RelaxedPathFilter * Create(const std::string &dirtab_path)
bool AllChunksNext(shash::Any *hash, zlib::Algorithms *compression_alg)
SharedPtr< string > temp_dir
bool AddHistory(const shash::Any &history)
unsigned char * raw_manifest_buf
int Main(const ArgumentList &args)
HttpObjectFetcher ObjectFetcher
static SqliteHistory * Open(const std::string &file_name)
const shash::Algorithms hash_algorithm
bool inspect_existing_catalogs
string * preload_cachedir
static bool ReadChecksum(const std::string &path, shash::Any *checksum)
FILE * CreateTempFile(const std::string &path_prefix, const int mode, const char *open_flags, std::string *final_path)
static void * MainWorker(void *data)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
bool CopyMem2File(const unsigned char *buffer, const unsigned buffer_size, FILE *fdest)
const std::string * url() const
pthread_mutex_t lock_pipe
const shash::Suffix suffix
shash::Any GetPreviousRevision() const
assert((mem||(size==0))&&"Out Of Memory")
bool has_alt_catalog_path() const
static void Store(const string &local_path, const string &remote_path, const bool compressed_src)
unsigned whitelist_pkcs7_size
bool IsTerminateJob() const
upload::Spooler * spooler
string StringifyTime(const time_t seconds, const bool utc)
void MakePipe(int pipe_fd[2])
static void WaitForStorage()
bool apply_timestamp_threshold
bool AddCatalog(const shash::Any &catalog)
unsigned char digest[digest_size_]
uint64_t revision() const
static void SpoolerOnUpload(const upload::SpoolerResult &result)
unsigned char * whitelist_buf
bool FileExists(const std::string &path)
uint64_t GetLastModified() const
int64_t String2Int64(const string &value)
const char * Code2Ascii(const Failures error)
unsigned GetDigestSize() const
bool ExportBreadcrumb(const std::string &directory, const int mode) const
std::string local_path
the local_path previously given as input
virtual bool List(std::vector< Tag > *tags) const =0
unsigned raw_manifest_size
bool AddMetainfo(const shash::Any &metainfo)
vector< string > SplitString(const string &str, char delim)
virtual bool IsMatching(const std::string &path) const
const char kSuffixCatalog
bool DecompressPath2File(const string &src, FILE *fdest)
shash::Any certificate() const
bool IsProxyTransferError(const Failures error)
static void ReportDownloadError(const download::JobInfo &download_job)
catalog::RelaxedPathFilter * pathfilter
static void StoreBuffer(const unsigned char *buffer, const unsigned size, const std::string &dest_path, const bool compress)
unsigned char * whitelist_pkcs7_buf
static void HashDatabase(const std::string &database_path, shash::Any *hash_reflog)
shash::Any catalog_hash() const
manifest::Reflog * reflog
download::DownloadManager * download_manager
string StringifyInt(const int64_t value)
bool garbage_collectable() const
uint64_t timestamp_threshold
Failures error_code() const
const zlib::Algorithms compression_alg
void DropDatabaseFileOwnership()
bool DirectoryExists(const std::string &path)
atomic_int64 overall_chunks
bool AddCertificate(const shash::Any &certificate)
shash::Algorithms GetHashAlgorithm() const
std::string MakePathWithoutSuffix() const
std::vector< NestedCatalog > NestedCatalogList
const NestedCatalogList ListOwnNestedCatalogs() const
uint64_t String2Uint64(const string &value)
std::map< char, SharedPtr< std::string > > ArgumentList
Failures Fetch(JobInfo *info)
SharedPtr< string > stratum1_url
shash::Any history() const
bool CompressMem2File(const unsigned char *buf, const size_t size, FILE *fdest, shash::Any *compressed_hash)
uint64_t GetNumChunks() const
ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
static std::string MakePath(const shash::Any &hash)
static Catalog * AttachFreely(const std::string &imaginary_mountpoint, const std::string &file, const shash::Any &catalog_hash, Catalog *parent=NULL, const bool is_nested=false)
const unsigned kMaxDigestSize
bool DecompressPath2Path(const string &src, const string &dest)
std::string meta_info() const
static bool Peek(const string &remote_path)
void SafeSleepMs(const unsigned ms)
bool IsHostTransferError(const Failures error)
std::string MakePath() const
static bool WriteChecksum(const std::string &path, const shash::Any &value)
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
void ClosePipe(int pipe_fd[2])
SharedPtr< string > stratum0_url
shash::Any meta_info() const
void UnlockFile(const int filedes)
const char * Code2Ascii(const Failures error)