GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/fetch.cc Lines: 136 136 100.0 %
Date: 2019-02-03 02:48:13 Branches: 45 58 77.6 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "cvmfs_config.h"
6
#include "fetch.h"
7
8
#include <unistd.h>
9
10
#include "backoff.h"
11
#include "cache.h"
12
#include "clientctx.h"
13
#include "download.h"
14
#include "logging.h"
15
#include "quota.h"
16
#include "statistics.h"
17
#include "util/posix.h"
18
19
using namespace std;  // NOLINT
20
21
namespace cvmfs {
22
23
8
void TLSDestructor(void *data) {
24
  Fetcher::ThreadLocalStorage *tls =
25
8
    static_cast<Fetcher::ThreadLocalStorage *>(data);
26
  std::vector<Fetcher::ThreadLocalStorage *> *tls_blocks =
27
8
    &tls->fetcher->tls_blocks_;
28
29
8
  pthread_mutex_lock(tls->fetcher->lock_tls_blocks_);
30
16
  for (vector<Fetcher::ThreadLocalStorage *>::iterator i =
31
8
       tls_blocks->begin(), iEnd = tls_blocks->end(); i != iEnd; ++i)
32
  {
33
16
    if ((*i) == tls) {
34
8
      tls_blocks->erase(i);
35
8
      break;
36
    }
37
  }
38
8
  pthread_mutex_unlock(tls->fetcher->lock_tls_blocks_);
39
8
  tls->fetcher->CleanupTls(tls);
40
8
}
41
42
43
/**
44
 * Called when a thread exists, releases a ThreadLocalStorage object and
45
 * removes the pointer to it from tls_blocks_.
46
 */
47
86
void Fetcher::CleanupTls(ThreadLocalStorage *tls) {
48
86
  ClosePipe(tls->pipe_wait);
49
86
  delete tls;
50
86
}
51
52
53
/**
54
 * Initialized thread-local storage if called the first time in a new thread.
55
 */
56
150
Fetcher::ThreadLocalStorage *Fetcher::GetTls() {
57
  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
58
150
    pthread_getspecific(thread_local_storage_));
59
150
  if (tls != NULL)
60
64
    return tls;
61
62
86
  tls = new ThreadLocalStorage();
63
86
  tls->fetcher = this;
64
86
  MakePipe(tls->pipe_wait);
65
86
  tls->download_job.destination = download::kDestinationSink;
66
86
  tls->download_job.compressed = true;
67
86
  tls->download_job.probe_hosts = true;
68
86
  int retval = pthread_setspecific(thread_local_storage_, tls);
69
86
  assert(retval == 0);
70
86
  pthread_mutex_lock(lock_tls_blocks_);
71
86
  tls_blocks_.push_back(tls);
72
86
  pthread_mutex_unlock(lock_tls_blocks_);
73
86
  return tls;
74
}
75
76
77
143
int Fetcher::Fetch(
78
  const shash::Any &id,
79
  const uint64_t size,
80
  const std::string &name,
81
  const zlib::Algorithms compression_algorithm,
82
  const CacheManager::ObjectType object_type,
83
  const std::string &alt_url,
84
  off_t range_offset)
