9 #define _FILE_OFFSET_BITS 64
11 #define __STDC_FORMAT_MACROS
49 namespace swissknife {
63   , hash_algorithm(shash::kAny)
69 , compression_alg(compression_alg)
132 static bool Peek(const string &remote_path) {
143   const int http_code = download_job.http_code();
144   const std::string url = *download_job.url();
149 switch (error_code) {
153 "please check the network connection");
158 "please check the stratum 0 health", http_code);
163 "please check the stratum 0 health");
170 "please check the network connection");
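// --- Illustrative aside, not part of swissknife_pull.cc --------------------
// The switch above maps download failure codes to operator hints: proxy
// problems point at the local network, host problems point at the stratum 0.
// Minimal sketch of the same classification using the IsProxyTransferError /
// IsHostTransferError / Code2Ascii helpers from the download module; the
// function name, messages and log source/target are assumptions, not the
// file's verbatim switch (assumes the CVMFS download and logging headers).
static void ReportDownloadErrorSketch(const download::JobInfo &download_job) {
  const download::Failures error_code = download_job.error_code();
  const int http_code = download_job.http_code();
  if (download::IsProxyTransferError(error_code)) {
    LogCvmfs(kLogCvmfs, kLogStderr,
             "proxy error %s (%d), please check the network connection",
             download::Code2Ascii(error_code), http_code);
  } else if (download::IsHostTransferError(error_code)) {
    LogCvmfs(kLogCvmfs, kLogStderr,
             "host error %s (%d), please check the stratum 0 health",
             download::Code2Ascii(error_code), http_code);
  } else {
    LogCvmfs(kLogCvmfs, kLogStderr, "download error %s (HTTP %d)",
             download::Code2Ascii(error_code), http_code);
  }
}
// ----------------------------------------------------------------------------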
180 const string &local_path,
181 const string &remote_path,
182 const bool compressed_src)
185 if (!compressed_src) {
186 int retval = rename(local_path.c_str(), remote_path.c_str());
189 remote_path.c_str());
197 remote_path.c_str());
202 remote_path.c_str());
205 retval = rename(tmp_dest.c_str(), remote_path.c_str());
207 unlink(local_path.c_str());
210 spooler->Upload(local_path, remote_path);
215 const string &local_path,
217                   const bool compressed_src = true)
224                         const std::string &dest_path, const bool compress) {
237   Store(tmp_file, dest_path, true);
241                         const shash::Any &dest_hash, const bool compress) {
265 if (next_chunk.IsTerminateJob())
273     if (!Peek(chunk_hash)) {
280 &chunk_hash, &filesink);
283       download_manager->Fetch(&download_chunk);
289 Store(tmp_file, chunk_hash,
302 const std::string &path) {
308   if (previous_catalog.IsNull()) {
312            previous_catalog.ToString().c_str());
313 bool retval = Pull(previous_catalog, path);
323 for (catalog::Catalog::NestedCatalogList::const_iterator i =
324 nested_catalogs.begin(), iEnd = nested_catalogs.end();
328 i->mountpoint.c_str());
329 bool retval = Pull(i->hash, i->mountpoint.ToString());
339 const std::string &path) {
345   if (Peek(catalog_hash)) {
352       path, MakePath(catalog_hash), catalog_hash);
353 if (catalog == NULL) {
358 bool retval = PullRecursion(catalog, path);
372 " the path specification", path.c_str());
384 string file_catalog_vanilla;
393 &file_catalog_vanilla);
394 if (!fcatalog_vanilla) {
396 unlink(file_catalog.c_str());
402 &catalog_hash, &filesink);
403 dl_retval = download_manager()->Fetch(&download_catalog);
404 fclose(fcatalog_vanilla);
408            "probably swept by garbage collection",
419            file_catalog_vanilla.c_str(), catalog_hash.ToString().c_str());
422   if (path.empty() && reflog != NULL) {
430 if (catalog == NULL) {
441 " Pruning at root catalog from %s due to threshold at %s",
451            " Processing chunks [%" PRIu64 " registered chunks]: ",
458   while (catalog->AllChunksNext(&chunk_hash, &compression_alg)) {
459 ChunkJob next_chunk(chunk_hash, compression_alg);
468            "%" PRId64 " unique chunks",
472 retval = PullRecursion(catalog, path);
475 unlink(file_catalog.c_str());
479 Store(file_catalog_vanilla, catalog_hash);
484 unlink(file_catalog.c_str());
485 unlink(file_catalog_vanilla.c_str());
489 unlink(file_catalog.c_str());
490 unlink(file_catalog_vanilla.c_str());
499 unsigned timeout = 60;
500 int fd_lockfile = -1;
501 string spooler_definition_str;
507   if (args.find('c') != args.end())
509   if (args.find('l') != args.end()) {
523   spooler_definition_str = *args.find('r')->second;
525   string master_keys = *args.find('k')->second;
528   const string repository_name = *args.find('m')->second;
529   if (args.find('n') != args.end())
531   if (args.find('t') != args.end())
533   if (args.find('a') != args.end())
535   if (args.find('d') != args.end()) {
539   if (args.find('p') != args.end())
541   if (args.find('z') != args.end())
543   if (args.find('w') != args.end())
545   if (args.find('i') != args.end())
548 string reflog_chksum_path;
549   if (args.find('R') != args.end()) {
550     reflog_chksum_path = *args.find('R')->second;
558   if (args.find('Z') != args.end()) {
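// --- Illustrative aside, not part of swissknife_pull.cc --------------------
// The repeated args.find('x') calls in this section read optional switches
// from an ArgumentList, i.e. a std::map<char, SharedPtr<std::string> >.
// Minimal sketch of the lookup pattern with hypothetical helper names
// (HasOption / OptionValue) that do not exist in the original file:
static bool HasOption(const ArgumentList &args, char flag) {
  return args.find(flag) != args.end();
}
static std::string OptionValue(const ArgumentList &args, char flag,
                               const std::string &fallback = "") {
  ArgumentList::const_iterator it = args.find(flag);
  return (it != args.end()) ? *it->second : fallback;
}
// With these helpers, e.g. reflog_chksum_path above would read:
//   if (HasOption(args, 'R')) reflog_chksum_path = OptionValue(args, 'R');
// ----------------------------------------------------------------------------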
567 typedef std::vector<history::History::Tag> TagVector;
568 TagVector historic_tags;
580   const bool follow_redirects = false;
583       (args.find('@') != args.end()) ? *args.find('@')->second : "";
585 if (!this->InitDownloadManager(follow_redirects, proxy, max_pool_handles)) {
589 if (!this->InitSignatureManager(master_keys)) {
594 "CernVM-FS: using public key(s) %s",
598 unsigned current_group;
599 vector< vector<download::DownloadManager::ProxyInfo> > proxies;
600   download_manager()->GetProxyInfo(&proxies, &current_group, NULL);
601 if (proxies.size() > 0) {
602     string proxy_str = "\nWarning, replicating through proxies\n";
603     proxy_str += "  Load-balance groups:\n";
604     for (unsigned i = 0; i < proxies.size(); ++i) {
606       for (unsigned j = 0; j < proxies[i].size(); ++j) {
607 urls.push_back(proxies[i][j].url);
612     proxy_str += "  Active proxy: [" + StringifyInt(current_group) + "] " +
613 proxies[current_group][0].url;
616 download_manager()->SetTimeout(timeout, timeout);
617   download_manager()->SetRetryParameters(retries, 500, 2000);
618 download_manager()->Spawn();
625 signature_manager());
628       reinterpret_cast<pthread_t *>(smalloc(sizeof(pthread_t) * num_parallel));
631   const string url_sentinel = *stratum0_url + "/.cvmfs_master_replica";
633 retval = download_manager()->Fetch(&download_sentinel);
635     if (download_sentinel.http_code() == 404) {
637 "This is not a CernVM-FS server for replication");
640 "Failed to contact stratum 0 server (%d - %s)",
657   if (!meta_info_hash.IsNull()) {
663 dl_retval = download_manager()->Fetch(&download_metainfo);
669     meta_info = string(reinterpret_cast<char*>(metainfo_memsink.data()),
670                        metainfo_memsink.pos());
679 spooler_definition(spooler_definition_str,
681 spooler = upload::Spooler::Construct(spooler_definition);
690 repository_name.c_str());
692 if (reflog == NULL) {
701 signature_manager());
703   if (!reflog_hash.IsNull()) {
705 FetchReflog(&object_fetcher_stratum1, repository_name, reflog_hash);
709     if (spooler->Peek(".cvmfsreflog")) {
711 "no reflog hash specified but reflog is present");
736 &history_hash, &pathsink);
737 dl_retval = download_manager()->Fetch(&download_history);
742 const std::string history_db_path = history_path +
".uncompressed";
746 if (NULL == tag_db) {
748 history_db_path.c_str());
749 unlink(history_db_path.c_str());
752   retval = tag_db->List(&historic_tags);
754 unlink(history_db_path.c_str());
757 history_db_path.c_str());
762 historic_tags.size());
766 Store(history_path, history_hash);
768 unlink(history_path.c_str());
781     int retval = pthread_create(&workers[i], NULL, MainWorker,
782 static_cast<void*>(&mwc));
789 if (!historic_tags.empty()) {
792 for (TagVector::const_iterator i = historic_tags.begin(),
793 iend = historic_tags.end();
796       if (Peek(i->root_hash))
801       bool retval2 = Pull(i->root_hash, "");
802 retval = retval && retval2;
808 ChunkJob terminate_workers;
812 int retval = pthread_join(workers[i], NULL);
835   if (!meta_info_hash.IsNull()) {
836     const unsigned char *info = reinterpret_cast<const unsigned char *>(
838     StoreBuffer(info, meta_info.size(), meta_info_hash, true);
852        && (meta_info_hash.IsNull() ||
853 spooler->PlaceBootstrappingShortcut(meta_info_hash));
857 "failed to place root catalog bootstrapping symlinks");
870 spooler->UploadReflog(reflog_path);
872 unlink(reflog_path.c_str());
873   if (spooler->GetNumberOfErrors()) {
878 assert(!reflog_chksum_path.empty());
892                ".cvmfswhitelist.pkcs7", false);
895                ".cvmfswhitelist", false);
897                ".cvmfspublished", false);
905            PRId64 " processed chunks",
910 if (fd_lockfile >= 0)