CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_stream.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "cache_stream.h"
7 
8 #include <algorithm>
9 #include <cstdlib>
10 #include <cstring>
11 #include <string>
12 
13 #include "network/download.h"
14 #include "network/sink.h"
15 #include "quota.h"
16 #include "util/mutex.h"
17 #include "util/smalloc.h"
18 
19 
20 namespace {
21 
22 class StreamingSink : public cvmfs::Sink {
23  public:
24  StreamingSink(void *buf, uint64_t size, uint64_t offset)
25  : Sink(false /* is_owner */)
26  , pos_(0)
27  , window_buf_(buf)
28  , window_size_(size)
29  , window_offset_(offset)
30  { }
31 
32  virtual ~StreamingSink() {}
33 
34  virtual int64_t Write(const void *buf, uint64_t sz) {
35  uint64_t old_pos = pos_;
36  pos_ += sz;
37 
38  if (!window_buf_)
39  return static_cast<int64_t>(sz);
40 
41  if (pos_ < window_offset_)
42  return static_cast<int64_t>(sz);
43 
44  if (old_pos >= (window_offset_ + window_size_))
45  return static_cast<int64_t>(sz);
46 
47  uint64_t copy_offset = std::max(old_pos, window_offset_);
48  uint64_t inbuf_offset = copy_offset - old_pos;
49  uint64_t outbuf_offset = copy_offset - window_offset_;
50  uint64_t copy_size =
51  std::min(sz - inbuf_offset, window_size_ - outbuf_offset);
52 
53  memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
54  reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
55  copy_size);
56 
57  return static_cast<int64_t>(sz);
58  }
59 
60  virtual int Reset() {
61  pos_ = 0;
62  return 0;
63  }
64 
65  virtual int Purge() { return Reset(); }
66  virtual bool IsValid() {
67  return (window_buf_ != NULL) || (window_size_ == 0);
68  }
69  virtual int Flush() { return 0; }
70  virtual bool Reserve(size_t /* size */) { return true; }
71  virtual bool RequiresReserve() { return false; }
72  virtual std::string Describe() {
73  std::string result = "Streaming sink that is ";
74  result += IsValid() ? "valid" : "invalid";
75  return result;
76  }
77 
78  int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); }
79 
80  private:
81  uint64_t pos_;
82  void *window_buf_;
83  uint64_t window_size_;
84  uint64_t window_offset_;
85 }; // class StreamingSink
86 
87 } // anonymous namespace
88 
89 
91  const FdInfo &info)
92 {
93  if (info.label.IsExternal())
95  return regular_download_mgr_;
96 }
97 
99  const FdInfo &info,
100  void *buf,
101  uint64_t size,
102  uint64_t offset)
103 {
// Stream(info, buf, size, offset): downloads the object described by info and
// feeds it through a StreamingSink.  The sink copies the stream bytes that
// fall into [offset, offset + size) to buf; with buf == NULL it only counts.
// Returns the total number of bytes that went through the sink (not just the
// bytes copied to buf), or -EIO if the download did not finish with kFailOk.
104  StreamingSink sink(buf, size, offset);
105  std::string url;
// External objects are addressed by their path, all others by their
// content hash below /data/
106  if (info.label.IsExternal()) {
107  url = info.label.path;
108  } else {
109  url = "/data/" + info.object_id.MakePath();
110  }
111  bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault;
112 
113  download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */,
114  &info.object_id, &sink);
115  download_job.SetExtraInfo(&info.label.path);
116  download_job.SetRangeOffset(info.label.range_offset);
117  download_job.SetRangeSize(static_cast<int64_t>(info.label.size));
// Fetch()'s return value is ignored; the outcome is read from the job below
118  SelectDownloadManager(info)->Fetch(&download_job);
119 
// Every download failure is reported to the caller as a generic I/O error
120  if (download_job.error_code() != download::kFailOk) {
121  return -EIO;
122  }
123 
124  return sink.GetNBytesStreamed();
125 }
126 
127 
129  unsigned max_open_fds,
130  CacheManager *cache_mgr,
131  download::DownloadManager *regular_download_mgr,
132  download::DownloadManager *external_download_mgr)
133  : cache_mgr_(cache_mgr)
134  , regular_download_mgr_(regular_download_mgr)
135  , external_download_mgr_(external_download_mgr)
136  , fd_table_(max_open_fds, FdInfo())
137 {
139  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
140  int retval = pthread_mutex_init(lock_fd_table_, NULL);
141  assert(retval == 0);
142 
143  delete quota_mgr_;
144  quota_mgr_ = cache_mgr_->quota_mgr();
145 }
146 
// Body of ~StreamingCacheManager() (the signature line is missing from this
// listing).  Tears down the fd table mutex and releases its memory with
// free().
148  pthread_mutex_destroy(lock_fd_table_);
149  free(lock_fd_table_);
// Only drop the pointer here: the quota manager is shared with cache_mgr_
150  quota_mgr_ = NULL; // gets deleted by cache_mgr_
151 }
152 
// Body of Describe() (the signature line is missing from this listing).
// Returns a fixed header line followed by the description of the backing
// cache manager.
154  return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe();
155 }
156 
// Body of AcquireQuotaManager(QuotaManager *quota_mgr) (the signature line is
// missing from this listing).  Forwards the request to the backing cache
// manager; on success this object's quota_mgr_ is re-pointed to whatever
// quota manager the backing cache manager reports afterwards.
// Returns the result of the backing cache manager.
158  bool result = cache_mgr_->AcquireQuotaManager(quota_mgr);
159  if (result)
160  quota_mgr_ = cache_mgr_->quota_mgr();
161  return result;
162 }
163 
165  int fd_in_cache_mgr = cache_mgr_->Open(object);
166  if (fd_in_cache_mgr >= 0) {
167  MutexLockGuard lock_guard(lock_fd_table_);
168  return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
169  }
170 
171  if (fd_in_cache_mgr != -ENOENT)
172  return fd_in_cache_mgr;
173 
174  if (object.label.IsCatalog() || object.label.IsPinned() ||
175  object.label.IsCertificate())
176  {
177  return -ENOENT;
178  }
179 
180  MutexLockGuard lock_guard(lock_fd_table_);
181  return fd_table_.OpenFd(FdInfo(object));
182 }
183 
184 int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) {
185  MutexLockGuard lock_guard(lock_fd_table_);
186  return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
187 }
188 
// Body of GetSize(int fd) (the signature line is missing from this listing).
// Returns -EBADF for an invalid handle, the backing cache manager's answer
// for handles that have a backing descriptor, and the result of Stream()
// otherwise.
190  FdInfo info;
191  {
// The lock is only held while the handle is copied out of the table
192  MutexLockGuard lock_guard(lock_fd_table_);
193  info = fd_table_.GetHandle(fd);
194  }
195 
196  if (!info.IsValid())
197  return -EBADF;
198 
199  if (info.fd_in_cache_mgr >= 0)
200  return cache_mgr_->GetSize(info.fd_in_cache_mgr);
201 
// No buffer and an empty window: the sink only counts the bytes, i.e. the
// object is fetched via Stream() just to learn how many bytes it has
202  return Stream(info, NULL, 0, 0);
203 }
204 
206  FdInfo info;
207 
208  MutexLockGuard lock_guard(lock_fd_table_);
209  info = fd_table_.GetHandle(fd);
210 
211  if (!info.IsValid())
212  return -EBADF;
213 
214  if (info.fd_in_cache_mgr >= 0) {
215  int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr);
216  if (dup_fd < 0)
217  return dup_fd;
218  return fd_table_.OpenFd(FdInfo(dup_fd));
219  }
220 
221  return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label)));
222 }
223 
// Body of Close(int fd) (the signature line is missing from this listing).
// Removes fd from the fd table and, if the handle has a backing descriptor,
// closes that descriptor in the backing cache manager.
// Returns -EBADF for an invalid fd, the backing manager's result for backed
// handles, and 0 for streaming handles.
225  FdInfo info;
226  {
// Lookup and removal happen under the same lock
227  MutexLockGuard lock_guard(lock_fd_table_);
228  info = fd_table_.GetHandle(fd);
229  if (!info.IsValid())
230  return -EBADF;
231  fd_table_.CloseFd(fd);
232  }
233 
// The backing descriptor is closed after the table lock has been released
234  if (info.fd_in_cache_mgr >= 0)
235  return cache_mgr_->Close(info.fd_in_cache_mgr);
236 
237  return 0;
238 }
239 
241  int fd, void *buf, uint64_t size, uint64_t offset)
242 {
243  FdInfo info;
244  {
245  MutexLockGuard lock_guard(lock_fd_table_);
246  info = fd_table_.GetHandle(fd);
247  }
248 
249  if (!info.IsValid())
250  return -EBADF;
251 
252  if (info.fd_in_cache_mgr >= 0)
253  return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset);
254 
255  uint64_t nbytes_streamed = Stream(info, buf, size, offset);
256  if (nbytes_streamed < offset)
257  return 0;
258  if (nbytes_streamed > (offset + size))
259  return static_cast<int64_t>(size);
260  return static_cast<int64_t>(nbytes_streamed - offset);
261 }
262 
// Body of Readahead(int fd) (the signature line is missing from this
// listing).  Forwards the call to the backing cache manager for handles with
// a backing descriptor.  For streaming handles nothing is done and 0 is
// returned.  Returns -EBADF for an invalid fd.
264  FdInfo info;
265  {
266  MutexLockGuard lock_guard(lock_fd_table_);
267  info = fd_table_.GetHandle(fd);
268  }
269 
270  if (!info.IsValid())
271  return -EBADF;
272 
273  if (info.fd_in_cache_mgr >= 0)
274  return cache_mgr_->Readahead(info.fd_in_cache_mgr);
275 
276  return 0;
277 }
278 
280  int fd = cache_mgr_->OpenFromTxn(txn);
281  if (fd < 0)
282  return fd;
283 
284  MutexLockGuard lock_guard(lock_fd_table_);
285  return fd_table_.OpenFd(FdInfo(fd));
286 }
287 
// Body of DoSaveState() (the signature line is missing from this listing).
// Returns a heap-allocated SavedState holding a clone of the fd table and the
// saved state of the backing cache manager.  DoFreeState() deletes both the
// fd table clone and the SavedState object.
289  SavedState *state = new SavedState();
290  state->fd_table = fd_table_.Clone();
// NOTE(review): the meaning of the -1 argument is defined by
// CacheManager::SaveState, which is not visible here
291  state->state_backing_cachemgr = cache_mgr_->SaveState(-1);
292  return state;
293 }
294 
// Body of DoRestoreState(void *data) (the signature line is missing from this
// listing).  data is interpreted as a SavedState.  Restores the backing cache
// manager and the fd table from it and re-registers the handle that was at
// fd 0 before the restore.  Returns the new descriptor of that handle, or -1
// if the handle at fd 0 was not valid.
296  // When DoRestoreState is called, we have fd 0 assigned to the root file
297  // catalog
// Copied out first because AssignFrom() below replaces the table content
298  FdInfo handle_root = fd_table_.GetHandle(0);
299 
300  SavedState *state = reinterpret_cast<SavedState *>(data);
301 
302  int new_backing_root_fd =
303  cache_mgr_->RestoreState(-1, state->state_backing_cachemgr);
304  fd_table_.AssignFrom(*state->fd_table);
305 
306  int new_root_fd = -1;
307  if (handle_root.IsValid()) {
// A non-negative result of the backing restore replaces the backing
// descriptor of the root handle
308  if (new_backing_root_fd >= 0)
309  handle_root.fd_in_cache_mgr = new_backing_root_fd;
310  new_root_fd = fd_table_.OpenFd(handle_root);
311  // There must be a free file descriptor because the root file catalog gets
312  // closed before a reload
313  assert(new_root_fd >= 0);
314  }
315  return new_root_fd;
316 }
317 
// Body of DoFreeState(void *data) (the signature line is missing from this
// listing).  data is interpreted as a SavedState.  Hands the backing cache
// manager's part back to it for release, then deletes the fd table clone and
// the SavedState object itself.  Always returns true.
319  SavedState *state = reinterpret_cast<SavedState *>(data);
320  cache_mgr_->FreeState(-1, state->state_backing_cachemgr);
321  delete state->fd_table;
322  delete state;
323  return true;
324 }
325 
// Body of MoveOutBackingCacheMgr(int *root_fd) (the signature line is missing
// from this listing).  Stores the backing descriptor of the handle at fd 0 in
// *root_fd and returns the backing cache manager via cache_mgr_.Release().
// NOTE(review): presumably cache_mgr_ no longer owns the backing manager
// afterwards and this object must not be used anymore; confirm against
// UniquePtr::Release.
327  *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr;
328  return cache_mgr_.Release();
329 }
virtual int Close(int fd)
int PlantFd(int fd_in_cache_mgr)
off_t range_offset
Definition: cache.h:127
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
pthread_mutex_t * lock_fd_table_
Definition: cache_stream.h:139
void SetRangeOffset(off_t range_offset)
Definition: jobinfo.h:233
assert((mem||(size==0))&&"Out Of Memory")
download::DownloadManager * external_download_mgr_
Definition: cache_stream.h:137
virtual int64_t GetSize(int fd)
download::DownloadManager * regular_download_mgr_
Definition: cache_stream.h:136
virtual int Dup(int fd)
FdTable< FdInfo > fd_table_
Definition: cache_stream.h:140
CacheManager * MoveOutBackingCacheMgr(int *root_fd)
virtual int OpenFromTxn(void *txn)
uint64_t size
unzipped size, if known
Definition: cache.h:125
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
void SetRangeSize(off_t range_size)
Definition: jobinfo.h:234
virtual int64_t Write(const void *buf, uint64_t sz)
Definition: cache_stream.cc:34
Failures error_code() const
Definition: jobinfo.h:200
UniquePtr< CacheManager > cache_mgr_
Definition: cache_stream.h:135
download::DownloadManager * SelectDownloadManager(const FdInfo &info)
Definition: cache_stream.cc:90
virtual ~StreamingCacheManager()
CacheManager::Label label
Definition: cache_stream.h:100
zlib::Algorithms zip_algorithm
Definition: cache.h:126
virtual std::string Describe()
void SetExtraInfo(const std::string *extra_info)
Definition: jobinfo.h:230
virtual int Readahead(int fd)
T * Release()
Definition: pointer.h:44
Failures Fetch(JobInfo *info)
Definition: download.cc:1860
bool IsExternal() const
Definition: cache.h:104
QuotaManager * quota_mgr_
Definition: cache.h:235
Definition: mutex.h:42
StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, download::DownloadManager *external_download_mgr)
virtual int Open(const LabeledObject &object)
std::string path
Definition: cache.h:133
virtual bool DoFreeState(void *data)
virtual int DoRestoreState(void *data)
StreamingSink(void *buf, uint64_t size, uint64_t offset)
Definition: cache_stream.cc:24
int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset)
Definition: cache_stream.cc:98
std::string MakePath() const
Definition: hash.h:316
static void size_t size
Definition: smalloc.h:54
virtual void * DoSaveState()