28 std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks =
33 for (vector<Fetcher::ThreadLocalStorage *>::iterator
34 i = tls_blocks->begin(),
35 iEnd = tls_blocks->end();
62 pthread_getspecific(thread_local_storage_));
71 int retval = pthread_setspecific(thread_local_storage_, tls);
75 tls_blocks_.push_back(tls);
83 const std::string &alt_url)
91 if ((fd_return = OpenSelect(
object)) >= 0) {
96 if (
object.
id.IsNull()) {
108 pthread_mutex_lock(lock_queues_download_);
109 ThreadQueues::iterator iDownloadQueue = queues_download_.find(
object.
id);
110 if (iDownloadQueue != queues_download_.end()) {
112 object.label.path.c_str());
114 iDownloadQueue->second->push_back(tls->
pipe_wait[1]);
115 pthread_mutex_unlock(lock_queues_download_);
119 fd_return,
object.label.path.c_str());
123 fd_return = OpenSelect(
object);
124 if (fd_return >= 0) {
125 pthread_mutex_unlock(lock_queues_download_);
131 pthread_mutex_unlock(lock_queues_download_);
139 if (
object.label.IsExternal()) {
140 url = !alt_url.empty() ? alt_url :
object.label.path;
142 url =
"/" + (alt_url.size() ? alt_url :
"data/" +
object.id.MakePath());
144 void *txn = alloca(cache_mgr_->SizeOfTxn());
145 retval = cache_mgr_->StartTxn(
object.
id,
object.label.size, txn);
148 object.label.path.c_str());
149 SignalWaitingThreads(retval,
object.
id, tls);
152 cache_mgr_->CtrlTxn(
object.label, 0, txn);
155 object.label.path.c_str(), url.c_str());
177 fd_return = cache_mgr_->OpenFromTxn(txn);
179 cache_mgr_->AbortTxn(txn);
180 SignalWaitingThreads(fd_return,
object.
id, tls);
184 retval = cache_mgr_->CommitTxn(txn);
186 cache_mgr_->Close(fd_return);
187 SignalWaitingThreads(retval,
object.
id, tls);
190 SignalWaitingThreads(fd_return,
object.
id, tls);
196 "failed to fetch %s (hash: %s, error %d [%s])",
197 object.label.path.c_str(),
198 object.id.ToString().c_str(),
201 cache_mgr_->AbortTxn(txn);
202 backoff_throttle_->Throttle();
203 SignalWaitingThreads(-EIO,
object.
id, tls);
213 : lock_queues_download_(NULL)
214 , lock_tls_blocks_(NULL)
215 , cache_mgr_(cache_mgr)
217 , backoff_throttle_(backoff_throttle)
223 smalloc(
sizeof(pthread_mutex_t)));
227 smalloc(
sizeof(pthread_mutex_t)));
231 "overall number of downloaded files (incl. catalogs, chunks)");
233 "overall number of object requests (incl. catalogs, chunks)");
264 if (
object.label.IsCatalog() ||
object.label.IsPinned()) {
void CleanupTls(ThreadLocalStorage *tls)
friend void TLSDestructor(void *data)
download::DownloadManager * download_mgr_
struct cvmcache_context * ctx
perf::Counter * n_invocations
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
CacheManager * cache_mgr_
ThreadQueues queues_download_
int OpenPinned(const LabeledObject &object)
void SetRangeOffset(off_t range_offset)
assert((mem||(size==0))&&"Out Of Memory")
std::vector< ThreadLocalStorage * > tls_blocks_
std::vector< int > other_pipes_waiting
pthread_mutex_t * lock_tls_blocks_
void MakePipe(int pipe_fd[2])
pthread_mutex_t * lock_queues_download_
virtual int Open(const LabeledObject &object)=0
Counter * RegisterTemplated(const std::string &name_minor, const std::string &desc)
const char * Code2Ascii(const Failures error)
int OpenSelect(const CacheManager::LabeledObject &object)
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void SetExpectedHash(const shash::Any *expected_hash)
InterruptCue ** GetInterruptCuePtr()
void SetRangeSize(off_t range_size)
void SetCompressed(bool compressed)
Failures error_code() const
void Inc(class Counter *counter)
void SignalWaitingThreads(const int fd, const shash::Any &id, ThreadLocalStorage *tls)
void SetSink(cvmfs::Sink *sink)
void SetExtraInfo(const std::string *extra_info)
download::JobInfo download_job
Failures Fetch(JobInfo *info)
void TLSDestructor(void *data)
void SetUrl(const std::string *url)
virtual int Dup(int fd)=0
perf::Counter * n_downloads
void SetProbeHosts(bool probe_hosts)
void WritePipe(int fd, const void *buf, size_t nbyte)
pthread_key_t thread_local_storage_
void ReadPipe(int fd, void *buf, size_t nbyte)
void ClosePipe(int pipe_fd[2])
static ClientCtx * GetInstance()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)