28 unsigned char *
object)
33 , window_offset_(offset)
38 virtual int64_t
Write(
const void *buf, uint64_t sz) {
40 memcpy(object_ + pos_, buf, sz);
43 uint64_t old_pos = pos_;
47 return static_cast<int64_t
>(sz);
49 if (pos_ < window_offset_)
50 return static_cast<int64_t
>(sz);
52 if (old_pos >= (window_offset_ + window_size_))
53 return static_cast<int64_t>(sz);
55 uint64_t copy_offset = std::max(old_pos, window_offset_);
56 uint64_t inbuf_offset = copy_offset - old_pos;
57 uint64_t outbuf_offset = copy_offset - window_offset_;
58 uint64_t copy_size = std::min(sz - inbuf_offset,
59 window_size_ - outbuf_offset);
61 memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
62 reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
65 return static_cast<int64_t
>(sz);
// Purge() forwards to Reset() and hands back Reset()'s return value unchanged.
73 virtual int Purge() {
return Reset(); }
75 return (window_buf_ != NULL) || (window_size_ == 0);
// Flush() performs no work in this class and always returns 0.
// NOTE(review): presumably 0 signals success to the caller — confirm against
// the base interface this method overrides.
77 virtual int Flush() {
return 0; }
// Reserve() ignores its (unnamed) size argument and always returns true.
78 virtual bool Reserve(
size_t ) {
return true; }
81 std::string result =
"Streaming sink that is ";
82 result += IsValid() ?
"valid" :
"invalid";
97 return *
const_cast<uint32_t *
>(
reinterpret_cast<const uint32_t *
>(key.
digest)
109 "streaming_cache_mgr.sz_transferred_bytes",
110 "Number of bytes downloaded by the streaming cache manager");
112 "streaming_cache_mgr.sz_transfer_ms",
113 "Time spent downloading data by the streaming cache manager");
115 "Number of objects requested remotely");
117 "streaming_cache_mgr.n_buffer_hits",
118 "Number of requests served from the buffer");
120 "streaming_cache_mgr.n_buffer_evicts",
121 "Number of objects evicted from the buffer");
123 "streaming_cache_mgr.n_buffer_objects",
124 "Number of objects in the buffer");
126 "streaming_cache_mgr.n_buffer_obstacles",
127 "Number of objects that could not be stored in the buffer "
128 "(e.g., too large)");
156 unsigned char *
object = NULL;
157 size_t nbytes_in_buffer = 0;
160 <=
buffer_->GetMaxObjectSize())) {
162 object = reinterpret_cast<unsigned char *>(smalloc(nbytes_in_buffer));
167 StreamingSink sink(buf, size, offset,
168 object ? (
object +
sizeof(
shash::Any)) : NULL);
208 while (!
buffer_->HasSpaceFor(nbytes_in_buffer)) {
225 return sink.GetNBytesStreamed();
230 unsigned max_open_fds,
242 smalloc(
sizeof(pthread_mutex_t)));
252 smalloc(
sizeof(pthread_mutex_t)));
266 return "Streaming shim, underlying cache manager:\n" +
cache_mgr_->Describe();
270 bool result =
cache_mgr_->AcquireQuotaManager(quota_mgr);
277 int fd_in_cache_mgr =
cache_mgr_->Open(
object);
278 if (fd_in_cache_mgr >= 0) {
283 if (fd_in_cache_mgr != -ENOENT)
284 return fd_in_cache_mgr;
286 if (
object.label.IsCatalog() ||
object.label.IsPinned()
287 ||
object.label.IsCertificate()) {
313 return Stream(info, NULL, 0, 0);
365 int64_t nbytes_streamed =
Stream(info, buf, size, offset);
366 if (nbytes_streamed < 0)
367 return nbytes_streamed;
368 if (static_cast<uint64_t>(nbytes_streamed) < offset)
370 if (static_cast<uint64_t>(nbytes_streamed) > (offset + size))
372 return nbytes_streamed - offset;
414 int new_backing_root_fd =
cache_mgr_->RestoreState(
418 int new_root_fd = -1;
420 if (new_backing_root_fd >= 0)
422 new_root_fd =
fd_table_.OpenFd(handle_root);
439 *root_fd =
fd_table_.GetHandle(0).fd_in_cache_mgr;
Counters(perf::Statistics *statistics)
void Dec(class Counter *counter)
virtual int Close(int fd)
Counter * Register(const std::string &name, const std::string &desc)
int64_t Xadd(class Counter *counter, const int64_t delta)
struct cvmcache_context * ctx
virtual bool Reserve(size_t)
int PlantFd(int fd_in_cache_mgr)
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
perf::Counter * n_buffer_hits
UniquePtr< Counters > counters_
perf::Counter * sz_transfer_ms
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
void * state_backing_cachemgr
perf::Counter * n_downloads
pthread_mutex_t * lock_fd_table_
static const size_t kDefaultBufferSize
perf::Counter * sz_transferred_bytes
int64_t GetNBytesStreamed() const
void SetRangeOffset(off_t range_offset)
assert((mem||(size==0))&&"Out Of Memory")
static uint32_t hasher_any(const ComparableHash &key)
SmallHashDynamic< shash::Any, RingBuffer::ObjectHandle_t > buffered_objects_
StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, download::DownloadManager *external_download_mgr, size_t buffer_size, perf::Statistics *statistics)
download::DownloadManager * external_download_mgr_
unsigned char digest[digest_size_]
virtual int64_t GetSize(int fd)
download::DownloadManager * regular_download_mgr_
virtual std::string Describe()
FdTable< FdInfo > fd_table_
CacheManager * MoveOutBackingCacheMgr(int *root_fd)
virtual int OpenFromTxn(void *txn)
uint64_t size
unzipped size, if known
virtual bool RequiresReserve()
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
StreamingSink(void *buf, uint64_t size, uint64_t offset, unsigned char *object)
InterruptCue ** GetInterruptCuePtr()
void Insert(const Key &key, const Value &value)
UniquePtr< RingBuffer > buffer_
void SetRangeSize(off_t range_size)
perf::Counter * n_buffer_objects
virtual int64_t Write(const void *buf, uint64_t sz)
Failures error_code() const
UniquePtr< CacheManager > cache_mgr_
void Inc(class Counter *counter)
download::DownloadManager * SelectDownloadManager(const FdInfo &info)
FdTable< FdInfo > * fd_table
pthread_mutex_t * lock_buffer_
perf::Counter * n_buffer_evicts
virtual ~StreamingCacheManager()
CacheManager::Label label
zlib::Algorithms zip_algorithm
perf::Counter * n_buffer_obstacles
virtual std::string Describe()
void SetExtraInfo(const std::string *extra_info)
virtual int Readahead(int fd)
Failures Fetch(JobInfo *info)
QuotaManager * quota_mgr_
bool Erase(const Key &key)
virtual int Open(const LabeledObject &object)
virtual bool DoFreeState(void *data)
virtual int DoRestoreState(void *data)
int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset)
std::string MakePath() const
bool Lookup(const Key &key, Value *value) const
void Init(uint32_t expected_size, Key empty, uint32_t(*hasher)(const Key &key))
virtual void * DoSaveState()
static ClientCtx * GetInstance()
static const uint64_t kSizeUnknown