CernVM-FS  2.13.0
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_stream.cc
Go to the documentation of this file.
1 
6 #include "cache_stream.h"
7 
8 #include <algorithm>
9 #include <cstdlib>
10 #include <cstring>
11 #include <string>
12 
13 #include "clientctx.h"
14 #include "network/download.h"
15 #include "network/sink.h"
16 #include "quota.h"
17 #include "statistics.h"
18 #include "util/mutex.h"
19 #include "util/platform.h"
20 #include "util/smalloc.h"
21 
22 
23 namespace {
24 
25 class StreamingSink : public cvmfs::Sink {
26  public:
27  StreamingSink(void *buf, uint64_t size, uint64_t offset,
28  unsigned char *object)
29  : Sink(false /* is_owner */)
30  , pos_(0)
31  , window_buf_(buf)
32  , window_size_(size)
33  , window_offset_(offset)
34  , object_(object) { }
35 
36  virtual ~StreamingSink() { }
37 
38  virtual int64_t Write(const void *buf, uint64_t sz) {
39  if (object_) {
40  memcpy(object_ + pos_, buf, sz);
41  }
42 
43  uint64_t old_pos = pos_;
44  pos_ += sz;
45 
46  if (!window_buf_)
47  return static_cast<int64_t>(sz);
48 
49  if (pos_ < window_offset_)
50  return static_cast<int64_t>(sz);
51 
52  if (old_pos >= (window_offset_ + window_size_))
53  return static_cast<int64_t>(sz);
54 
55  uint64_t copy_offset = std::max(old_pos, window_offset_);
56  uint64_t inbuf_offset = copy_offset - old_pos;
57  uint64_t outbuf_offset = copy_offset - window_offset_;
58  uint64_t copy_size = std::min(sz - inbuf_offset,
59  window_size_ - outbuf_offset);
60 
61  memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
62  reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
63  copy_size);
64 
65  return static_cast<int64_t>(sz);
66  }
67 
68  virtual int Reset() {
69  pos_ = 0;
70  return 0;
71  }
72 
73  virtual int Purge() { return Reset(); }
74  virtual bool IsValid() {
75  return (window_buf_ != NULL) || (window_size_ == 0);
76  }
77  virtual int Flush() { return 0; }
78  virtual bool Reserve(size_t /* size */) { return true; }
79  virtual bool RequiresReserve() { return false; }
80  virtual std::string Describe() {
81  std::string result = "Streaming sink that is ";
82  result += IsValid() ? "valid" : "invalid";
83  return result;
84  }
85 
86  int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); }
87 
88  private:
89  uint64_t pos_;
90  void *window_buf_;
91  uint64_t window_size_;
92  uint64_t window_offset_;
93  unsigned char *object_;
94 }; // class StreamingSink
95 
96 static inline uint32_t hasher_any(const shash::Any &key) {
97  return *const_cast<uint32_t *>(reinterpret_cast<const uint32_t *>(key.digest)
98  + 1);
99 }
100 
101 } // anonymous namespace
102 
103 
104 const size_t StreamingCacheManager::kDefaultBufferSize = 64 * 1024 * 1024;
105 
106 
108  sz_transferred_bytes = statistics->Register(
109  "streaming_cache_mgr.sz_transferred_bytes",
110  "Number of bytes downloaded by the streaming cache manager");
111  sz_transfer_ms = statistics->Register(
112  "streaming_cache_mgr.sz_transfer_ms",
113  "Time spent downloading data by the streaming cache manager");
114  n_downloads = statistics->Register("streaming_cache_mgr.n_downloads",
115  "Number of objects requested remotely");
116  n_buffer_hits = statistics->Register(
117  "streaming_cache_mgr.n_buffer_hits",
118  "Number of requests served from the buffer");
119  n_buffer_evicts = statistics->Register(
120  "streaming_cache_mgr.n_buffer_evicts",
121  "Number of objects evicted from the buffer");
122  n_buffer_objects = statistics->Register(
123  "streaming_cache_mgr.n_buffer_objects",
124  "Number of objects in the buffer");
125  n_buffer_obstacles = statistics->Register(
126  "streaming_cache_mgr.n_buffer_obstacles",
127  "Number of objects that could not be stored in the buffer "
128  "(e.g., too large)");
129 }
130 
131 
133  const FdInfo &info) {
134  if (info.label.IsExternal())
135  return external_download_mgr_;
136  return regular_download_mgr_;
137 }
138 
139 
141  void *buf,
142  uint64_t size,
143  uint64_t offset) {
144  // Note: objects stored in the ring buffer are prepended by their hash
145 
146  {
149  if (buffered_objects_.Lookup(info.object_id, &handle)) {
150  perf::Inc(counters_->n_buffer_hits);
151  buffer_->CopySlice(handle, size, offset + sizeof(shash::Any), buf);
152  return buffer_->GetObjectSize(handle) - sizeof(shash::Any);
153  }
154  }
155 
156  unsigned char *object = NULL;
157  size_t nbytes_in_buffer = 0;
159  && (info.label.size + sizeof(shash::Any)
160  <= buffer_->GetMaxObjectSize())) {
161  nbytes_in_buffer = sizeof(shash::Any) + info.label.size;
162  object = reinterpret_cast<unsigned char *>(smalloc(nbytes_in_buffer));
163  } else {
164  perf::Inc(counters_->n_buffer_obstacles);
165  }
166 
167  StreamingSink sink(buf, size, offset,
168  object ? (object + sizeof(shash::Any)) : NULL);
169  std::string url;
170  if (info.label.IsExternal()) {
171  url = info.label.path;
172  } else {
173  url = "/data/" + info.object_id.MakePath();
174  }
175  bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault;
176 
177  download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */,
178  &info.object_id, &sink);
179  download_job.SetExtraInfo(&info.label.path);
180  download_job.SetRangeOffset(info.label.range_offset);
181  download_job.SetRangeSize(static_cast<int64_t>(info.label.size));
183  if (ctx->IsSet()) {
184  ctx->Get(download_job.GetUidPtr(),
185  download_job.GetGidPtr(),
186  download_job.GetPidPtr(),
187  download_job.GetInterruptCuePtr());
188  }
189 
190  {
191  uint64_t timestamp = platform_monotonic_time_ns();
192  SelectDownloadManager(info)->Fetch(&download_job);
193  perf::Xadd(counters_->sz_transfer_ms,
194  (platform_monotonic_time_ns() - timestamp) / (1000 * 1000));
195  }
196 
197  perf::Inc(counters_->n_downloads);
198  perf::Xadd(counters_->sz_transferred_bytes, sink.GetNBytesStreamed());
199 
200  if (download_job.error_code() != download::kFailOk) {
201  free(object);
202  return -EIO;
203  }
204 
205  if (object) {
206  memcpy(object, &info.object_id, sizeof(shash::Any));
208  while (!buffer_->HasSpaceFor(nbytes_in_buffer)) {
209  RingBuffer::ObjectHandle_t deleted_handle = buffer_->RemoveBack();
210  // As long as we don't add any new objects, the deleted_handle can still
211  // be accessed
212  shash::Any deleted_hash;
213  buffer_->CopySlice(deleted_handle, sizeof(shash::Any), 0, &deleted_hash);
214  buffered_objects_.Erase(deleted_hash);
215  perf::Inc(counters_->n_buffer_evicts);
216  perf::Dec(counters_->n_buffer_objects);
217  }
218  RingBuffer::ObjectHandle_t handle = buffer_->PushFront(object,
219  nbytes_in_buffer);
220  buffered_objects_.Insert(info.object_id, handle);
221  perf::Inc(counters_->n_buffer_objects);
222  }
223  free(object);
224 
225  return sink.GetNBytesStreamed();
226 }
227 
228 
230  unsigned max_open_fds,
231  CacheManager *cache_mgr,
232  download::DownloadManager *regular_download_mgr,
233  download::DownloadManager *external_download_mgr,
234  size_t buffer_size,
235  perf::Statistics *statistics)
236  : cache_mgr_(cache_mgr)
237  , regular_download_mgr_(regular_download_mgr)
238  , external_download_mgr_(external_download_mgr)
239  , fd_table_(max_open_fds, FdInfo())
240  , counters_(new Counters(statistics)) {
241  lock_fd_table_ = reinterpret_cast<pthread_mutex_t *>(
242  smalloc(sizeof(pthread_mutex_t)));
243  int retval = pthread_mutex_init(lock_fd_table_, NULL);
244  assert(retval == 0);
245 
246  delete quota_mgr_;
247  quota_mgr_ = cache_mgr_->quota_mgr();
248 
249  buffer_ = new RingBuffer(buffer_size);
251  lock_buffer_ = reinterpret_cast<pthread_mutex_t *>(
252  smalloc(sizeof(pthread_mutex_t)));
253  retval = pthread_mutex_init(lock_buffer_, NULL);
254  assert(retval == 0);
255 }
256 
258  pthread_mutex_destroy(lock_buffer_);
259  free(lock_buffer_);
260  pthread_mutex_destroy(lock_fd_table_);
261  free(lock_fd_table_);
262  quota_mgr_ = NULL; // gets deleted by cache_mgr_
263 }
264 
266  return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe();
267 }
268 
270  bool result = cache_mgr_->AcquireQuotaManager(quota_mgr);
271  if (result)
272  quota_mgr_ = cache_mgr_->quota_mgr();
273  return result;
274 }
275 
277  int fd_in_cache_mgr = cache_mgr_->Open(object);
278  if (fd_in_cache_mgr >= 0) {
279  MutexLockGuard lock_guard(lock_fd_table_);
280  return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
281  }
282 
283  if (fd_in_cache_mgr != -ENOENT)
284  return fd_in_cache_mgr;
285 
286  if (object.label.IsCatalog() || object.label.IsPinned()
287  || object.label.IsCertificate()) {
288  return -ENOENT;
289  }
290 
291  MutexLockGuard lock_guard(lock_fd_table_);
292  return fd_table_.OpenFd(FdInfo(object));
293 }
294 
295 int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) {
296  MutexLockGuard lock_guard(lock_fd_table_);
297  return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
298 }
299 
301  FdInfo info;
302  {
303  MutexLockGuard lock_guard(lock_fd_table_);
304  info = fd_table_.GetHandle(fd);
305  }
306 
307  if (!info.IsValid())
308  return -EBADF;
309 
310  if (info.fd_in_cache_mgr >= 0)
311  return cache_mgr_->GetSize(info.fd_in_cache_mgr);
312 
313  return Stream(info, NULL, 0, 0);
314 }
315 
317  FdInfo info;
318 
319  MutexLockGuard lock_guard(lock_fd_table_);
320  info = fd_table_.GetHandle(fd);
321 
322  if (!info.IsValid())
323  return -EBADF;
324 
325  if (info.fd_in_cache_mgr >= 0) {
326  int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr);
327  if (dup_fd < 0)
328  return dup_fd;
329  return fd_table_.OpenFd(FdInfo(dup_fd));
330  }
331 
332  return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label)));
333 }
334 
336  FdInfo info;
337  {
338  MutexLockGuard lock_guard(lock_fd_table_);
339  info = fd_table_.GetHandle(fd);
340  if (!info.IsValid())
341  return -EBADF;
342  fd_table_.CloseFd(fd);
343  }
344 
345  if (info.fd_in_cache_mgr >= 0)
346  return cache_mgr_->Close(info.fd_in_cache_mgr);
347 
348  return 0;
349 }
350 
351 int64_t StreamingCacheManager::Pread(int fd, void *buf, uint64_t size,
352  uint64_t offset) {
353  FdInfo info;
354  {
355  MutexLockGuard lock_guard(lock_fd_table_);
356  info = fd_table_.GetHandle(fd);
357  }
358 
359  if (!info.IsValid())
360  return -EBADF;
361 
362  if (info.fd_in_cache_mgr >= 0)
363  return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset);
364 
365  int64_t nbytes_streamed = Stream(info, buf, size, offset);
366  if (nbytes_streamed < 0)
367  return nbytes_streamed;
368  if (static_cast<uint64_t>(nbytes_streamed) < offset)
369  return 0;
370  if (static_cast<uint64_t>(nbytes_streamed) > (offset + size))
371  return size;
372  return nbytes_streamed - offset;
373 }
374 
376  FdInfo info;
377  {
378  MutexLockGuard lock_guard(lock_fd_table_);
379  info = fd_table_.GetHandle(fd);
380  }
381 
382  if (!info.IsValid())
383  return -EBADF;
384 
385  if (info.fd_in_cache_mgr >= 0)
386  return cache_mgr_->Readahead(info.fd_in_cache_mgr);
387 
388  return 0;
389 }
390 
392  int fd = cache_mgr_->OpenFromTxn(txn);
393  if (fd < 0)
394  return fd;
395 
396  MutexLockGuard lock_guard(lock_fd_table_);
397  return fd_table_.OpenFd(FdInfo(fd));
398 }
399 
401  SavedState *state = new SavedState();
402  state->fd_table = fd_table_.Clone();
403  state->state_backing_cachemgr = cache_mgr_->SaveState(-1);
404  return state;
405 }
406 
408  // When DoRestoreState is called, we have fd 0 assigned to the root file
409  // catalog
410  FdInfo handle_root = fd_table_.GetHandle(0);
411 
412  SavedState *state = reinterpret_cast<SavedState *>(data);
413 
414  int new_backing_root_fd = cache_mgr_->RestoreState(
415  -1, state->state_backing_cachemgr);
416  fd_table_.AssignFrom(*state->fd_table);
417 
418  int new_root_fd = -1;
419  if (handle_root.IsValid()) {
420  if (new_backing_root_fd >= 0)
421  handle_root.fd_in_cache_mgr = new_backing_root_fd;
422  new_root_fd = fd_table_.OpenFd(handle_root);
423  // There must be a free file descriptor because the root file catalog gets
424  // closed before a reload
425  assert(new_root_fd >= 0);
426  }
427  return new_root_fd;
428 }
429 
431  SavedState *state = reinterpret_cast<SavedState *>(data);
432  cache_mgr_->FreeState(-1, state->state_backing_cachemgr);
433  delete state->fd_table;
434  delete state;
435  return true;
436 }
437 
439  *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr;
440  return cache_mgr_.Release();
441 }
Counters(perf::Statistics *statistics)
void Dec(class Counter *counter)
Definition: statistics.h:49
virtual int Close(int fd)
Counter * Register(const std::string &name, const std::string &desc)
Definition: statistics.cc:163
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
struct cvmcache_context * ctx
int PlantFd(int fd_in_cache_mgr)
off_t range_offset
Definition: cache.h:127
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
uid_t * GetUidPtr()
Definition: jobinfo.h:157
UniquePtr< Counters > counters_
Definition: cache_stream.h:169
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
Definition: clientctx.cc:57
pthread_mutex_t * lock_fd_table_
Definition: cache_stream.h:160
static const size_t kDefaultBufferSize
Definition: cache_stream.h:33
perf::Counter * sz_transferred_bytes
Definition: cache_stream.h:36
void SetRangeOffset(off_t range_offset)
Definition: jobinfo.h:237
assert((mem||(size==0))&&"Out Of Memory")
uint64_t platform_monotonic_time_ns()
static uint32_t hasher_any(const ComparableHash &key)
SmallHashDynamic< shash::Any, RingBuffer::ObjectHandle_t > buffered_objects_
Definition: cache_stream.h:166
StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, download::DownloadManager *external_download_mgr, size_t buffer_size, perf::Statistics *statistics)
bool IsSet()
Definition: clientctx.cc:74
download::DownloadManager * external_download_mgr_
Definition: cache_stream.h:158
unsigned char digest[digest_size_]
Definition: hash.h:121
virtual int64_t GetSize(int fd)
download::DownloadManager * regular_download_mgr_
Definition: cache_stream.h:157
virtual int Dup(int fd)
size_t ObjectHandle_t
Definition: ring_buffer.h:22
FdTable< FdInfo > fd_table_
Definition: cache_stream.h:161
CacheManager * MoveOutBackingCacheMgr(int *root_fd)
virtual int OpenFromTxn(void *txn)
uint64_t size
unzipped size, if known
Definition: cache.h:125
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
StreamingSink(void *buf, uint64_t size, uint64_t offset, unsigned char *object)
Definition: cache_stream.cc:27
InterruptCue ** GetInterruptCuePtr()
Definition: jobinfo.h:159
void Insert(const Key &key, const Value &value)
Definition: smallhash.h:106
UniquePtr< RingBuffer > buffer_
Definition: cache_stream.h:165
void SetRangeSize(off_t range_size)
Definition: jobinfo.h:238
perf::Counter * n_buffer_objects
Definition: cache_stream.h:41
virtual int64_t Write(const void *buf, uint64_t sz)
Definition: cache_stream.cc:38
Failures error_code() const
Definition: jobinfo.h:200
UniquePtr< CacheManager > cache_mgr_
Definition: cache_stream.h:156
void Inc(class Counter *counter)
Definition: statistics.h:50
download::DownloadManager * SelectDownloadManager(const FdInfo &info)
pid_t * GetPidPtr()
Definition: jobinfo.h:156
pthread_mutex_t * lock_buffer_
Definition: cache_stream.h:167
virtual ~StreamingCacheManager()
CacheManager::Label label
Definition: cache_stream.h:123
zlib::Algorithms zip_algorithm
Definition: cache.h:126
perf::Counter * n_buffer_obstacles
Definition: cache_stream.h:42
virtual std::string Describe()
void SetExtraInfo(const std::string *extra_info)
Definition: jobinfo.h:235
virtual int Readahead(int fd)
T * Release()
Definition: pointer.h:48
Failures Fetch(JobInfo *info)
Definition: download.cc:1982
bool IsExternal() const
Definition: cache.h:104
QuotaManager * quota_mgr_
Definition: cache.h:233
Definition: mutex.h:42
bool Erase(const Key &key)
Definition: smallhash.h:112
virtual int Open(const LabeledObject &object)
std::string path
Definition: cache.h:133
virtual bool DoFreeState(void *data)
virtual int DoRestoreState(void *data)
int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset)
std::string MakePath() const
Definition: hash.h:306
bool Lookup(const Key &key, Value *value) const
Definition: smallhash.h:70
void Init(uint32_t expected_size, Key empty, uint32_t(*hasher)(const Key &key))
Definition: smallhash.h:58
static void size_t size
Definition: smalloc.h:54
virtual void * DoSaveState()
static ClientCtx * GetInstance()
Definition: clientctx.cc:45
static const uint64_t kSizeUnknown
Definition: cache.h:74
gid_t * GetGidPtr()
Definition: jobinfo.h:158