28 std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks = &tls->
fetcher
33 for (vector<Fetcher::ThreadLocalStorage *>::iterator
34 i = tls_blocks->begin(),
35 iEnd = tls_blocks->end();
63 pthread_getspecific(thread_local_storage_));
72 int retval = pthread_setspecific(thread_local_storage_, tls);
76 tls_blocks_.push_back(tls);
83 const std::string &alt_url) {
90 if ((fd_return = OpenSelect(
object)) >= 0) {
95 if (
object.
id.IsNull()) {
107 pthread_mutex_lock(lock_queues_download_);
108 ThreadQueues::iterator iDownloadQueue = queues_download_.find(
object.
id);
109 if (iDownloadQueue != queues_download_.end()) {
111 object.label.path.c_str());
113 iDownloadQueue->second->push_back(tls->
pipe_wait[1]);
114 pthread_mutex_unlock(lock_queues_download_);
118 fd_return,
object.label.path.c_str());
122 fd_return = OpenSelect(
object);
123 if (fd_return >= 0) {
124 pthread_mutex_unlock(lock_queues_download_);
130 pthread_mutex_unlock(lock_queues_download_);
138 if (
object.label.IsExternal()) {
139 url = !alt_url.empty() ? alt_url :
object.label.path;
141 url =
"/" + (alt_url.size() ? alt_url :
"data/" +
object.id.MakePath());
143 void *txn = alloca(cache_mgr_->SizeOfTxn());
144 retval = cache_mgr_->StartTxn(
object.
id,
object.label.size, txn);
147 object.label.path.c_str());
148 SignalWaitingThreads(retval,
object.
id, tls);
151 cache_mgr_->CtrlTxn(
object.label, 0, txn);
176 fd_return = cache_mgr_->OpenFromTxn(txn);
178 cache_mgr_->AbortTxn(txn);
179 SignalWaitingThreads(fd_return,
object.
id, tls);
183 retval = cache_mgr_->CommitTxn(txn);
185 cache_mgr_->Close(fd_return);
186 SignalWaitingThreads(retval,
object.
id, tls);
189 SignalWaitingThreads(fd_return,
object.
id, tls);
195 "failed to fetch %s (hash: %s, error %d [%s])",
196 object.label.path.c_str(),
object.id.ToString().c_str(),
199 cache_mgr_->AbortTxn(txn);
200 backoff_throttle_->Throttle();
201 SignalWaitingThreads(-EIO,
object.
id, tls);
210 : lock_queues_download_(NULL)
211 , lock_tls_blocks_(NULL)
212 , cache_mgr_(cache_mgr)
214 , backoff_throttle_(backoff_throttle) {
219 smalloc(
sizeof(pthread_mutex_t)));
223 smalloc(
sizeof(pthread_mutex_t)));
228 "overall number of downloaded files (incl. catalogs, chunks)");
231 "overall number of object requests (incl. catalogs, chunks)");
262 if (
object.label.IsCatalog() ||
object.label.IsPinned()) {
void CleanupTls(ThreadLocalStorage *tls)
friend void TLSDestructor(void *data)
download::DownloadManager * download_mgr_
struct cvmcache_context * ctx
perf::Counter * n_invocations
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
CacheManager * cache_mgr_
ThreadQueues queues_download_
int OpenPinned(const LabeledObject &object)
void SetRangeOffset(off_t range_offset)
assert((mem||(size==0))&&"Out Of Memory")
std::vector< ThreadLocalStorage * > tls_blocks_
std::vector< int > other_pipes_waiting
pthread_mutex_t * lock_tls_blocks_
void MakePipe(int pipe_fd[2])
pthread_mutex_t * lock_queues_download_
virtual int Open(const LabeledObject &object)=0
Counter * RegisterTemplated(const std::string &name_minor, const std::string &desc)
const char * Code2Ascii(const Failures error)
int OpenSelect(const CacheManager::LabeledObject &object)
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void SetExpectedHash(const shash::Any *expected_hash)
InterruptCue ** GetInterruptCuePtr()
void SetRangeSize(off_t range_size)
void SetCompressed(bool compressed)
Failures error_code() const
void Inc(class Counter *counter)
void SignalWaitingThreads(const int fd, const shash::Any &id, ThreadLocalStorage *tls)
void SetSink(cvmfs::Sink *sink)
void SetExtraInfo(const std::string *extra_info)
download::JobInfo download_job
Failures Fetch(JobInfo *info)
void TLSDestructor(void *data)
void SetUrl(const std::string *url)
virtual int Dup(int fd)=0
perf::Counter * n_downloads
void SetProbeHosts(bool probe_hosts)
void WritePipe(int fd, const void *buf, size_t nbyte)
pthread_key_t thread_local_storage_
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
static ClientCtx * GetInstance()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)