CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_stream.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "cache_stream.h"
7 
8 #include <algorithm>
9 #include <cstdlib>
10 #include <cstring>
11 #include <string>
12 
13 #include "clientctx.h"
14 #include "network/download.h"
15 #include "network/sink.h"
16 #include "quota.h"
17 #include "util/mutex.h"
18 #include "util/smalloc.h"
19 
20 
21 namespace {
22 
23 class StreamingSink : public cvmfs::Sink {
24  public:
25  StreamingSink(void *buf, uint64_t size, uint64_t offset)
26  : Sink(false /* is_owner */)
27  , pos_(0)
28  , window_buf_(buf)
29  , window_size_(size)
30  , window_offset_(offset)
31  { }
32 
33  virtual ~StreamingSink() {}
34 
35  virtual int64_t Write(const void *buf, uint64_t sz) {
36  uint64_t old_pos = pos_;
37  pos_ += sz;
38 
39  if (!window_buf_)
40  return static_cast<int64_t>(sz);
41 
42  if (pos_ < window_offset_)
43  return static_cast<int64_t>(sz);
44 
45  if (old_pos >= (window_offset_ + window_size_))
46  return static_cast<int64_t>(sz);
47 
48  uint64_t copy_offset = std::max(old_pos, window_offset_);
49  uint64_t inbuf_offset = copy_offset - old_pos;
50  uint64_t outbuf_offset = copy_offset - window_offset_;
51  uint64_t copy_size =
52  std::min(sz - inbuf_offset, window_size_ - outbuf_offset);
53 
54  memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
55  reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
56  copy_size);
57 
58  return static_cast<int64_t>(sz);
59  }
60 
61  virtual int Reset() {
62  pos_ = 0;
63  return 0;
64  }
65 
66  virtual int Purge() { return Reset(); }
67  virtual bool IsValid() {
68  return (window_buf_ != NULL) || (window_size_ == 0);
69  }
70  virtual int Flush() { return 0; }
71  virtual bool Reserve(size_t /* size */) { return true; }
72  virtual bool RequiresReserve() { return false; }
73  virtual std::string Describe() {
74  std::string result = "Streaming sink that is ";
75  result += IsValid() ? "valid" : "invalid";
76  return result;
77  }
78 
79  int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); }
80 
81  private:
82  uint64_t pos_;
83  void *window_buf_;
84  uint64_t window_size_;
85  uint64_t window_offset_;
86 }; // class StreamingSink
87 
88 } // anonymous namespace
89 
90 
92  const FdInfo &info)
93 {
94  if (info.label.IsExternal())
96  return regular_download_mgr_;
97 }
98 
100  const FdInfo &info,
101  void *buf,
102  uint64_t size,
103  uint64_t offset)
104 {
105  StreamingSink sink(buf, size, offset);
106  std::string url;
107  if (info.label.IsExternal()) {
108  url = info.label.path;
109  } else {
110  url = "/data/" + info.object_id.MakePath();
111  }
112  bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault;
113 
114  download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */,
115  &info.object_id, &sink);
116  download_job.SetExtraInfo(&info.label.path);
117  download_job.SetRangeOffset(info.label.range_offset);
118  download_job.SetRangeSize(static_cast<int64_t>(info.label.size));
120  if (ctx->IsSet()) {
121  ctx->Get(download_job.GetUidPtr(),
122  download_job.GetGidPtr(),
123  download_job.GetPidPtr(),
124  download_job.GetInterruptCuePtr());
125  }
126 
127  SelectDownloadManager(info)->Fetch(&download_job);
128 
129  if (download_job.error_code() != download::kFailOk) {
130  return -EIO;
131  }
132 
133  return sink.GetNBytesStreamed();
134 }
135 
136 
138  unsigned max_open_fds,
139  CacheManager *cache_mgr,
140  download::DownloadManager *regular_download_mgr,
141  download::DownloadManager *external_download_mgr)
142  : cache_mgr_(cache_mgr)
143  , regular_download_mgr_(regular_download_mgr)
144  , external_download_mgr_(external_download_mgr)
145  , fd_table_(max_open_fds, FdInfo())
146 {
148  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
149  int retval = pthread_mutex_init(lock_fd_table_, NULL);
150  assert(retval == 0);
151 
152  delete quota_mgr_;
153  quota_mgr_ = cache_mgr_->quota_mgr();
154 }
155 
157  pthread_mutex_destroy(lock_fd_table_);
158  free(lock_fd_table_);
159  quota_mgr_ = NULL; // gets deleted by cache_mgr_
160 }
161 
163  return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe();
164 }
165 
167  bool result = cache_mgr_->AcquireQuotaManager(quota_mgr);
168  if (result)
169  quota_mgr_ = cache_mgr_->quota_mgr();
170  return result;
171 }
172 
174  int fd_in_cache_mgr = cache_mgr_->Open(object);
175  if (fd_in_cache_mgr >= 0) {
176  MutexLockGuard lock_guard(lock_fd_table_);
177  return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
178  }
179 
180  if (fd_in_cache_mgr != -ENOENT)
181  return fd_in_cache_mgr;
182 
183  if (object.label.IsCatalog() || object.label.IsPinned() ||
184  object.label.IsCertificate())
185  {
186  return -ENOENT;
187  }
188 
189  MutexLockGuard lock_guard(lock_fd_table_);
190  return fd_table_.OpenFd(FdInfo(object));
191 }
192 
193 int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) {
194  MutexLockGuard lock_guard(lock_fd_table_);
195  return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
196 }
197 
199  FdInfo info;
200  {
201  MutexLockGuard lock_guard(lock_fd_table_);
202  info = fd_table_.GetHandle(fd);
203  }
204 
205  if (!info.IsValid())
206  return -EBADF;
207 
208  if (info.fd_in_cache_mgr >= 0)
209  return cache_mgr_->GetSize(info.fd_in_cache_mgr);
210 
211  return Stream(info, NULL, 0, 0);
212 }
213 
215  FdInfo info;
216 
217  MutexLockGuard lock_guard(lock_fd_table_);
218  info = fd_table_.GetHandle(fd);
219 
220  if (!info.IsValid())
221  return -EBADF;
222 
223  if (info.fd_in_cache_mgr >= 0) {
224  int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr);
225  if (dup_fd < 0)
226  return dup_fd;
227  return fd_table_.OpenFd(FdInfo(dup_fd));
228  }
229 
230  return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label)));
231 }
232 
234  FdInfo info;
235  {
236  MutexLockGuard lock_guard(lock_fd_table_);
237  info = fd_table_.GetHandle(fd);
238  if (!info.IsValid())
239  return -EBADF;
240  fd_table_.CloseFd(fd);
241  }
242 
243  if (info.fd_in_cache_mgr >= 0)
244  return cache_mgr_->Close(info.fd_in_cache_mgr);
245 
246  return 0;
247 }
248 
250  int fd, void *buf, uint64_t size, uint64_t offset)
251 {
252  FdInfo info;
253  {
254  MutexLockGuard lock_guard(lock_fd_table_);
255  info = fd_table_.GetHandle(fd);
256  }
257 
258  if (!info.IsValid())
259  return -EBADF;
260 
261  if (info.fd_in_cache_mgr >= 0)
262  return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset);
263 
264  int64_t nbytes_streamed = Stream(info, buf, size, offset);
265  if (nbytes_streamed < 0)
266  return nbytes_streamed;
267  if (static_cast<uint64_t>(nbytes_streamed) < offset)
268  return 0;
269  if (static_cast<uint64_t>(nbytes_streamed) > (offset + size))
270  return size;
271  return nbytes_streamed - offset;
272 }
273 
275  FdInfo info;
276  {
277  MutexLockGuard lock_guard(lock_fd_table_);
278  info = fd_table_.GetHandle(fd);
279  }
280 
281  if (!info.IsValid())
282  return -EBADF;
283 
284  if (info.fd_in_cache_mgr >= 0)
285  return cache_mgr_->Readahead(info.fd_in_cache_mgr);
286 
287  return 0;
288 }
289 
291  int fd = cache_mgr_->OpenFromTxn(txn);
292  if (fd < 0)
293  return fd;
294 
295  MutexLockGuard lock_guard(lock_fd_table_);
296  return fd_table_.OpenFd(FdInfo(fd));
297 }
298 
300  SavedState *state = new SavedState();
301  state->fd_table = fd_table_.Clone();
302  state->state_backing_cachemgr = cache_mgr_->SaveState(-1);
303  return state;
304 }
305 
307  // When DoRestoreState is called, we have fd 0 assigned to the root file
308  // catalog
309  FdInfo handle_root = fd_table_.GetHandle(0);
310 
311  SavedState *state = reinterpret_cast<SavedState *>(data);
312 
313  int new_backing_root_fd =
314  cache_mgr_->RestoreState(-1, state->state_backing_cachemgr);
315  fd_table_.AssignFrom(*state->fd_table);
316 
317  int new_root_fd = -1;
318  if (handle_root.IsValid()) {
319  if (new_backing_root_fd >= 0)
320  handle_root.fd_in_cache_mgr = new_backing_root_fd;
321  new_root_fd = fd_table_.OpenFd(handle_root);
322  // There must be a free file descriptor because the root file catalog gets
323  // closed before a reload
324  assert(new_root_fd >= 0);
325  }
326  return new_root_fd;
327 }
328 
330  SavedState *state = reinterpret_cast<SavedState *>(data);
331  cache_mgr_->FreeState(-1, state->state_backing_cachemgr);
332  delete state->fd_table;
333  delete state;
334  return true;
335 }
336 
338  *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr;
339  return cache_mgr_.Release();
340 }
virtual int Close(int fd)
struct cvmcache_context * ctx
int PlantFd(int fd_in_cache_mgr)
off_t range_offset
Definition: cache.h:127
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
uid_t * GetUidPtr()
Definition: jobinfo.h:159
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
Definition: clientctx.cc:57
pthread_mutex_t * lock_fd_table_
Definition: cache_stream.h:139
void SetRangeOffset(off_t range_offset)
Definition: jobinfo.h:233
assert((mem||(size==0))&&"Out Of Memory")
bool IsSet()
Definition: clientctx.cc:74
download::DownloadManager * external_download_mgr_
Definition: cache_stream.h:137
virtual int64_t GetSize(int fd)
download::DownloadManager * regular_download_mgr_
Definition: cache_stream.h:136
virtual int Dup(int fd)
FdTable< FdInfo > fd_table_
Definition: cache_stream.h:140
CacheManager * MoveOutBackingCacheMgr(int *root_fd)
virtual int OpenFromTxn(void *txn)
uint64_t size
unzipped size, if known
Definition: cache.h:125
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
InterruptCue ** GetInterruptCuePtr()
Definition: jobinfo.h:161
void SetRangeSize(off_t range_size)
Definition: jobinfo.h:234
virtual int64_t Write(const void *buf, uint64_t sz)
Definition: cache_stream.cc:35
Failures error_code() const
Definition: jobinfo.h:200
UniquePtr< CacheManager > cache_mgr_
Definition: cache_stream.h:135
download::DownloadManager * SelectDownloadManager(const FdInfo &info)
Definition: cache_stream.cc:91
pid_t * GetPidPtr()
Definition: jobinfo.h:158
virtual ~StreamingCacheManager()
CacheManager::Label label
Definition: cache_stream.h:100
zlib::Algorithms zip_algorithm
Definition: cache.h:126
virtual std::string Describe()
void SetExtraInfo(const std::string *extra_info)
Definition: jobinfo.h:230
virtual int Readahead(int fd)
T * Release()
Definition: pointer.h:44
Failures Fetch(JobInfo *info)
Definition: download.cc:1860
bool IsExternal() const
Definition: cache.h:104
QuotaManager * quota_mgr_
Definition: cache.h:235
Definition: mutex.h:42
StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, download::DownloadManager *external_download_mgr)
virtual int Open(const LabeledObject &object)
std::string path
Definition: cache.h:133
virtual bool DoFreeState(void *data)
virtual int DoRestoreState(void *data)
StreamingSink(void *buf, uint64_t size, uint64_t offset)
Definition: cache_stream.cc:25
int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset)
Definition: cache_stream.cc:99
std::string MakePath() const
Definition: hash.h:316
static void size_t size
Definition: smalloc.h:54
virtual void * DoSaveState()
static ClientCtx * GetInstance()
Definition: clientctx.cc:45
gid_t * GetGidPtr()
Definition: jobinfo.h:160