CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
fetch.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "fetch.h"
7 
8 #include <unistd.h>
9 
10 #include "backoff.h"
11 #include "cache.h"
12 #include "clientctx.h"
13 #include "download.h"
14 #include "logging.h"
15 #include "quota.h"
16 #include "statistics.h"
17 #include "util/posix.h"
18 #include "util_concurrency.h"
19 
20 using namespace std; // NOLINT
21 
22 namespace cvmfs {
23 
// Destructor callback for a ThreadLocalStorage block; the Fetcher constructor
// passes it to pthread_key_create() for the thread_local_storage_ key.
// Removes the block from the owning fetcher's tls_blocks_ vector and then
// hands it to Fetcher::CleanupTls().
//
// data - the block as an untyped pointer.
//
// NOTE(review): `tls` is used throughout but its declaration is not in the
// visible listing (the numbering jumps from 24 to 26); presumably it is the
// result of the static_cast of `data` below -- confirm against the real source.
24 void TLSDestructor(void *data) {
26  static_cast<Fetcher::ThreadLocalStorage *>(data);
27  std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks =
28  &tls->fetcher->tls_blocks_;
29 
// NOTE(review): listing line 31 is absent. The bare scope suggests a scoped
// guard is created here, presumably on lock_tls_blocks_ as in GetTls() --
// confirm.
30  {
32  for (vector<Fetcher::ThreadLocalStorage *>::iterator
33  i = tls_blocks->begin(),
34  iEnd = tls_blocks->end();
35  i != iEnd; ++i) {
36  if ((*i) == tls) {
// Leave the loop right after erase(): the iterators are no longer valid.
37  tls_blocks->erase(i);
38  break;
39  }
40  }
41  }
// Closes the block's pipe and deletes the block (see CleanupTls).
42  tls->fetcher->CleanupTls(tls);
43 }
44 
45 
// Releases one thread-local storage block: closes its pipe_wait pipe via
// ClosePipe() and deletes the block. `tls` must not be used afterwards.
50 void Fetcher::CleanupTls(ThreadLocalStorage *tls) {
51  ClosePipe(tls->pipe_wait);
52  delete tls;
53 }
54 
55 
// Returns the calling thread's ThreadLocalStorage block, creating and
// registering it on first use.
//
// The block is looked up under the thread_local_storage_ key. If none is
// stored yet, a new one is allocated, pointed back at this fetcher, given a
// pipe (MakePipe), stored under the key, and appended to tls_blocks_.
59 Fetcher::ThreadLocalStorage *Fetcher::GetTls() {
60  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
61  pthread_getspecific(thread_local_storage_));
// Fast path: this thread already has a block.
62  if (tls != NULL)
63  return tls;
64 
65  tls = new ThreadLocalStorage();
66  tls->fetcher = this;
67  MakePipe(tls->pipe_wait);
// NOTE(review): listing line 68 is absent; one statement of the download_job
// setup may be missing here -- confirm against the real source.
// Initial values of the per-thread download job; Fetch() overwrites
// `compressed` for every request.
69  tls->download_job.compressed = true;
70  tls->download_job.probe_hosts = true;
71  int retval = pthread_setspecific(thread_local_storage_, tls);
72  assert(retval == 0);
73 
// Append to tls_blocks_ with the MutexLockGuard on lock_tls_blocks_ in place
// (presumably released when `m` goes out of scope at function exit).
74  MutexLockGuard m(lock_tls_blocks_);
75  tls_blocks_.push_back(tls);
76 
77  return tls;
78 }
79 
80 
// NOTE(review): the line carrying this function's return type and name
// (listing line 81) is absent from the visible text; only the parameter list
// and the body are shown. Body lines 156, 164, 165, 167, 188 and 191 are
// absent as well; each gap is flagged below.
//
// Visible behavior: opens the object `id` from the cache and, on a miss,
// downloads it into a cache transaction. For a given id one thread performs
// the download; threads that arrive meanwhile queue their pipe and wait for
// an int (file descriptor or negative error) to be written to it.
//
// Parameters, as used in the visible body:
//   id                    - key for the cache lookup and the download queue;
//                           also set as the download job's expected_hash
//   size                  - passed to StartTxn()
//   name                  - used for the cache lookup, log messages and
//                           extra_info; also the URL when external_ is set
//                           and alt_url is empty
//   compression_algorithm - the job is flagged compressed only when this
//                           equals zlib::kZlibDefault
//   object_type           - passed to OpenSelect() and CtrlTxn()
//   alt_url               - when non-empty, replaces the default URL
//   range_offset          - copied into download_job.range_offset
//
// Returns a file descriptor (>= 0) on success. Otherwise returns the negative
// result of StartTxn(), OpenFromTxn() or CommitTxn(), or -EIO on the
// "Download failed" path. A waiting thread returns whatever int it reads from
// its pipe.
82  const shash::Any &id,
83  const uint64_t size,
84  const std::string &name,
85  const zlib::Algorithms compression_algorithm,
86  const CacheManager::ObjectType object_type,
87  const std::string &alt_url,
88  off_t range_offset)
89 {
90  int fd_return; // Read-only file descriptor that is returned
91  int retval;
92 
93  perf::Inc(n_invocations);
94 
95  // Try to open from local cache
96  if ((fd_return = OpenSelect(id, name, object_type)) >= 0) {
97  LogCvmfs(kLogCache, kLogDebug, "hit: %s", name.c_str());
98  return fd_return;
99  }
100 
101  ThreadLocalStorage *tls = GetTls();
102 
103  // Synchronization point: either act as a master thread for this object or
104  // enqueue to the list of waiting threads.
105  pthread_mutex_lock(lock_queues_download_);
106  ThreadQueues::iterator iDownloadQueue = queues_download_.find(id);
107  if (iDownloadQueue != queues_download_.end()) {
108  LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s", name.c_str());
109 
// Queue our pipe's [1] end with the downloading thread, release the lock, and
// block on the [0] end until an int arrives (SignalWaitingThreads() writes one
// per queued pipe).
110  iDownloadQueue->second->push_back(tls->pipe_wait[1]);
111  pthread_mutex_unlock(lock_queues_download_);
112  ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int));
113 
114  LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s",
115  fd_return, name.c_str());
116  return fd_return;
117  } else {
118  // Seems we are the first one, check again in the cache (race condition)
119  fd_return = OpenSelect(id, name, object_type);
120  if (fd_return >= 0) {
121  pthread_mutex_unlock(lock_queues_download_);
122  return fd_return;
123  }
124 
125  // Create a new queue for this chunk
// The queue is this thread's own other_pipes_waiting vector; threads arriving
// later append their pipe ends to it in the branch above.
126  queues_download_[id] = &tls->other_pipes_waiting;
127  pthread_mutex_unlock(lock_queues_download_);
128  }
129 
130  perf::Inc(n_downloads);
131 
132  // Involve the download manager
133  LogCvmfs(kLogCache, kLogDebug, "downloading %s", name.c_str());
// URL selection: with external_ set, alt_url if non-empty, else name.
// Otherwise "/" followed by alt_url if non-empty, else "data/" + id.MakePath().
134  std::string url;
135  if (external_) {
136  url = !alt_url.empty() ? alt_url : name;
137  } else {
138  url = "/" + (alt_url.size() ? alt_url : "data/" + id.MakePath());
139  }
// The transaction buffer is allocated with alloca(), i.e. on this function's
// stack frame, sized by the cache manager.
140  void *txn = alloca(cache_mgr_->SizeOfTxn());
141  retval = cache_mgr_->StartTxn(id, size, txn);
142  if (retval < 0) {
143  LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s",
144  name.c_str());
// Every early exit from here on first passes its result to the queued threads.
145  SignalWaitingThreads(retval, id, tls);
146  return retval;
147  }
148  cache_mgr_->CtrlTxn(CacheManager::ObjectInfo(object_type, name), 0, txn);
149 
150  LogCvmfs(kLogCache, kLogDebug, "miss: %s %s", name.c_str(), url.c_str());
// Fill in the per-thread download job. It stores pointers to the locals `url`
// and `sink` and to the arguments `id` and `name`.
151  TransactionSink sink(cache_mgr_, txn);
152  tls->download_job.url = &url;
153  tls->download_job.destination_sink = &sink;
154  tls->download_job.expected_hash = &id;
155  tls->download_job.extra_info = &name;
// NOTE(review): listing line 156 is absent; `ctx` is not declared in the
// visible text (presumably obtained from ClientCtx -- confirm).
157  if (ctx->IsSet()) {
158  ctx->Get(&tls->download_job.uid,
159  &tls->download_job.gid,
160  &tls->download_job.pid);
161  }
162  tls->download_job.compressed = (compression_algorithm == zlib::kZlibDefault);
163  tls->download_job.range_offset = range_offset;
// NOTE(review): listing lines 164, 165 and 167 are absent. Presumably the
// download is started and its result checked here; the closing brace at
// listing line 185 has no visible opening counterpart -- confirm against the
// real source.
166 
168  LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str());
169 
// Obtain the descriptor to return from the transaction; abort the transaction
// if that fails.
170  fd_return = cache_mgr_->OpenFromTxn(txn);
171  if (fd_return < 0) {
172  cache_mgr_->AbortTxn(txn);
173  SignalWaitingThreads(fd_return, id, tls);
174  return fd_return;
175  }
176 
// Commit the transaction; on failure close the descriptor opened above and
// report the commit error instead.
177  retval = cache_mgr_->CommitTxn(txn);
178  if (retval < 0) {
179  cache_mgr_->Close(fd_return);
180  SignalWaitingThreads(retval, id, tls);
181  return retval;
182  }
183  SignalWaitingThreads(fd_return, id, tls);
184  return fd_return;
185  }
186 
187  // Download failed
// NOTE(review): listing lines 188 and 191 are absent; the two lines below are
// the visible middle of a call whose beginning and end are not shown.
189  "failed to fetch %s (hash: %s, error %d [%s])", name.c_str(),
190  id.ToString().c_str(), tls->download_job.error_code,
// Failure path: abort the transaction, call the backoff throttle, and report
// -EIO to the queued threads and to the caller.
192  cache_mgr_->AbortTxn(txn);
193  backoff_throttle_->Throttle();
194  SignalWaitingThreads(-EIO, id, tls);
195  return -EIO;
196 }
197 
198 
// Constructs a fetcher.
//
// cache_mgr, download_mgr, backoff_throttle and external are stored in the
// corresponding members. `statistics` is used to register the two counters
// n_downloads and n_invocations.
//
// The constructor also creates the pthread key for the per-thread storage,
// with TLSDestructor as its destructor function, and allocates and
// initializes the two mutexes lock_queues_download_ and lock_tls_blocks_.
// Each pthread call is expected to return 0 (checked with assert).
199 Fetcher::Fetcher(
200  CacheManager *cache_mgr,
201  download::DownloadManager *download_mgr,
202  BackoffThrottle *backoff_throttle,
203  perf::StatisticsTemplate statistics,
204  bool external)
205  : external_(external)
206  , lock_queues_download_(NULL)
207  , lock_tls_blocks_(NULL)
208  , cache_mgr_(cache_mgr)
209  , download_mgr_(download_mgr)
210  , backoff_throttle_(backoff_throttle)
211 {
212  int retval;
213  retval = pthread_key_create(&thread_local_storage_, TLSDestructor);
214  assert(retval == 0);
// The mutexes are allocated with smalloc() and initialized with default
// attributes (NULL).
215  lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>(
216  smalloc(sizeof(pthread_mutex_t)));
217  retval = pthread_mutex_init(lock_queues_download_, NULL);
218  assert(retval == 0);
219  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
220  smalloc(sizeof(pthread_mutex_t)));
221  retval = pthread_mutex_init(lock_tls_blocks_, NULL);
222  assert(retval == 0);
// Counters incremented in the fetch path via perf::Inc().
223  n_downloads = statistics.RegisterTemplated("n_downloads",
224  "overall number of downloaded files (incl. catalogs, chunks)");
225  n_invocations = statistics.RegisterTemplated("n_invocations",
226  "overall number of object requests (incl. catalogs, chunks)");
227 }
228 
229 
// NOTE(review): the signature line of this definition (listing line 230) is
// absent from the visible text. Since the body releases what the Fetcher
// constructor sets up, this is presumably the Fetcher destructor -- confirm.
//
// Visible behavior: iterates over tls_blocks_, then destroys and frees both
// mutexes and deletes the thread_local_storage_ key. Each pthread call is
// expected to return 0 (checked with assert).
231  int retval;
232 
// NOTE(review): listing lines 234 and 236 are absent, so the loop over
// tls_blocks_ has no visible body. Presumably each remaining block is released
// here (cf. CleanupTls) under a guard -- confirm against the real source.
233  {
235  for (unsigned i = 0; i < tls_blocks_.size(); ++i)
237  }
238 
// The mutexes are released with free(); the constructor allocates them with
// smalloc().
239  retval = pthread_mutex_destroy(lock_tls_blocks_);
240  assert(retval == 0);
241  free(lock_tls_blocks_);
242 
243  retval = pthread_mutex_destroy(lock_queues_download_);
244  assert(retval == 0);
245  free(lock_queues_download_);
246 
247  retval = pthread_key_delete(thread_local_storage_);
248  assert(retval == 0);
249 }
250 
251 
// NOTE(review): the signature line (listing line 256) is absent from the
// visible text; the page's index lists OpenSelect as defined at fetch.cc:256.
//
// Opens object `id` through the cache manager, picking the call by
// object_type: catalogs and pinned objects go through OpenPinned(), everything
// else through Open() on a blessed object.
//
// Returns whatever the selected cache manager call returns. The caller in this
// file treats a value >= 0 as a usable file descriptor.
257  const shash::Any &id,
258  const std::string &name,
259  const CacheManager::ObjectType object_type)
260 {
261  bool is_catalog = object_type == CacheManager::kTypeCatalog;
262  if (is_catalog || (object_type == CacheManager::kTypePinned)) {
// is_catalog is forwarded so OpenPinned() can tell the two cases apart.
263  return cache_mgr_->OpenPinned(id, name, is_catalog);
264  } else {
265  return cache_mgr_->Open(CacheManager::Bless(id, object_type, name));
266  }
267 }
268 
269 
// NOTE(review): the signature line (listing line 270) is absent from the
// visible text; the page's index lists SignalWaitingThreads as defined at
// fetch.cc:270.
//
// Sends the outcome of a fetch to every pipe queued in
// tls->other_pipes_waiting, then empties that vector and removes the entry for
// `id` from queues_download_.
//
// fd  - a file descriptor (>= 0) or a negative error value
// id  - key of the download queue to remove
// tls - storage block of the thread that performed the fetch
271  const int fd,
272  const shash::Any &id,
273  ThreadLocalStorage *tls)
274 {
// NOTE(review): listing line 275 is absent. queues_download_ is modified
// below, so presumably a guard on lock_queues_download_ is taken here --
// confirm against the real source.
276  for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) {
// A valid descriptor is duplicated via the cache manager for each waiter;
// a negative value is written unchanged.
277  int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd;
278  WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int));
279  }
280  tls->other_pipes_waiting.clear();
281  queues_download_.erase(id);
282 }
283 
284 } // namespace cvmfs
Destination destination
Definition: download.h:161
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
void CleanupTls(ThreadLocalStorage *tls)
Definition: fetch.cc:50
friend void TLSDestructor(void *data)
Definition: fetch.cc:24
download::DownloadManager * download_mgr_
Definition: repository.h:140
struct cvmcache_context * ctx
int OpenPinned(const shash::Any &id, const std::string &description, bool is_catalog)
Definition: cache.cc:170
perf::Counter * n_invocations
Definition: fetch.h:164
CacheManager * cache_mgr_
Definition: fetch.h:160
ThreadQueues queues_download_
Definition: fetch.h:150
off_t range_offset
Definition: download.h:174
assert((mem||(size==0))&&"Out Of Memory")
std::vector< ThreadLocalStorage * > tls_blocks_
Definition: fetch.h:157
std::vector< int > other_pipes_waiting
Definition: fetch.h:114
pthread_mutex_t * lock_tls_blocks_
Definition: fetch.h:158
virtual int Open(const BlessedObject &object)=0
void MakePipe(int pipe_fd[2])
Definition: posix.cc:525
pthread_mutex_t * lock_queues_download_
Definition: fetch.h:151
bool IsSet()
Definition: clientctx.cc:71
Counter * RegisterTemplated(const std::string &name_minor, const std::string &desc)
Definition: statistics.h:109
const char * Code2Ascii(const Failures error)
Definition: download.h:87
Algorithms
Definition: compression.h:44
Failures Fetch(const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
const shash::Any * expected_hash
Definition: download.h:170
void Inc(class Counter *counter)
Definition: statistics.h:50
const std::string * extra_info
Definition: download.h:171
int OpenSelect(const shash::Any &id, const std::string &name, const CacheManager::ObjectType object_type)
Definition: fetch.cc:256
cvmfs::Sink * destination_sink
Definition: download.h:169
void SignalWaitingThreads(const int fd, const shash::Any &id, ThreadLocalStorage *tls)
Definition: fetch.cc:270
static BlessedObject Bless(const shash::Any &id)
Definition: cache.h:125
download::JobInfo download_job
Definition: fetch.h:119
Failures error_code
Definition: download.h:288
Failures Fetch(JobInfo *info)
Definition: download.cc:1719
void TLSDestructor(void *data)
Definition: fetch.cc:24
virtual int Dup(int fd)=0
perf::Counter * n_downloads
Definition: fetch.h:163
void Get(uid_t *uid, gid_t *gid, pid_t *pid)
Definition: clientctx.cc:56
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:534
static void size_t size
Definition: smalloc.h:47
pthread_key_t thread_local_storage_
Definition: fetch.h:148
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:546
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:584
const std::string * url
Definition: download.h:151
static ClientCtx * GetInstance()
Definition: clientctx.cc:44