CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
fetch.cc
Go to the documentation of this file.
1 
6 #include "fetch.h"
7 
8 #include <unistd.h>
9 
10 #include "backoff.h"
11 #include "cache.h"
12 #include "clientctx.h"
13 #include "interrupt.h"
14 #include "network/download.h"
15 #include "quota.h"
16 #include "statistics.h"
17 #include "util/concurrency.h"
18 #include "util/logging.h"
19 #include "util/posix.h"
20 
21 using namespace std; // NOLINT
22 
23 namespace cvmfs {
24 
25 void TLSDestructor(void *data) {
27  data);
28  std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks = &tls->fetcher
29  ->tls_blocks_;
30 
31  {
33  for (vector<Fetcher::ThreadLocalStorage *>::iterator
34  i = tls_blocks->begin(),
35  iEnd = tls_blocks->end();
36  i != iEnd;
37  ++i) {
38  if ((*i) == tls) {
39  tls_blocks->erase(i);
40  break;
41  }
42  }
43  }
44  tls->fetcher->CleanupTls(tls);
45 }
46 
47 
52 void Fetcher::CleanupTls(ThreadLocalStorage *tls) {
53  ClosePipe(tls->pipe_wait);
54  delete tls;
55 }
56 
57 
61 Fetcher::ThreadLocalStorage *Fetcher::GetTls() {
62  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
63  pthread_getspecific(thread_local_storage_));
64  if (tls != NULL)
65  return tls;
66 
67  tls = new ThreadLocalStorage();
68  tls->fetcher = this;
69  MakePipe(tls->pipe_wait);
70  tls->download_job.SetCompressed(true);
71  tls->download_job.SetProbeHosts(true);
72  const int retval = pthread_setspecific(thread_local_storage_, tls);
73  assert(retval == 0);
74 
75  const MutexLockGuard m(lock_tls_blocks_);
76  tls_blocks_.push_back(tls);
77 
78  return tls;
79 }
80 
81 
83  const std::string &alt_url) {
84  int fd_return; // Read-only file descriptor that is returned
85  int retval;
86 
87  perf::Inc(n_invocations);
88 
89  // Try to open from local cache
90  if ((fd_return = OpenSelect(object)) >= 0) {
91  LogCvmfs(kLogCache, kLogDebug, "hit: %s", object.label.path.c_str());
92  return fd_return;
93  }
94 
95  if (object.id.IsNull()) {
96  // This has been seen when trying to load the root catalog signed by an
97  // invalid certificate on an empty cache
98  // TODO(jblomer): check if still necessary after the catalog reload refactor
99  LogCvmfs(kLogCache, kLogDebug, "cancel attempt to download null hash");
100  return -EIO;
101  }
102 
103  ThreadLocalStorage *tls = GetTls();
104 
105  // Synchronization point: either act as a master thread for this object or
106  // enqueue to the list of waiting threads.
107  pthread_mutex_lock(lock_queues_download_);
108  const ThreadQueues::iterator iDownloadQueue =
109  queues_download_.find(object.id);
110  if (iDownloadQueue != queues_download_.end()) {
111  LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s",
112  object.label.path.c_str());
113 
114  iDownloadQueue->second->push_back(tls->pipe_wait[1]);
115  pthread_mutex_unlock(lock_queues_download_);
116  ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int));
117 
118  LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s",
119  fd_return, object.label.path.c_str());
120  return fd_return;
121  } else {
122  // Seems we are the first one, check again in the cache (race condition)
123  fd_return = OpenSelect(object);
124  if (fd_return >= 0) {
125  pthread_mutex_unlock(lock_queues_download_);
126  return fd_return;
127  }
128 
129  // Create a new queue for this chunk
130  queues_download_[object.id] = &tls->other_pipes_waiting;
131  pthread_mutex_unlock(lock_queues_download_);
132  }
133 
134  perf::Inc(n_downloads);
135 
136  // Involve the download manager
137  LogCvmfs(kLogCache, kLogDebug, "downloading %s", object.label.path.c_str());
138  std::string url;
139  if (object.label.IsExternal()) {
140  url = !alt_url.empty() ? alt_url : object.label.path;
141  } else {
142  url = "/" + (alt_url.size() ? alt_url : "data/" + object.id.MakePath());
143  }
144  void *txn = alloca(cache_mgr_->SizeOfTxn());
145  retval = cache_mgr_->StartTxn(object.id, object.label.size, txn);
146  if (retval < 0) {
147  LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s",
148  object.label.path.c_str());
149  SignalWaitingThreads(retval, object.id, tls);
150  return retval;
151  }
152  cache_mgr_->CtrlTxn(object.label, 0, txn);
153 
154  LogCvmfs(kLogCache, kLogDebug, "miss: %s %s", object.label.path.c_str(),
155  url.c_str());
156  TransactionSink sink(cache_mgr_, txn);
157  tls->download_job.SetUrl(&url);
158  tls->download_job.SetSink(&sink);
159  tls->download_job.SetExpectedHash(&object.id);
160  tls->download_job.SetExtraInfo(&object.label.path);
162  if (ctx->IsSet()) {
163  ctx->Get(tls->download_job.GetUidPtr(),
164  tls->download_job.GetGidPtr(),
165  tls->download_job.GetPidPtr(),
167  }
168  tls->download_job.SetCompressed(object.label.zip_algorithm
169  == zlib::kZlibDefault);
170  tls->download_job.SetRangeOffset(object.label.range_offset);
171  tls->download_job.SetRangeSize(static_cast<int64_t>(object.label.size));
173 
174  if (tls->download_job.error_code() == download::kFailOk) {
175  LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str());
176 
177  fd_return = cache_mgr_->OpenFromTxn(txn);
178  if (fd_return < 0) {
179  cache_mgr_->AbortTxn(txn);
180  SignalWaitingThreads(fd_return, object.id, tls);
181  return fd_return;
182  }
183 
184  retval = cache_mgr_->CommitTxn(txn);
185  if (retval < 0) {
186  cache_mgr_->Close(fd_return);
187  SignalWaitingThreads(retval, object.id, tls);
188  return retval;
189  }
190  SignalWaitingThreads(fd_return, object.id, tls);
191  return fd_return;
192  }
193 
194  // Download failed
196  "failed to fetch %s (hash: %s, error %d [%s])",
197  object.label.path.c_str(), object.id.ToString().c_str(),
198  tls->download_job.error_code(),
200  cache_mgr_->AbortTxn(txn);
201  backoff_throttle_->Throttle();
202  SignalWaitingThreads(-EIO, object.id, tls);
203  return -EIO;
204 }
205 
206 
207 Fetcher::Fetcher(CacheManager *cache_mgr,
208  download::DownloadManager *download_mgr,
209  BackoffThrottle *backoff_throttle,
210  perf::StatisticsTemplate statistics)
211  : lock_queues_download_(NULL)
212  , lock_tls_blocks_(NULL)
213  , cache_mgr_(cache_mgr)
214  , download_mgr_(download_mgr)
215  , backoff_throttle_(backoff_throttle) {
216  int retval;
217  retval = pthread_key_create(&thread_local_storage_, TLSDestructor);
218  assert(retval == 0);
219  lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>(
220  smalloc(sizeof(pthread_mutex_t)));
221  retval = pthread_mutex_init(lock_queues_download_, NULL);
222  assert(retval == 0);
223  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
224  smalloc(sizeof(pthread_mutex_t)));
225  retval = pthread_mutex_init(lock_tls_blocks_, NULL);
226  assert(retval == 0);
227  n_downloads = statistics.RegisterTemplated(
228  "n_downloads",
229  "overall number of downloaded files (incl. catalogs, chunks)");
230  n_invocations = statistics.RegisterTemplated(
231  "n_invocations",
232  "overall number of object requests (incl. catalogs, chunks)");
233 }
234 
235 
237  int retval;
238 
239  {
241  for (unsigned i = 0; i < tls_blocks_.size(); ++i)
243  }
244 
245  retval = pthread_mutex_destroy(lock_tls_blocks_);
246  assert(retval == 0);
247  free(lock_tls_blocks_);
248 
249  retval = pthread_mutex_destroy(lock_queues_download_);
250  assert(retval == 0);
251  free(lock_queues_download_);
252 
253  retval = pthread_key_delete(thread_local_storage_);
254  assert(retval == 0);
255 }
256 
257 
263  if (object.label.IsCatalog() || object.label.IsPinned()) {
264  return cache_mgr_->OpenPinned(object);
265  } else {
266  return cache_mgr_->Open(object);
267  }
268 }
269 
270 
272  const shash::Any &id,
273  ThreadLocalStorage *tls) {
275  for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) {
276  int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd;
277  WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int));
278  }
279  tls->other_pipes_waiting.clear();
280  queues_download_.erase(id);
281 }
282 
283 } // namespace cvmfs
void CleanupTls(ThreadLocalStorage *tls)
Definition: fetch.cc:52
friend void TLSDestructor(void *data)
Definition: fetch.cc:25
download::DownloadManager * download_mgr_
Definition: repository.h:140
struct cvmcache_context * ctx
perf::Counter * n_invocations
Definition: fetch.h:189
uid_t * GetUidPtr()
Definition: jobinfo.h:157
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
Definition: clientctx.cc:57
CacheManager * cache_mgr_
Definition: fetch.h:185
ThreadQueues queues_download_
Definition: fetch.h:175
int OpenPinned(const LabeledObject &object)
Definition: cache.cc:162
void SetRangeOffset(off_t range_offset)
Definition: jobinfo.h:237
assert((mem||(size==0))&&"Out Of Memory")
std::vector< ThreadLocalStorage * > tls_blocks_
Definition: fetch.h:182
std::vector< int > other_pipes_waiting
Definition: fetch.h:148
pthread_mutex_t * lock_tls_blocks_
Definition: fetch.h:183
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
pthread_mutex_t * lock_queues_download_
Definition: fetch.h:176
bool IsSet()
Definition: clientctx.cc:74
virtual int Open(const LabeledObject &object)=0
Counter * RegisterTemplated(const std::string &name_minor, const std::string &desc)
Definition: statistics.h:109
const char * Code2Ascii(const Failures error)
int OpenSelect(const CacheManager::LabeledObject &object)
Definition: fetch.cc:262
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
void SetExpectedHash(const shash::Any *expected_hash)
Definition: jobinfo.h:232
InterruptCue ** GetInterruptCuePtr()
Definition: jobinfo.h:159
void SetRangeSize(off_t range_size)
Definition: jobinfo.h:238
void SetCompressed(bool compressed)
Definition: jobinfo.h:217
Failures error_code() const
Definition: jobinfo.h:200
void Inc(class Counter *counter)
Definition: statistics.h:50
pid_t * GetPidPtr()
Definition: jobinfo.h:156
void SignalWaitingThreads(const int fd, const shash::Any &id, ThreadLocalStorage *tls)
Definition: fetch.cc:271
void SetSink(cvmfs::Sink *sink)
Definition: jobinfo.h:231
void SetExtraInfo(const std::string *extra_info)
Definition: jobinfo.h:235
download::JobInfo download_job
Definition: fetch.h:153
Failures Fetch(JobInfo *info)
Definition: download.cc:1984
Definition: mutex.h:42
void TLSDestructor(void *data)
Definition: fetch.cc:25
void SetUrl(const std::string *url)
Definition: jobinfo.h:216
virtual int Dup(int fd)=0
perf::Counter * n_downloads
Definition: fetch.h:188
void SetProbeHosts(bool probe_hosts)
Definition: jobinfo.h:218
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
pthread_key_t thread_local_storage_
Definition: fetch.h:173
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
static ClientCtx * GetInstance()
Definition: clientctx.cc:45
gid_t * GetGidPtr()
Definition: jobinfo.h:158
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545