CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_stream.cc
Go to the documentation of this file.
1 
6 #include "cache_stream.h"
7 
8 #include <algorithm>
9 #include <cstdlib>
10 #include <cstring>
11 #include <string>
12 
13 #include "clientctx.h"
14 #include "network/download.h"
15 #include "network/sink.h"
16 #include "quota.h"
17 #include "statistics.h"
18 #include "util/mutex.h"
19 #include "util/platform.h"
20 #include "util/smalloc.h"
21 
22 
23 namespace {
24 
25 class StreamingSink : public cvmfs::Sink {
26  public:
27  StreamingSink(void *buf, uint64_t size, uint64_t offset,
28  unsigned char *object)
29  : Sink(false /* is_owner */)
30  , pos_(0)
31  , window_buf_(buf)
32  , window_size_(size)
33  , window_offset_(offset)
34  , object_(object)
35  { }
36 
37  virtual ~StreamingSink() {}
38 
39  virtual int64_t Write(const void *buf, uint64_t sz) {
40  if (object_) {
41  memcpy(object_ + pos_, buf, sz);
42  }
43 
44  uint64_t old_pos = pos_;
45  pos_ += sz;
46 
47  if (!window_buf_)
48  return static_cast<int64_t>(sz);
49 
50  if (pos_ < window_offset_)
51  return static_cast<int64_t>(sz);
52 
53  if (old_pos >= (window_offset_ + window_size_))
54  return static_cast<int64_t>(sz);
55 
56  uint64_t copy_offset = std::max(old_pos, window_offset_);
57  uint64_t inbuf_offset = copy_offset - old_pos;
58  uint64_t outbuf_offset = copy_offset - window_offset_;
59  uint64_t copy_size =
60  std::min(sz - inbuf_offset, window_size_ - outbuf_offset);
61 
62  memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
63  reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
64  copy_size);
65 
66  return static_cast<int64_t>(sz);
67  }
68 
69  virtual int Reset() {
70  pos_ = 0;
71  return 0;
72  }
73 
74  virtual int Purge() { return Reset(); }
75  virtual bool IsValid() {
76  return (window_buf_ != NULL) || (window_size_ == 0);
77  }
78  virtual int Flush() { return 0; }
79  virtual bool Reserve(size_t /* size */) { return true; }
80  virtual bool RequiresReserve() { return false; }
81  virtual std::string Describe() {
82  std::string result = "Streaming sink that is ";
83  result += IsValid() ? "valid" : "invalid";
84  return result;
85  }
86 
87  int64_t GetNBytesStreamed() const { return static_cast<int64_t>(pos_); }
88 
89  private:
90  uint64_t pos_;
91  void *window_buf_;
92  uint64_t window_size_;
93  uint64_t window_offset_;
94  unsigned char *object_;
95 }; // class StreamingSink
96 
97 static inline uint32_t hasher_any(const shash::Any &key) {
98  return *const_cast<uint32_t *>(
99  reinterpret_cast<const uint32_t *>(key.digest) + 1);
100 }
101 
102 } // anonymous namespace
103 
104 
// Default buffer size of the streaming cache manager: 64 MiB
105 const size_t StreamingCacheManager::kDefaultBufferSize = 64 * 1024 * 1024;
106 
107 
// Registers the counters of the streaming cache manager with `statistics`.
// All counter names carry the "streaming_cache_mgr." prefix; the second
// argument of each Register() call is the human readable description.
109  sz_transferred_bytes = statistics->Register(
110  "streaming_cache_mgr.sz_transferred_bytes",
111  "Number of bytes downloaded by the streaming cache manager");
112  sz_transfer_ms = statistics->Register(
113  "streaming_cache_mgr.sz_transfer_ms",
114  "Time spent downloading data by the streaming cache manager");
115  n_downloads = statistics->Register(
116  "streaming_cache_mgr.n_downloads", "Number of objects requested remotely");
117  n_buffer_hits = statistics->Register(
118  "streaming_cache_mgr.n_buffer_hits",
119  "Number of requests served from the buffer");
120  n_buffer_evicts = statistics->Register(
121  "streaming_cache_mgr.n_buffer_evicts",
122  "Number of objects evicted from the buffer");
123  n_buffer_objects = statistics->Register(
124  "streaming_cache_mgr.n_buffer_objects", "Number of objects in the buffer");
125  n_buffer_obstacles = statistics->Register(
126  "streaming_cache_mgr.n_buffer_obstacles",
127  "Number of objects that could not be stored in the buffer "
128  "(e.g., too large)");
129 }
130 
131 
133  const FdInfo &info)
134 {
// Objects labeled as external are fetched through the external download
// manager, everything else through the regular one.
135  if (info.label.IsExternal())
136  return external_download_mgr_;
137  return regular_download_mgr_;
138 }
139 
140 
142  const FdInfo &info,
143  void *buf,
144  uint64_t size,
145  uint64_t offset)
146 {
// Serves a read request for an object that has no file descriptor in the
// backing cache manager.  The request is answered from the ring buffer if the
// object is found there; otherwise the object is downloaded and, while it
// streams through, the slice [offset, offset + size) is copied into buf.
// buf may be NULL (GetSize() calls Stream(info, NULL, 0, 0)).
// Returns the number of bytes of the object (buffered size or streamed bytes),
// not the number of bytes copied into buf; returns -EIO if the download fails.
147  // Note: objects stored in the ring buffer are prepended by their hash
148 
149  {
152  if (buffered_objects_.Lookup(info.object_id, &handle)) {
153  perf::Inc(counters_->n_buffer_hits);
// The payload starts behind the leading hash, hence the shifted offset.
// NOTE(review): size and offset are handed to CopySlice() without comparing
// them to the stored object size -- confirm that CopySlice() copes with a
// slice that reaches beyond the end of the object.
154  buffer_->CopySlice(handle, size, offset + sizeof(shash::Any), buf);
155  return buffer_->GetObjectSize(handle) - sizeof(shash::Any);
156  }
157  }
158 
// Stage a copy of the object for the ring buffer only if its size is known
// and it fits (including the leading hash); otherwise object stays NULL and
// the data is streamed into the window only.
159  unsigned char *object = NULL;
160  size_t nbytes_in_buffer = 0;
161  if ((info.label.size != CacheManager::kSizeUnknown) &&
162  (info.label.size + sizeof(shash::Any) <= buffer_->GetMaxObjectSize()))
163  {
164  nbytes_in_buffer = sizeof(shash::Any) + info.label.size;
165  object = reinterpret_cast<unsigned char *>(smalloc(nbytes_in_buffer));
166  } else {
167  perf::Inc(counters_->n_buffer_obstacles);
168  }
169 
// The sink writes the payload behind the space reserved for the hash.
// NOTE(review): the staging area holds info.label.size payload bytes and the
// sink does not bounds-check its copy -- confirm a transfer cannot deliver
// more bytes than the label announces.
170  StreamingSink sink(buf, size, offset,
171  object ? (object + sizeof(shash::Any)) : NULL);
172  std::string url;
173  if (info.label.IsExternal()) {
174  url = info.label.path;
175  } else {
176  url = "/data/" + info.object_id.MakePath();
177  }
178  bool is_zipped = info.label.zip_algorithm == zlib::kZlibDefault;
179 
180  download::JobInfo download_job(&url, is_zipped, true /* probe_hosts */,
181  &info.object_id, &sink);
182  download_job.SetExtraInfo(&info.label.path);
183  download_job.SetRangeOffset(info.label.range_offset);
184  download_job.SetRangeSize(static_cast<int64_t>(info.label.size));
// If a client context is set, pass uid, gid, pid and the interrupt cue on to
// the download job.
186  if (ctx->IsSet()) {
187  ctx->Get(download_job.GetUidPtr(),
188  download_job.GetGidPtr(),
189  download_job.GetPidPtr(),
190  download_job.GetInterruptCuePtr());
191  }
192 
193  {
// Measure the wall-clock duration of the fetch; nanoseconds are converted to
// milliseconds for the counter.
194  uint64_t timestamp = platform_monotonic_time_ns();
195  SelectDownloadManager(info)->Fetch(&download_job);
196  perf::Xadd(counters_->sz_transfer_ms,
197  (platform_monotonic_time_ns() - timestamp) / (1000 * 1000));
198  }
199 
// Counted for failed downloads as well.
200  perf::Inc(counters_->n_downloads);
201  perf::Xadd(counters_->sz_transferred_bytes, sink.GetNBytesStreamed());
202 
203  if (download_job.error_code() != download::kFailOk) {
// object may be NULL here; free(NULL) is a no-op.
204  free(object);
205  return -EIO;
206  }
207 
208  if (object) {
// Complete the staged copy by writing the hash in front of the payload.
209  memcpy(object, &info.object_id, sizeof(shash::Any));
// Evict objects from the back of the ring buffer until the new one fits.
211  while (!buffer_->HasSpaceFor(nbytes_in_buffer)) {
212  RingBuffer::ObjectHandle_t deleted_handle = buffer_->RemoveBack();
213  // As long as we don't add any new objects, the deleted_handle can still
214  // be accessed
215  shash::Any deleted_hash;
216  buffer_->CopySlice(deleted_handle, sizeof(shash::Any), 0, &deleted_hash);
217  buffered_objects_.Erase(deleted_hash);
218  perf::Inc(counters_->n_buffer_evicts);
219  perf::Dec(counters_->n_buffer_objects);
220  }
222  buffer_->PushFront(object, nbytes_in_buffer);
223  buffered_objects_.Insert(info.object_id, handle);
224  perf::Inc(counters_->n_buffer_objects);
225  }
// The staging copy is released in any case; presumably PushFront() copied the
// data into the ring buffer -- TODO confirm.
226  free(object);
227 
228  return sink.GetNBytesStreamed();
229 }
230 
231 
233  unsigned max_open_fds,
234  CacheManager *cache_mgr,
235  download::DownloadManager *regular_download_mgr,
236  download::DownloadManager *external_download_mgr,
237  size_t buffer_size,
238  perf::Statistics *statistics)
239  : cache_mgr_(cache_mgr)
240  , regular_download_mgr_(regular_download_mgr)
241  , external_download_mgr_(external_download_mgr)
242  , fd_table_(max_open_fds, FdInfo())
243  , counters_(new Counters(statistics))
244 {
// Sets up the fd table for max_open_fds descriptors, the counters, a ring
// buffer of buffer_size bytes and the two mutexes, which live on the heap
// (allocated with smalloc, released in the destructor).
246  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
247  int retval = pthread_mutex_init(lock_fd_table_, NULL);
248  assert(retval == 0);
249 
// Use the quota manager of the backing cache manager instead of the one
// currently set.  The destructor only resets the pointer because the quota
// manager is deleted by cache_mgr_.
250  delete quota_mgr_;
251  quota_mgr_ = cache_mgr_->quota_mgr();
252 
253  buffer_ = new RingBuffer(buffer_size);
255  lock_buffer_ =
256  reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
257  retval = pthread_mutex_init(lock_buffer_, NULL);
258  assert(retval == 0);
259 }
260 
// Destroys both mutexes and frees their heap storage.
262  pthread_mutex_destroy(lock_buffer_);
263  free(lock_buffer_);
264  pthread_mutex_destroy(lock_fd_table_);
265  free(lock_fd_table_);
// Only reset, not deleted: the quota manager belongs to the backing cache
// manager.
266  quota_mgr_ = NULL; // gets deleted by cache_mgr_
267 }
268 
// Description of this shim followed by the description of the backing cache
// manager.
270  return "Streaming shim, underlying cache manager:\n" + cache_mgr_->Describe();
271 }
272 
// Forwards the quota manager to the backing cache manager.  On success, the
// local quota_mgr_ pointer is refreshed from the backing cache manager.
274  bool result = cache_mgr_->AcquireQuotaManager(quota_mgr);
275  if (result)
276  quota_mgr_ = cache_mgr_->quota_mgr();
277  return result;
278 }
279 
// Opens `object`.  If the backing cache manager can open it, its file
// descriptor is wrapped into a local fd table entry.  If the backing cache
// manager reports -ENOENT, a streaming entry (object id and label, no backing
// fd) is registered instead -- except for catalogs, pinned objects and
// certificates, for which -ENOENT is passed on.  Any other error of the
// backing cache manager is returned unchanged.
281  int fd_in_cache_mgr = cache_mgr_->Open(object);
282  if (fd_in_cache_mgr >= 0) {
283  MutexLockGuard lock_guard(lock_fd_table_);
// NOTE(review): if OpenFd() can fail (e.g. no free slot), fd_in_cache_mgr
// stays open in the backing cache manager -- confirm and close it if so.
284  return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
285  }
286 
287  if (fd_in_cache_mgr != -ENOENT)
288  return fd_in_cache_mgr;
289 
290  if (object.label.IsCatalog() || object.label.IsPinned() ||
291  object.label.IsCertificate())
292  {
293  return -ENOENT;
294  }
295 
296  MutexLockGuard lock_guard(lock_fd_table_);
297  return fd_table_.OpenFd(FdInfo(object));
298 }
299 
300 int StreamingCacheManager::PlantFd(int fd_in_cache_mgr) {
301  MutexLockGuard lock_guard(lock_fd_table_);
302  return fd_table_.OpenFd(FdInfo(fd_in_cache_mgr));
303 }
304 
// Returns the size of the object behind fd, or -EBADF for an invalid handle.
// Entries with a backing fd are answered by the backing cache manager.
306  FdInfo info;
307  {
// The lock is held only while the handle is copied out of the table.
308  MutexLockGuard lock_guard(lock_fd_table_);
309  info = fd_table_.GetHandle(fd);
310  }
311 
312  if (!info.IsValid())
313  return -EBADF;
314 
315  if (info.fd_in_cache_mgr >= 0)
316  return cache_mgr_->GetSize(info.fd_in_cache_mgr);
317 
// Streaming entry: Stream() with an empty window reports the object size; on
// a ring buffer miss this downloads the object.
318  return Stream(info, NULL, 0, 0);
319 }
320 
// Duplicates fd.  Entries with a backing fd are duplicated in the backing
// cache manager and the new backing fd is registered locally; streaming
// entries get a second table entry with the same object id and label.
// Returns -EBADF for an invalid handle.  The fd table lock is held for the
// whole function, including the call into the backing cache manager.
322  FdInfo info;
323 
324  MutexLockGuard lock_guard(lock_fd_table_);
325  info = fd_table_.GetHandle(fd);
326 
327  if (!info.IsValid())
328  return -EBADF;
329 
330  if (info.fd_in_cache_mgr >= 0) {
331  int dup_fd = cache_mgr_->Dup(info.fd_in_cache_mgr);
332  if (dup_fd < 0)
333  return dup_fd;
// NOTE(review): if OpenFd() can fail, dup_fd stays open in the backing cache
// manager -- confirm and close it if so.
334  return fd_table_.OpenFd(FdInfo(dup_fd));
335  }
336 
337  return fd_table_.OpenFd(FdInfo(LabeledObject(info.object_id, info.label)));
338 }
339 
// Removes fd from the local table and, if the entry has a backing fd, closes
// that one in the backing cache manager.  Returns -EBADF for an invalid
// handle and 0 for streaming entries.
341  FdInfo info;
342  {
343  MutexLockGuard lock_guard(lock_fd_table_);
344  info = fd_table_.GetHandle(fd);
345  if (!info.IsValid())
346  return -EBADF;
347  fd_table_.CloseFd(fd);
348  }
349 
// The backing cache manager is called after the table lock has been released.
350  if (info.fd_in_cache_mgr >= 0)
351  return cache_mgr_->Close(info.fd_in_cache_mgr);
352 
353  return 0;
354 }
355 
357  int fd, void *buf, uint64_t size, uint64_t offset)
358 {
// Reads up to size bytes at offset into buf.  Entries with a backing fd are
// read through the backing cache manager, streaming entries through Stream().
// Returns the number of bytes read, -EBADF for an invalid handle or the
// negative error code of Stream().
359  FdInfo info;
360  {
361  MutexLockGuard lock_guard(lock_fd_table_);
362  info = fd_table_.GetHandle(fd);
363  }
364 
365  if (!info.IsValid())
366  return -EBADF;
367 
368  if (info.fd_in_cache_mgr >= 0)
369  return cache_mgr_->Pread(info.fd_in_cache_mgr, buf, size, offset);
370 
// Stream() reports the number of bytes of the whole object; translate that
// into the number of bytes that fall into [offset, offset + size).
371  int64_t nbytes_streamed = Stream(info, buf, size, offset);
372  if (nbytes_streamed < 0)
373  return nbytes_streamed;
// The object ends before the requested offset
374  if (static_cast<uint64_t>(nbytes_streamed) < offset)
375  return 0;
// The object extends beyond the window: the window is completely filled
376  if (static_cast<uint64_t>(nbytes_streamed) > (offset + size))
377  return size;
// The object ends inside the window
378  return nbytes_streamed - offset;
379 }
380 
// Forwards the readahead request to the backing cache manager for entries
// with a backing fd.  For streaming entries nothing is done and 0 is returned.
// Returns -EBADF for an invalid handle.
382  FdInfo info;
383  {
384  MutexLockGuard lock_guard(lock_fd_table_);
385  info = fd_table_.GetHandle(fd);
386  }
387 
388  if (!info.IsValid())
389  return -EBADF;
390 
391  if (info.fd_in_cache_mgr >= 0)
392  return cache_mgr_->Readahead(info.fd_in_cache_mgr);
393 
394  return 0;
395 }
396 
// Opens the transaction in the backing cache manager and registers the
// resulting fd in the local table.  Errors of the backing cache manager are
// returned unchanged.
398  int fd = cache_mgr_->OpenFromTxn(txn);
399  if (fd < 0)
400  return fd;
401 
402  MutexLockGuard lock_guard(lock_fd_table_);
// NOTE(review): if OpenFd() can fail, fd stays open in the backing cache
// manager -- confirm and close it if so.
403  return fd_table_.OpenFd(FdInfo(fd));
404 }
405 
// Creates a heap-allocated SavedState that holds a clone of the fd table and
// the saved state of the backing cache manager.  The returned pointer is
// released by DoFreeState().
407  SavedState *state = new SavedState();
408  state->fd_table = fd_table_.Clone();
409  state->state_backing_cachemgr = cache_mgr_->SaveState(-1);
410  return state;
411 }
412 
// Restores the backing cache manager and the fd table from `data` (a
// SavedState).  Returns the new local fd of the root catalog, or -1 if the
// entry at fd 0 was not valid when the function was called.
414  // When DoRestoreState is called, we have fd 0 assigned to the root file
415  // catalog
// Remember the current root handle because AssignFrom() below replaces the
// table content.
416  FdInfo handle_root = fd_table_.GetHandle(0);
417 
418  SavedState *state = reinterpret_cast<SavedState *>(data);
419 
420  int new_backing_root_fd =
421  cache_mgr_->RestoreState(-1, state->state_backing_cachemgr);
422  fd_table_.AssignFrom(*state->fd_table);
423 
424  int new_root_fd = -1;
425  if (handle_root.IsValid()) {
// If the backing cache manager returned a non-negative fd from RestoreState(),
// the root handle is pointed to it before it is registered again.
426  if (new_backing_root_fd >= 0)
427  handle_root.fd_in_cache_mgr = new_backing_root_fd;
428  new_root_fd = fd_table_.OpenFd(handle_root);
429  // There must be a free file descriptor because the root file catalog gets
430  // closed before a reload
431  assert(new_root_fd >= 0);
432  }
433  return new_root_fd;
434 }
435 
// Releases a SavedState: the backing cache manager frees its part, then the
// cloned fd table and the state object itself are deleted.  Always returns
// true.
437  SavedState *state = reinterpret_cast<SavedState *>(data);
438  cache_mgr_->FreeState(-1, state->state_backing_cachemgr);
439  delete state->fd_table;
440  delete state;
441  return true;
442 }
443 
// Stores the backing fd of local fd 0 in *root_fd and hands the backing cache
// manager to the caller via Release() on the owning pointer.
// NOTE(review): presumably cache_mgr_ is empty afterwards and this object must
// not be used any more -- confirm against UniquePtr::Release().
445  *root_fd = fd_table_.GetHandle(0).fd_in_cache_mgr;
446  return cache_mgr_.Release();
447 }
Counters(perf::Statistics *statistics)
void Dec(class Counter *counter)
Definition: statistics.h:49
virtual int Close(int fd)
Counter * Register(const std::string &name, const std::string &desc)
Definition: statistics.cc:160
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
struct cvmcache_context * ctx
int PlantFd(int fd_in_cache_mgr)
off_t range_offset
Definition: cache.h:127
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
uid_t * GetUidPtr()
Definition: jobinfo.h:162
UniquePtr< Counters > counters_
Definition: cache_stream.h:172
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
Definition: clientctx.cc:57
pthread_mutex_t * lock_fd_table_
Definition: cache_stream.h:163
static const size_t kDefaultBufferSize
Definition: cache_stream.h:33
perf::Counter * sz_transferred_bytes
Definition: cache_stream.h:36
void SetRangeOffset(off_t range_offset)
Definition: jobinfo.h:239
assert((mem||(size==0))&&"Out Of Memory")
uint64_t platform_monotonic_time_ns()
static uint32_t hasher_any(const ComparableHash &key)
SmallHashDynamic< shash::Any, RingBuffer::ObjectHandle_t > buffered_objects_
Definition: cache_stream.h:169
StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, download::DownloadManager *external_download_mgr, size_t buffer_size, perf::Statistics *statistics)
bool IsSet()
Definition: clientctx.cc:74
download::DownloadManager * external_download_mgr_
Definition: cache_stream.h:161
unsigned char digest[digest_size_]
Definition: hash.h:124
virtual int64_t GetSize(int fd)
download::DownloadManager * regular_download_mgr_
Definition: cache_stream.h:160
virtual int Dup(int fd)
size_t ObjectHandle_t
Definition: ring_buffer.h:22
FdTable< FdInfo > fd_table_
Definition: cache_stream.h:164
CacheManager * MoveOutBackingCacheMgr(int *root_fd)
virtual int OpenFromTxn(void *txn)
uint64_t size
unzipped size, if known
Definition: cache.h:125
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
StreamingSink(void *buf, uint64_t size, uint64_t offset, unsigned char *object)
Definition: cache_stream.cc:27
InterruptCue ** GetInterruptCuePtr()
Definition: jobinfo.h:164
void Insert(const Key &key, const Value &value)
Definition: smallhash.h:109
UniquePtr< RingBuffer > buffer_
Definition: cache_stream.h:168
void SetRangeSize(off_t range_size)
Definition: jobinfo.h:240
perf::Counter * n_buffer_objects
Definition: cache_stream.h:41
virtual int64_t Write(const void *buf, uint64_t sz)
Definition: cache_stream.cc:39
Failures error_code() const
Definition: jobinfo.h:204
UniquePtr< CacheManager > cache_mgr_
Definition: cache_stream.h:159
void Inc(class Counter *counter)
Definition: statistics.h:50
download::DownloadManager * SelectDownloadManager(const FdInfo &info)
pid_t * GetPidPtr()
Definition: jobinfo.h:161
pthread_mutex_t * lock_buffer_
Definition: cache_stream.h:170
virtual ~StreamingCacheManager()
CacheManager::Label label
Definition: cache_stream.h:124
zlib::Algorithms zip_algorithm
Definition: cache.h:126
perf::Counter * n_buffer_obstacles
Definition: cache_stream.h:42
virtual std::string Describe()
void SetExtraInfo(const std::string *extra_info)
Definition: jobinfo.h:236
virtual int Readahead(int fd)
T * Release()
Definition: pointer.h:44
Failures Fetch(JobInfo *info)
Definition: download.cc:2001
bool IsExternal() const
Definition: cache.h:104
QuotaManager * quota_mgr_
Definition: cache.h:235
Definition: mutex.h:42
bool Erase(const Key &key)
Definition: smallhash.h:115
virtual int Open(const LabeledObject &object)
std::string path
Definition: cache.h:133
virtual bool DoFreeState(void *data)
virtual int DoRestoreState(void *data)
int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset)
std::string MakePath() const
Definition: hash.h:316
bool Lookup(const Key &key, Value *value) const
Definition: smallhash.h:73
void Init(uint32_t expected_size, Key empty, uint32_t(*hasher)(const Key &key))
Definition: smallhash.h:60
static void size_t size
Definition: smalloc.h:54
virtual void * DoSaveState()
static ClientCtx * GetInstance()
Definition: clientctx.cc:45
static const uint64_t kSizeUnknown
Definition: cache.h:74
gid_t * GetGidPtr()
Definition: jobinfo.h:163