CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
fetch.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "fetch.h"
7 
8 #include <unistd.h>
9 
10 #include "backoff.h"
11 #include "cache.h"
12 #include "clientctx.h"
13 #include "interrupt.h"
14 #include "network/download.h"
15 #include "quota.h"
16 #include "statistics.h"
17 #include "util/concurrency.h"
18 #include "util/logging.h"
19 #include "util/posix.h"
20 
21 using namespace std; // NOLINT
22 
23 namespace cvmfs {
24 
25 void TLSDestructor(void *data) {
27  static_cast<Fetcher::ThreadLocalStorage *>(data);
28  std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks =
29  &tls->fetcher->tls_blocks_;
30 
31  {
33  for (vector<Fetcher::ThreadLocalStorage *>::iterator
34  i = tls_blocks->begin(),
35  iEnd = tls_blocks->end();
36  i != iEnd; ++i) {
37  if ((*i) == tls) {
38  tls_blocks->erase(i);
39  break;
40  }
41  }
42  }
43  tls->fetcher->CleanupTls(tls);
44 }
45 
46 
51 void Fetcher::CleanupTls(ThreadLocalStorage *tls) {
52  ClosePipe(tls->pipe_wait);
53  delete tls;
54 }
55 
56 
60 Fetcher::ThreadLocalStorage *Fetcher::GetTls() {
61  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
62  pthread_getspecific(thread_local_storage_));
63  if (tls != NULL)
64  return tls;
65 
66  tls = new ThreadLocalStorage();
67  tls->fetcher = this;
68  MakePipe(tls->pipe_wait);
69  tls->download_job.SetCompressed(true);
70  tls->download_job.SetProbeHosts(true);
71  int retval = pthread_setspecific(thread_local_storage_, tls);
72  assert(retval == 0);
73 
74  MutexLockGuard m(lock_tls_blocks_);
75  tls_blocks_.push_back(tls);
76 
77  return tls;
78 }
79 
80 
82  const CacheManager::LabeledObject &object,
83  const std::string &alt_url)
84 {
85  int fd_return; // Read-only file descriptor that is returned
86  int retval;
87 
88  perf::Inc(n_invocations);
89 
90  // Try to open from local cache
91  if ((fd_return = OpenSelect(object)) >= 0) {
92  LogCvmfs(kLogCache, kLogDebug, "hit: %s", object.label.path.c_str());
93  return fd_return;
94  }
95 
96  if (object.id.IsNull()) {
97  // This has been seen when trying to load the root catalog signed by an
98  // invalid certificate on an empty cache
99  // TODO(jblomer): check if still necessary after the catalog reload refactor
100  LogCvmfs(kLogCache, kLogDebug, "cancel attempt to download null hash");
101  return -EIO;
102  }
103 
104  ThreadLocalStorage *tls = GetTls();
105 
106  // Synchronization point: either act as a master thread for this object or
107  // enqueue to the list of waiting threads.
108  pthread_mutex_lock(lock_queues_download_);
109  ThreadQueues::iterator iDownloadQueue = queues_download_.find(object.id);
110  if (iDownloadQueue != queues_download_.end()) {
111  LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s",
112  object.label.path.c_str());
113 
114  iDownloadQueue->second->push_back(tls->pipe_wait[1]);
115  pthread_mutex_unlock(lock_queues_download_);
116  ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int));
117 
118  LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s",
119  fd_return, object.label.path.c_str());
120  return fd_return;
121  } else {
122  // Seems we are the first one, check again in the cache (race condition)
123  fd_return = OpenSelect(object);
124  if (fd_return >= 0) {
125  pthread_mutex_unlock(lock_queues_download_);
126  return fd_return;
127  }
128 
129  // Create a new queue for this chunk
130  queues_download_[object.id] = &tls->other_pipes_waiting;
131  pthread_mutex_unlock(lock_queues_download_);
132  }
133 
134  perf::Inc(n_downloads);
135 
136  // Involve the download manager
137  LogCvmfs(kLogCache, kLogDebug, "downloading %s", object.label.path.c_str());
138  std::string url;
139  if (object.label.IsExternal()) {
140  url = !alt_url.empty() ? alt_url : object.label.path;
141  } else {
142  url = "/" + (alt_url.size() ? alt_url : "data/" + object.id.MakePath());
143  }
144  void *txn = alloca(cache_mgr_->SizeOfTxn());
145  retval = cache_mgr_->StartTxn(object.id, object.label.size, txn);
146  if (retval < 0) {
147  LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s",
148  object.label.path.c_str());
149  SignalWaitingThreads(retval, object.id, tls);
150  return retval;
151  }
152  cache_mgr_->CtrlTxn(object.label, 0, txn);
153 
154  LogCvmfs(kLogCache, kLogDebug, "miss: %s %s",
155  object.label.path.c_str(), url.c_str());
156  TransactionSink sink(cache_mgr_, txn);
157  tls->download_job.SetUrl(&url);
158  tls->download_job.SetSink(&sink);
159  tls->download_job.SetExpectedHash(&object.id);
160  tls->download_job.SetExtraInfo(&object.label.path);
162  if (ctx->IsSet()) {
163  ctx->Get(tls->download_job.GetUidPtr(),
164  tls->download_job.GetGidPtr(),
165  tls->download_job.GetPidPtr(),
167  }
169  object.label.zip_algorithm == zlib::kZlibDefault);
170  tls->download_job.SetRangeOffset(object.label.range_offset);
171  tls->download_job.SetRangeSize(static_cast<int64_t>(object.label.size));
173 
174  if (tls->download_job.error_code() == download::kFailOk) {
175  LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str());
176 
177  fd_return = cache_mgr_->OpenFromTxn(txn);
178  if (fd_return < 0) {
179  cache_mgr_->AbortTxn(txn);
180  SignalWaitingThreads(fd_return, object.id, tls);
181  return fd_return;
182  }
183 
184  retval = cache_mgr_->CommitTxn(txn);
185  if (retval < 0) {
186  cache_mgr_->Close(fd_return);
187  SignalWaitingThreads(retval, object.id, tls);
188  return retval;
189  }
190  SignalWaitingThreads(fd_return, object.id, tls);
191  return fd_return;
192  }
193 
194  // Download failed
196  "failed to fetch %s (hash: %s, error %d [%s])",
197  object.label.path.c_str(),
198  object.id.ToString().c_str(),
199  tls->download_job.error_code(),
201  cache_mgr_->AbortTxn(txn);
202  backoff_throttle_->Throttle();
203  SignalWaitingThreads(-EIO, object.id, tls);
204  return -EIO;
205 }
206 
207 
208 Fetcher::Fetcher(
209  CacheManager *cache_mgr,
210  download::DownloadManager *download_mgr,
211  BackoffThrottle *backoff_throttle,
212  perf::StatisticsTemplate statistics)
213  : lock_queues_download_(NULL)
214  , lock_tls_blocks_(NULL)
215  , cache_mgr_(cache_mgr)
216  , download_mgr_(download_mgr)
217  , backoff_throttle_(backoff_throttle)
218 {
219  int retval;
220  retval = pthread_key_create(&thread_local_storage_, TLSDestructor);
221  assert(retval == 0);
222  lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>(
223  smalloc(sizeof(pthread_mutex_t)));
224  retval = pthread_mutex_init(lock_queues_download_, NULL);
225  assert(retval == 0);
226  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
227  smalloc(sizeof(pthread_mutex_t)));
228  retval = pthread_mutex_init(lock_tls_blocks_, NULL);
229  assert(retval == 0);
230  n_downloads = statistics.RegisterTemplated("n_downloads",
231  "overall number of downloaded files (incl. catalogs, chunks)");
232  n_invocations = statistics.RegisterTemplated("n_invocations",
233  "overall number of object requests (incl. catalogs, chunks)");
234 }
235 
236 
238  int retval;
239 
240  {
242  for (unsigned i = 0; i < tls_blocks_.size(); ++i)
244  }
245 
246  retval = pthread_mutex_destroy(lock_tls_blocks_);
247  assert(retval == 0);
248  free(lock_tls_blocks_);
249 
250  retval = pthread_mutex_destroy(lock_queues_download_);
251  assert(retval == 0);
252  free(lock_queues_download_);
253 
254  retval = pthread_key_delete(thread_local_storage_);
255  assert(retval == 0);
256 }
257 
258 
264  if (object.label.IsCatalog() || object.label.IsPinned()) {
265  return cache_mgr_->OpenPinned(object);
266  } else {
267  return cache_mgr_->Open(object);
268  }
269 }
270 
271 
273  const int fd,
274  const shash::Any &id,
275  ThreadLocalStorage *tls)
276 {
278  for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) {
279  int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd;
280  WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int));
281  }
282  tls->other_pipes_waiting.clear();
283  queues_download_.erase(id);
284 }
285 
286 } // namespace cvmfs
void CleanupTls(ThreadLocalStorage *tls)
Definition: fetch.cc:51
friend void TLSDestructor(void *data)
Definition: fetch.cc:25
download::DownloadManager * download_mgr_
Definition: repository.h:141
struct cvmcache_context * ctx
perf::Counter * n_invocations
Definition: fetch.h:197
uid_t * GetUidPtr()
Definition: jobinfo.h:159
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
Definition: clientctx.cc:57
CacheManager * cache_mgr_
Definition: fetch.h:193
ThreadQueues queues_download_
Definition: fetch.h:183
int OpenPinned(const LabeledObject &object)
Definition: cache.cc:168
void SetRangeOffset(off_t range_offset)
Definition: jobinfo.h:233
assert((mem||(size==0))&&"Out Of Memory")
std::vector< ThreadLocalStorage * > tls_blocks_
Definition: fetch.h:190
std::vector< int > other_pipes_waiting
Definition: fetch.h:156
pthread_mutex_t * lock_tls_blocks_
Definition: fetch.h:191
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
pthread_mutex_t * lock_queues_download_
Definition: fetch.h:184
bool IsSet()
Definition: clientctx.cc:74
virtual int Open(const LabeledObject &object)=0
Counter * RegisterTemplated(const std::string &name_minor, const std::string &desc)
Definition: statistics.h:111
const char * Code2Ascii(const Failures error)
int OpenSelect(const CacheManager::LabeledObject &object)
Definition: fetch.cc:263
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void SetExpectedHash(const shash::Any *expected_hash)
Definition: jobinfo.h:228
InterruptCue ** GetInterruptCuePtr()
Definition: jobinfo.h:161
void SetRangeSize(off_t range_size)
Definition: jobinfo.h:234
void SetCompressed(bool compressed)
Definition: jobinfo.h:214
Failures error_code() const
Definition: jobinfo.h:200
void Inc(class Counter *counter)
Definition: statistics.h:50
pid_t * GetPidPtr()
Definition: jobinfo.h:158
void SignalWaitingThreads(const int fd, const shash::Any &id, ThreadLocalStorage *tls)
Definition: fetch.cc:272
void SetSink(cvmfs::Sink *sink)
Definition: jobinfo.h:226
void SetExtraInfo(const std::string *extra_info)
Definition: jobinfo.h:230
download::JobInfo download_job
Definition: fetch.h:161
Failures Fetch(JobInfo *info)
Definition: download.cc:1860
Definition: mutex.h:42
void TLSDestructor(void *data)
Definition: fetch.cc:25
void SetUrl(const std::string *url)
Definition: jobinfo.h:213
virtual int Dup(int fd)=0
perf::Counter * n_downloads
Definition: fetch.h:196
void SetProbeHosts(bool probe_hosts)
Definition: jobinfo.h:215
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
pthread_key_t thread_local_storage_
Definition: fetch.h:181
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:551
static ClientCtx * GetInstance()
Definition: clientctx.cc:45
gid_t * GetGidPtr()
Definition: jobinfo.h:160
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528