5 #include "cvmfs_config.h"
29 , window_offset_(offset)
34 virtual int64_t
Write(
const void *buf, uint64_t sz) {
35 uint64_t old_pos = pos_;
39 return static_cast<int64_t
>(sz);
41 if (pos_ < window_offset_)
42 return static_cast<int64_t
>(sz);
44 if (old_pos >= (window_offset_ + window_size_))
45 return static_cast<int64_t>(sz);
47 uint64_t copy_offset = std::max(old_pos, window_offset_);
48 uint64_t inbuf_offset = copy_offset - old_pos;
49 uint64_t outbuf_offset = copy_offset - window_offset_;
51 std::min(sz - inbuf_offset, window_size_ - outbuf_offset);
53 memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
54 reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
57 return static_cast<int64_t
>(sz);
65 virtual int Purge() {
return Reset(); }
67 return (window_buf_ != NULL) || (window_size_ == 0);
69 virtual int Flush() {
return 0; }
70 virtual bool Reserve(
size_t ) {
return true; }
73 std::string result =
"Streaming sink that is ";
74 result += IsValid() ?
"valid" :
"invalid";
104 StreamingSink sink(buf, size, offset);
124 return sink.GetNBytesStreamed();
129 unsigned max_open_fds,
133 : cache_mgr_(cache_mgr)
134 , regular_download_mgr_(regular_download_mgr)
135 , external_download_mgr_(external_download_mgr)
136 , fd_table_(max_open_fds,
FdInfo())
139 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
154 return "Streaming shim, underlying cache manager:\n" +
cache_mgr_->Describe();
158 bool result =
cache_mgr_->AcquireQuotaManager(quota_mgr);
165 int fd_in_cache_mgr =
cache_mgr_->Open(
object);
166 if (fd_in_cache_mgr >= 0) {
171 if (fd_in_cache_mgr != -ENOENT)
172 return fd_in_cache_mgr;
174 if (
object.label.IsCatalog() ||
object.label.IsPinned() ||
175 object.label.IsCertificate())
202 return Stream(info, NULL, 0, 0);
241 int fd,
void *buf, uint64_t
size, uint64_t offset)
255 uint64_t nbytes_streamed =
Stream(info, buf, size, offset);
256 if (nbytes_streamed < offset)
258 if (nbytes_streamed > (offset + size))
259 return static_cast<int64_t
>(
size);
260 return static_cast<int64_t
>(nbytes_streamed - offset);
302 int new_backing_root_fd =
306 int new_root_fd = -1;
308 if (new_backing_root_fd >= 0)
310 new_root_fd =
fd_table_.OpenFd(handle_root);
327 *root_fd =
fd_table_.GetHandle(0).fd_in_cache_mgr;
virtual int Close(int fd)
virtual bool Reserve(size_t)
int PlantFd(int fd_in_cache_mgr)
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
void * state_backing_cachemgr
pthread_mutex_t * lock_fd_table_
int64_t GetNBytesStreamed() const
void SetRangeOffset(off_t range_offset)
assert((mem||(size==0))&&"Out Of Memory")
download::DownloadManager * external_download_mgr_
virtual int64_t GetSize(int fd)
download::DownloadManager * regular_download_mgr_
virtual std::string Describe()
FdTable< FdInfo > fd_table_
CacheManager * MoveOutBackingCacheMgr(int *root_fd)
virtual int OpenFromTxn(void *txn)
uint64_t size
unzipped size, if known
virtual bool RequiresReserve()
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
void SetRangeSize(off_t range_size)
virtual int64_t Write(const void *buf, uint64_t sz)
Failures error_code() const
UniquePtr< CacheManager > cache_mgr_
download::DownloadManager * SelectDownloadManager(const FdInfo &info)
FdTable< FdInfo > * fd_table
virtual ~StreamingCacheManager()
CacheManager::Label label
zlib::Algorithms zip_algorithm
virtual std::string Describe()
void SetExtraInfo(const std::string *extra_info)
virtual int Readahead(int fd)
Failures Fetch(JobInfo *info)
QuotaManager * quota_mgr_
StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, download::DownloadManager *external_download_mgr)
virtual int Open(const LabeledObject &object)
virtual bool DoFreeState(void *data)
virtual int DoRestoreState(void *data)
StreamingSink(void *buf, uint64_t size, uint64_t offset)
int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset)
std::string MakePath() const
virtual void * DoSaveState()