9 #define _FILE_OFFSET_BITS 64
11 #define __STDC_FORMAT_MACROS
49 namespace swissknife {
63 , hash_algorithm(shash::
kAny)
69 , compression_alg(compression_alg) {
127 static bool Peek(
const string &remote_path) {
137 const int http_code = download_job.
http_code();
138 const std::string url = *download_job.
url();
143 switch (error_code) {
147 "DNS lookup for Stratum 0 failed - "
148 "please check the network connection");
153 "unexpected HTTP error code %d - "
154 "please check the stratum 0 health",
160 "downloaded corrupted data - "
161 "please check the stratum 0 health");
168 "couldn't reach Stratum 0 - "
169 "please check the network connection");
172 "unexpected error - feel free to file "
179 static void Store(
const string &local_path,
180 const string &remote_path,
181 const bool compressed_src) {
183 if (!compressed_src) {
184 int retval = rename(local_path.c_str(), remote_path.c_str());
187 remote_path.c_str());
195 remote_path.c_str());
200 remote_path.c_str());
203 retval = rename(tmp_dest.c_str(), remote_path.c_str());
205 unlink(local_path.c_str());
208 spooler->Upload(local_path, remote_path);
212 static void Store(
const string &local_path,
214 const bool compressed_src =
true) {
220 const std::string &dest_path,
const bool compress) {
233 Store(tmp_file, dest_path,
true);
237 const shash::Any &dest_hash,
const bool compress) {
262 if (next_chunk.IsTerminateJob())
270 if (!
Peek(chunk_hash)) {
285 Store(tmp_file, chunk_hash,
298 const std::string &path) {
304 if (previous_catalog.
IsNull()) {
308 previous_catalog.
ToString().c_str());
309 bool retval = Pull(previous_catalog, path);
319 for (catalog::Catalog::NestedCatalogList::const_iterator
320 i = nested_catalogs.begin(),
321 iEnd = nested_catalogs.end();
324 i->mountpoint.c_str());
325 bool retval = Pull(i->hash, i->mountpoint.ToString());
335 const std::string &path) {
341 if (
Peek(catalog_hash)) {
348 path,
MakePath(catalog_hash), catalog_hash);
349 if (catalog == NULL) {
354 bool retval = PullRecursion(catalog, path);
367 " Catalog in '%s' does not match"
368 " the path specification",
381 string file_catalog_vanilla;
390 &file_catalog_vanilla);
391 if (!fcatalog_vanilla) {
393 unlink(file_catalog.c_str());
400 dl_retval = download_manager()->Fetch(&download_catalog);
401 fclose(fcatalog_vanilla);
405 "skipping missing root catalog %s - "
406 "probably sweeped by garbage collection",
417 file_catalog_vanilla.c_str(), catalog_hash.
ToString().c_str());
420 if (path.empty() &&
reflog != NULL) {
428 if (catalog == NULL) {
438 " Pruning at root catalog from %s due to threshold at %s",
448 " Processing chunks [%" PRIu64
" registered chunks]: ",
455 while (catalog->
AllChunksNext(&chunk_hash, &compression_alg)) {
456 ChunkJob next_chunk(chunk_hash, compression_alg);
465 " fetched %" PRId64
" new chunks out of "
466 "%" PRId64
" unique chunks",
470 retval = PullRecursion(catalog, path);
473 unlink(file_catalog.c_str());
477 Store(file_catalog_vanilla, catalog_hash);
482 unlink(file_catalog.c_str());
483 unlink(file_catalog_vanilla.c_str());
487 unlink(file_catalog.c_str());
488 unlink(file_catalog_vanilla.c_str());
497 unsigned timeout = 60;
498 int fd_lockfile = -1;
499 string spooler_definition_str;
505 if (args.find(
'c') != args.end())
507 if (args.find(
'l') != args.end()) {
520 spooler_definition_str = *args.find(
'r')->second;
522 string master_keys = *args.find(
'k')->second;
525 const string repository_name = *args.find(
'm')->second;
526 if (args.find(
'n') != args.end())
528 if (args.find(
't') != args.end())
530 if (args.find(
'a') != args.end())
532 if (args.find(
'd') != args.end()) {
536 if (args.find(
'p') != args.end())
538 if (args.find(
'z') != args.end())
540 if (args.find(
'w') != args.end())
542 if (args.find(
'i') != args.end())
545 string reflog_chksum_path;
546 if (args.find(
'R') != args.end()) {
547 reflog_chksum_path = *args.find(
'R')->second;
555 if (args.find(
'Z') != args.end()) {
564 typedef std::vector<history::History::Tag> TagVector;
565 TagVector historic_tags;
577 const bool follow_redirects =
false;
579 const string proxy = (args.find(
'@') != args.end()) ? *args.find(
'@')->second
582 if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
586 if (!this->InitSignatureManager(master_keys)) {
594 unsigned current_group;
595 vector<vector<download::DownloadManager::ProxyInfo> > proxies;
596 download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
597 if (proxies.size() > 0) {
598 string proxy_str =
"\nWarning, replicating through proxies\n";
599 proxy_str +=
" Load-balance groups:\n";
600 for (
unsigned i = 0; i < proxies.size(); ++i) {
602 for (
unsigned j = 0; j < proxies[i].size(); ++j) {
603 urls.push_back(proxies[i][j].url);
608 proxy_str +=
" Active proxy: [" +
StringifyInt(current_group) +
"] "
609 + proxies[current_group][0].url;
612 download_manager()->SetTimeout(timeout, timeout);
613 download_manager()->SetRetryParameters(
retries, 500, 2000);
614 download_manager()->Spawn();
621 signature_manager());
623 pthread_t *workers =
reinterpret_cast<pthread_t *
>(
627 const string url_sentinel = *
stratum0_url +
"/.cvmfs_master_replica";
629 retval = download_manager()->Fetch(&download_sentinel);
631 if (download_sentinel.
http_code() == 404) {
633 "This is not a CernVM-FS server for replication");
636 "Failed to contact stratum 0 server (%d - %s)", retval,
642 m_retval = FetchRemoteManifestEnsemble(
652 if (!meta_info_hash.
IsNull()) {
658 dl_retval = download_manager()->Fetch(&download_metainfo);
664 meta_info = string(reinterpret_cast<char *>(metainfo_memsink.
data()),
665 metainfo_memsink.
pos());
675 spooler = upload::Spooler::Construct(spooler_definition);
684 repository_name.c_str());
686 if (reflog == NULL) {
695 signature_manager());
697 if (!reflog_hash.
IsNull()) {
698 reflog = FetchReflog(&object_fetcher_stratum1, repository_name,
703 if (
spooler->Peek(
".cvmfsreflog")) {
705 "no reflog hash specified but reflog is present");
730 &history_hash, &pathsink);
731 dl_retval = download_manager()->Fetch(&download_history);
736 const std::string history_db_path = history_path +
".uncompressed";
740 if (NULL == tag_db) {
742 history_db_path.c_str());
743 unlink(history_db_path.c_str());
746 retval = tag_db->
List(&historic_tags);
748 unlink(history_db_path.c_str());
751 history_db_path.c_str());
756 historic_tags.size());
760 Store(history_path, history_hash);
762 unlink(history_path.c_str());
775 int retval = pthread_create(&workers[i], NULL,
MainWorker,
776 static_cast<void *>(&mwc));
783 if (!historic_tags.empty()) {
786 for (TagVector::const_iterator i = historic_tags.begin(),
787 iend = historic_tags.end();
790 if (
Peek(i->root_hash))
795 bool retval2 = Pull(i->root_hash,
"");
796 retval = retval && retval2;
802 ChunkJob terminate_workers;
806 int retval = pthread_join(workers[i], NULL);
828 if (!meta_info_hash.
IsNull()) {
829 const unsigned char *info =
reinterpret_cast<const unsigned char *
>(
831 StoreBuffer(info, meta_info.size(), meta_info_hash,
true);
840 const bool success =
spooler->PlaceBootstrappingShortcut(
842 &&
spooler->PlaceBootstrappingShortcut(
845 ||
spooler->PlaceBootstrappingShortcut(
847 && (meta_info_hash.
IsNull()
848 ||
spooler->PlaceBootstrappingShortcut(
853 "failed to place root catalog bootstrapping symlinks");
866 spooler->UploadReflog(reflog_path);
868 unlink(reflog_path.c_str());
869 if (
spooler->GetNumberOfErrors()) {
874 assert(!reflog_chksum_path.empty());
888 ".cvmfswhitelist.pkcs7",
false);
891 ".cvmfswhitelist",
false);
893 ".cvmfspublished",
false);
901 "Fetched %" PRId64
" new chunks out of %" PRId64
" processed chunks",
906 if (fd_lockfile >= 0)
int return_code
the return value of the spooler operation
void SetLogVerbosity(const LogLevels max_level)
std::string database_file() const
bool is_garbage_collectable
static RelaxedPathFilter * Create(const std::string &dirtab_path)
bool AllChunksNext(shash::Any *hash, zlib::Algorithms *compression_alg)
SharedPtr< string > temp_dir
bool AddHistory(const shash::Any &history)
unsigned char * raw_manifest_buf
int Main(const ArgumentList &args)
HttpObjectFetcher ObjectFetcher
static SqliteHistory * Open(const std::string &file_name)
const shash::Algorithms hash_algorithm
bool inspect_existing_catalogs
string * preload_cachedir
static bool ReadChecksum(const std::string &path, shash::Any *checksum)
FILE * CreateTempFile(const std::string &path_prefix, const int mode, const char *open_flags, std::string *final_path)
static void * MainWorker(void *data)
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
bool CopyMem2File(const unsigned char *buffer, const unsigned buffer_size, FILE *fdest)
const std::string * url() const
pthread_mutex_t lock_pipe
const shash::Suffix suffix
shash::Any GetPreviousRevision() const
assert((mem||(size==0))&&"Out Of Memory")
bool has_alt_catalog_path() const
static void Store(const string &local_path, const string &remote_path, const bool compressed_src)
unsigned whitelist_pkcs7_size
bool IsTerminateJob() const
upload::Spooler * spooler
string StringifyTime(const time_t seconds, const bool utc)
void MakePipe(int pipe_fd[2])
static void WaitForStorage()
bool apply_timestamp_threshold
bool AddCatalog(const shash::Any &catalog)
unsigned char digest[digest_size_]
uint64_t revision() const
static void SpoolerOnUpload(const upload::SpoolerResult &result)
unsigned char * whitelist_buf
bool FileExists(const std::string &path)
uint64_t GetLastModified() const
int64_t String2Int64(const string &value)
const char * Code2Ascii(const Failures error)
unsigned GetDigestSize() const
bool ExportBreadcrumb(const std::string &directory, const int mode) const
std::string local_path
the local_path previously given as input
virtual bool List(std::vector< Tag > *tags) const =0
unsigned raw_manifest_size
bool AddMetainfo(const shash::Any &metainfo)
vector< string > SplitString(const string &str, char delim)
virtual bool IsMatching(const std::string &path) const
const char kSuffixCatalog
bool DecompressPath2File(const string &src, FILE *fdest)
shash::Any certificate() const
bool IsProxyTransferError(const Failures error)
static void ReportDownloadError(const download::JobInfo &download_job)
catalog::RelaxedPathFilter * pathfilter
static void StoreBuffer(const unsigned char *buffer, const unsigned size, const std::string &dest_path, const bool compress)
unsigned char * whitelist_pkcs7_buf
static void HashDatabase(const std::string &database_path, shash::Any *hash_reflog)
shash::Any catalog_hash() const
manifest::Reflog * reflog
download::DownloadManager * download_manager
string StringifyInt(const int64_t value)
bool garbage_collectable() const
uint64_t timestamp_threshold
Failures error_code() const
const zlib::Algorithms compression_alg
void DropDatabaseFileOwnership()
bool DirectoryExists(const std::string &path)
atomic_int64 overall_chunks
bool AddCertificate(const shash::Any &certificate)
shash::Algorithms GetHashAlgorithm() const
std::string MakePathWithoutSuffix() const
std::vector< NestedCatalog > NestedCatalogList
const NestedCatalogList ListOwnNestedCatalogs() const
uint64_t String2Uint64(const string &value)
std::map< char, SharedPtr< std::string > > ArgumentList
Failures Fetch(JobInfo *info)
SharedPtr< string > stratum1_url
shash::Any history() const
bool CompressMem2File(const unsigned char *buf, const size_t size, FILE *fdest, shash::Any *compressed_hash)
uint64_t GetNumChunks() const
ChunkJob(const shash::Any &hash, zlib::Algorithms compression_alg)
static std::string MakePath(const shash::Any &hash)
static Catalog * AttachFreely(const std::string &imaginary_mountpoint, const std::string &file, const shash::Any &catalog_hash, Catalog *parent=NULL, const bool is_nested=false)
const unsigned kMaxDigestSize
bool DecompressPath2Path(const string &src, const string &dest)
std::string meta_info() const
static bool Peek(const string &remote_path)
void SafeSleepMs(const unsigned ms)
bool IsHostTransferError(const Failures error)
std::string MakePath() const
static bool WriteChecksum(const std::string &path, const shash::Any &value)
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
void ClosePipe(int pipe_fd[2])
SharedPtr< string > stratum0_url
shash::Any meta_info() const
void UnlockFile(const int filedes)
const char * Code2Ascii(const Failures error)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)