85
{
86
  int fd_return;  // Read-only file descriptor that is returned
87
  int retval;
88
89
  // Try to open from local cache
90
143
  if ((fd_return = OpenSelect(id, name, object_type)) >= 0) {
91
33
    LogCvmfs(kLogCache, kLogDebug, "hit: %s", name.c_str());
92
33
    return fd_return;
93
  }
94
95
110
  ThreadLocalStorage *tls = GetTls();
96
97
  // Synchronization point: either act as a master thread for this object or
98
  // enqueue to the list of waiting threads.
99
110
  pthread_mutex_lock(lock_queues_download_);
100
110
  ThreadQueues::iterator iDownloadQueue = queues_download_.find(id);
101
110
  if (iDownloadQueue != queues_download_.end()) {
102
4
    LogCvmfs(kLogCache, kLogDebug, "waiting for download of %s", name.c_str());
103
104
4
    iDownloadQueue->second->push_back(tls->pipe_wait[1]);
105
4
    pthread_mutex_unlock(lock_queues_download_);
106
4
    ReadPipe(tls->pipe_wait[0], &fd_return, sizeof(int));
107
108
    LogCvmfs(kLogCache, kLogDebug, "received from another thread fd %d for %s",
109
4
             fd_return, name.c_str());
110
4
    return fd_return;
111
  } else {
112
    // Seems we are the first one, check again in the cache (race condition)
113
106
    fd_return = OpenSelect(id, name, object_type);
114
106
    if (fd_return >= 0) {
115
4
      pthread_mutex_unlock(lock_queues_download_);
116
4
      return fd_return;
117
    }
118
119
    // Create a new queue for this chunk
120
102
    queues_download_[id] = &tls->other_pipes_waiting;
121
102
    pthread_mutex_unlock(lock_queues_download_);
122
  }
123
124
102
  perf::Inc(n_downloads);
125
126
  // Involve the download manager
127
102
  LogCvmfs(kLogCache, kLogDebug, "downloading %s", name.c_str());
128
102
  std::string url;
129
102
  if (external_) {
130
8
    url = !alt_url.empty() ? alt_url : name;
131
  } else {
132

94
    url = "/" + (alt_url.size() ? alt_url : "data/" + id.MakePath());
133
  }
134
102
  void *txn = alloca(cache_mgr_->SizeOfTxn());
135
102
  retval = cache_mgr_->StartTxn(id, size, txn);
136
102
  if (retval < 0) {
137
    LogCvmfs(kLogCache, kLogDebug, "could not start transaction on %s",
138
4
             name.c_str());
139
4
    SignalWaitingThreads(retval, id, tls);
140
4
    return retval;
141
  }
142
98
  cache_mgr_->CtrlTxn(CacheManager::ObjectInfo(object_type, name), 0, txn);
143
144
98
  LogCvmfs(kLogCache, kLogDebug, "miss: %s %s", name.c_str(), url.c_str());
145
98
  TransactionSink sink(cache_mgr_, txn);
146
98
  tls->download_job.url = &url;
147
98
  tls->download_job.destination_sink = &sink;
148
98
  tls->download_job.expected_hash = &id;
149
98
  tls->download_job.extra_info = &name;
150
98
  ClientCtx *ctx = ClientCtx::GetInstance();
151
98
  if (ctx->IsSet()) {
152
    ctx->Get(&tls->download_job.uid,
153
             &tls->download_job.gid,
154
4
             &tls->download_job.pid);
155
  }
156
98
  tls->download_job.compressed = (compression_algorithm == zlib::kZlibDefault);
157
98
  tls->download_job.range_offset = range_offset;
158
98
  tls->download_job.range_size = size;
159
98
  download_mgr_->Fetch(&tls->download_job);
160
161
98
  if (tls->download_job.error_code == download::kFailOk) {
162
71
    LogCvmfs(kLogCache, kLogDebug, "finished downloading of %s", url.c_str());
163
164
71
    fd_return = cache_mgr_->OpenFromTxn(txn);
165
71
    if (fd_return < 0) {
166
4
      cache_mgr_->AbortTxn(txn);
167
4
      SignalWaitingThreads(fd_return, id, tls);
168
4
      return fd_return;
169
    }
170
171
67
    retval = cache_mgr_->CommitTxn(txn);
172
67
    if (retval < 0) {
173
8
      cache_mgr_->Close(fd_return);
174
8
      SignalWaitingThreads(retval, id, tls);
175
8
      return retval;
176
    }
177
59
    SignalWaitingThreads(fd_return, id, tls);
178
59
    return fd_return;
179
  }
180
181
  // Download failed
182
  LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
183
           "failed to fetch %s (hash: %s, error %d [%s])", name.c_str(),
184
           id.ToString().c_str(), tls->download_job.error_code,
185
27
           download::Code2Ascii(tls->download_job.error_code));
