28 unsigned char *
object)
33 , window_offset_(offset)
39 virtual int64_t
Write(
const void *buf, uint64_t sz) {
41 memcpy(object_ + pos_, buf, sz);
44 uint64_t old_pos = pos_;
48 return static_cast<int64_t
>(sz);
50 if (pos_ < window_offset_)
51 return static_cast<int64_t
>(sz);
53 if (old_pos >= (window_offset_ + window_size_))
54 return static_cast<int64_t>(sz);
56 uint64_t copy_offset = std::max(old_pos, window_offset_);
57 uint64_t inbuf_offset = copy_offset - old_pos;
58 uint64_t outbuf_offset = copy_offset - window_offset_;
60 std::min(sz - inbuf_offset, window_size_ - outbuf_offset);
62 memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
63 reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
66 return static_cast<int64_t
>(sz);
/**
 * Purge forwards directly to Reset() and returns whatever Reset() returns.
 * Reset() itself is declared outside this excerpt.
 */
74 virtual int Purge() {
return Reset(); }
76 return (window_buf_ != NULL) || (window_size_ == 0);
/**
 * Flush performs no work here and unconditionally returns 0.
 * NOTE(review): presumably 0 signals success -- confirm against the
 * base-class contract, which is not visible in this excerpt.
 */
78 virtual int Flush() {
return 0; }
/**
 * Reserve ignores the requested size (the parameter is intentionally left
 * unnamed) and always returns true.
 */
79 virtual bool Reserve(
size_t ) {
return true; }
82 std::string result =
"Streaming sink that is ";
83 result += IsValid() ?
"valid" :
"invalid";
98 return *
const_cast<uint32_t *
>(
99 reinterpret_cast<const uint32_t *
>(key.
digest) + 1);
110 "streaming_cache_mgr.sz_transferred_bytes",
111 "Number of bytes downloaded by the streaming cache manager");
113 "streaming_cache_mgr.sz_transfer_ms",
114 "Time spent downloading data by the streaming cache manager");
116 "streaming_cache_mgr.n_downloads",
"Number of objects requested remotely");
118 "streaming_cache_mgr.n_buffer_hits",
119 "Number of requests served from the buffer");
121 "streaming_cache_mgr.n_buffer_evicts",
122 "Number of objects evicted from the buffer");
124 "streaming_cache_mgr.n_buffer_objects",
"Number of objects in the buffer");
126 "streaming_cache_mgr.n_buffer_obstacles",
127 "Number of objects that could not be stored in the buffer "
128 "(e.g., too large)");
159 unsigned char *
object = NULL;
160 size_t nbytes_in_buffer = 0;
165 object = reinterpret_cast<unsigned char *>(smalloc(nbytes_in_buffer));
170 StreamingSink sink(buf, size, offset,
171 object ? (
object +
sizeof(
shash::Any)) : NULL);
211 while (!
buffer_->HasSpaceFor(nbytes_in_buffer)) {
222 buffer_->PushFront(
object, nbytes_in_buffer);
228 return sink.GetNBytesStreamed();
233 unsigned max_open_fds,
246 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
256 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
270 return "Streaming shim, underlying cache manager:\n" +
cache_mgr_->Describe();
274 bool result =
cache_mgr_->AcquireQuotaManager(quota_mgr);
281 int fd_in_cache_mgr =
cache_mgr_->Open(
object);
282 if (fd_in_cache_mgr >= 0) {
287 if (fd_in_cache_mgr != -ENOENT)
288 return fd_in_cache_mgr;
290 if (
object.label.IsCatalog() ||
object.label.IsPinned() ||
291 object.label.IsCertificate())
318 return Stream(info, NULL, 0, 0);
357 int fd,
void *buf, uint64_t
size, uint64_t offset)
371 int64_t nbytes_streamed =
Stream(info, buf, size, offset);
372 if (nbytes_streamed < 0)
373 return nbytes_streamed;
374 if (static_cast<uint64_t>(nbytes_streamed) < offset)
376 if (static_cast<uint64_t>(nbytes_streamed) > (offset + size))
378 return nbytes_streamed - offset;
420 int new_backing_root_fd =
424 int new_root_fd = -1;
426 if (new_backing_root_fd >= 0)
428 new_root_fd =
fd_table_.OpenFd(handle_root);
445 *root_fd =
fd_table_.GetHandle(0).fd_in_cache_mgr;
Counters(perf::Statistics *statistics)
void Dec(class Counter *counter)
virtual int Close(int fd)
Counter * Register(const std::string &name, const std::string &desc)
int64_t Xadd(class Counter *counter, const int64_t delta)
struct cvmcache_context * ctx
virtual bool Reserve(size_t)
int PlantFd(int fd_in_cache_mgr)
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
perf::Counter * n_buffer_hits
UniquePtr< Counters > counters_
perf::Counter * sz_transfer_ms
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
void * state_backing_cachemgr
perf::Counter * n_downloads
pthread_mutex_t * lock_fd_table_
static const size_t kDefaultBufferSize
perf::Counter * sz_transferred_bytes
int64_t GetNBytesStreamed() const
void SetRangeOffset(off_t range_offset)
assert((mem||(size==0))&&"Out Of Memory")
static uint32_t hasher_any(const ComparableHash &key)
SmallHashDynamic< shash::Any, RingBuffer::ObjectHandle_t > buffered_objects_
StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, download::DownloadManager *external_download_mgr, size_t buffer_size, perf::Statistics *statistics)
download::DownloadManager * external_download_mgr_
unsigned char digest[digest_size_]
virtual int64_t GetSize(int fd)
download::DownloadManager * regular_download_mgr_
virtual std::string Describe()
FdTable< FdInfo > fd_table_
CacheManager * MoveOutBackingCacheMgr(int *root_fd)
virtual int OpenFromTxn(void *txn)
uint64_t size
unzipped size, if known
virtual bool RequiresReserve()
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
StreamingSink(void *buf, uint64_t size, uint64_t offset, unsigned char *object)
InterruptCue ** GetInterruptCuePtr()
void Insert(const Key &key, const Value &value)
UniquePtr< RingBuffer > buffer_
void SetRangeSize(off_t range_size)
perf::Counter * n_buffer_objects
virtual int64_t Write(const void *buf, uint64_t sz)
Failures error_code() const
UniquePtr< CacheManager > cache_mgr_
void Inc(class Counter *counter)
download::DownloadManager * SelectDownloadManager(const FdInfo &info)
FdTable< FdInfo > * fd_table
pthread_mutex_t * lock_buffer_
perf::Counter * n_buffer_evicts
virtual ~StreamingCacheManager()
CacheManager::Label label
zlib::Algorithms zip_algorithm
perf::Counter * n_buffer_obstacles
virtual std::string Describe()
void SetExtraInfo(const std::string *extra_info)
virtual int Readahead(int fd)
Failures Fetch(JobInfo *info)
QuotaManager * quota_mgr_
bool Erase(const Key &key)
virtual int Open(const LabeledObject &object)
virtual bool DoFreeState(void *data)
virtual int DoRestoreState(void *data)
int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset)
std::string MakePath() const
bool Lookup(const Key &key, Value *value) const
void Init(uint32_t expected_size, Key empty, uint32_t(*hasher)(const Key &key))
virtual void * DoSaveState()
static ClientCtx * GetInstance()
static const uint64_t kSizeUnknown