186
27
  cache_mgr_->AbortTxn(txn);
187
27
  backoff_throttle_->Throttle();
188
27
  SignalWaitingThreads(-EIO, id, tls);
189
27
  return -EIO;
190
}
191
192
193
186
Fetcher::Fetcher(
194
  CacheManager *cache_mgr,
195
  download::DownloadManager *download_mgr,
196
  BackoffThrottle *backoff_throttle,
197
  perf::StatisticsTemplate statistics,
198
  bool external)
199
  : external_(external)
200
  , lock_queues_download_(NULL)
201
  , lock_tls_blocks_(NULL)
202
  , cache_mgr_(cache_mgr)
203
  , download_mgr_(download_mgr)
204
186
  , backoff_throttle_(backoff_throttle)
205
{
206
  int retval;
207
186
  retval = pthread_key_create(&thread_local_storage_, TLSDestructor);
208
186
  assert(retval == 0);
209
  lock_queues_download_ = reinterpret_cast<pthread_mutex_t *>(
210
186
    smalloc(sizeof(pthread_mutex_t)));
211
186
  retval = pthread_mutex_init(lock_queues_download_, NULL);
212
186
  assert(retval == 0);
213
  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
214
186
    smalloc(sizeof(pthread_mutex_t)));
215
186
  retval = pthread_mutex_init(lock_tls_blocks_, NULL);
216
186
  assert(retval == 0);
217
  n_downloads = statistics.RegisterTemplated("n_downloads",
218
186
    "overall number of downloaded files (incl. catalogs, chunks)");
219
186
}
220
221
222
186
Fetcher::~Fetcher() {
223
  int retval;
224
225
186
  pthread_mutex_lock(lock_tls_blocks_);
226
264
  for (unsigned i = 0; i < tls_blocks_.size(); ++i)
227
78
    CleanupTls(tls_blocks_[i]);
228
186
  pthread_mutex_unlock(lock_tls_blocks_);
229
230
186
  retval = pthread_mutex_destroy(lock_tls_blocks_);
231
186
  assert(retval == 0);
232
186
  free(lock_tls_blocks_);
233
234
186
  retval = pthread_mutex_destroy(lock_queues_download_);
235
186
  assert(retval == 0);
236
186
  free(lock_queues_download_);
237
238
186
  retval = pthread_key_delete(thread_local_storage_);
239
186
  assert(retval == 0);
240
}
241
242
243
/**
244
 * Depending on the object type, uses either Open() or OpenPinned() from the
245
 * cache manager
246
 */
247
249
int Fetcher::OpenSelect(
248
  const shash::Any &id,
249
  const std::string &name,
250
  const CacheManager::ObjectType object_type)
251
{
252
249
  bool is_catalog = object_type == CacheManager::kTypeCatalog;
253

249
  if (is_catalog || (object_type == CacheManager::kTypePinned)) {
254
139
    return cache_mgr_->OpenPinned(id, name, is_catalog);
255
  } else {
256
110
    return cache_mgr_->Open(CacheManager::Bless(id, object_type, name));
257
  }
258
}
259
260
261
114
void Fetcher::SignalWaitingThreads(
262
  const int fd,
263
  const shash::Any &id,
264
  ThreadLocalStorage *tls)
265
{
266
114
  pthread_mutex_lock(lock_queues_download_);
267
130
  for (unsigned i = 0, s = tls->other_pipes_waiting.size(); i < s; ++i) {
268
16
    int fd_dup = (fd >= 0) ? cache_mgr_->Dup(fd) : fd;
269
16
    WritePipe(tls->other_pipes_waiting[i], &fd_dup, sizeof(int));
270
  }
271
114
  tls->other_pipes_waiting.clear();
272
114
  queues_download_.erase(id);
273
114
  pthread_mutex_unlock(lock_queues_download_);
274
114
}
275
276
}  // namespace cvmfs