5 #include "cvmfs_config.h"
30 , window_offset_(offset)
35 virtual int64_t
Write(
const void *buf, uint64_t sz) {
36 uint64_t old_pos = pos_;
40 return static_cast<int64_t
>(sz);
42 if (pos_ < window_offset_)
43 return static_cast<int64_t
>(sz);
45 if (old_pos >= (window_offset_ + window_size_))
46 return static_cast<int64_t>(sz);
48 uint64_t copy_offset = std::max(old_pos, window_offset_);
49 uint64_t inbuf_offset = copy_offset - old_pos;
50 uint64_t outbuf_offset = copy_offset - window_offset_;
52 std::min(sz - inbuf_offset, window_size_ - outbuf_offset);
54 memcpy(reinterpret_cast<unsigned char *>(window_buf_) + outbuf_offset,
55 reinterpret_cast<const unsigned char *>(buf) + inbuf_offset,
58 return static_cast<int64_t
>(sz);
66 virtual int Purge() {
return Reset(); }
68 return (window_buf_ != NULL) || (window_size_ == 0);
70 virtual int Flush() {
return 0; }
71 virtual bool Reserve(
size_t ) {
return true; }
74 std::string result =
"Streaming sink that is ";
75 result += IsValid() ?
"valid" :
"invalid";
105 StreamingSink sink(buf, size, offset);
133 return sink.GetNBytesStreamed();
138 unsigned max_open_fds,
142 : cache_mgr_(cache_mgr)
143 , regular_download_mgr_(regular_download_mgr)
144 , external_download_mgr_(external_download_mgr)
145 , fd_table_(max_open_fds,
FdInfo())
148 reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
163 return "Streaming shim, underlying cache manager:\n" +
cache_mgr_->Describe();
167 bool result =
cache_mgr_->AcquireQuotaManager(quota_mgr);
174 int fd_in_cache_mgr =
cache_mgr_->Open(
object);
175 if (fd_in_cache_mgr >= 0) {
180 if (fd_in_cache_mgr != -ENOENT)
181 return fd_in_cache_mgr;
183 if (
object.label.IsCatalog() ||
object.label.IsPinned() ||
184 object.label.IsCertificate())
211 return Stream(info, NULL, 0, 0);
250 int fd,
void *buf, uint64_t
size, uint64_t offset)
264 int64_t nbytes_streamed =
Stream(info, buf, size, offset);
265 if (nbytes_streamed < 0)
266 return nbytes_streamed;
267 if (static_cast<uint64_t>(nbytes_streamed) < offset)
269 if (static_cast<uint64_t>(nbytes_streamed) > (offset + size))
271 return nbytes_streamed - offset;
313 int new_backing_root_fd =
317 int new_root_fd = -1;
319 if (new_backing_root_fd >= 0)
321 new_root_fd =
fd_table_.OpenFd(handle_root);
338 *root_fd =
fd_table_.GetHandle(0).fd_in_cache_mgr;
virtual int Close(int fd)
struct cvmcache_context * ctx
virtual bool Reserve(size_t)
int PlantFd(int fd_in_cache_mgr)
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
void Get(uid_t *uid, gid_t *gid, pid_t *pid, InterruptCue **ic)
void * state_backing_cachemgr
pthread_mutex_t * lock_fd_table_
int64_t GetNBytesStreamed() const
void SetRangeOffset(off_t range_offset)
assert((mem||(size==0))&&"Out Of Memory")
download::DownloadManager * external_download_mgr_
virtual int64_t GetSize(int fd)
download::DownloadManager * regular_download_mgr_
virtual std::string Describe()
FdTable< FdInfo > fd_table_
CacheManager * MoveOutBackingCacheMgr(int *root_fd)
virtual int OpenFromTxn(void *txn)
uint64_t size
unzipped size, if known
virtual bool RequiresReserve()
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
InterruptCue ** GetInterruptCuePtr()
void SetRangeSize(off_t range_size)
virtual int64_t Write(const void *buf, uint64_t sz)
Failures error_code() const
UniquePtr< CacheManager > cache_mgr_
download::DownloadManager * SelectDownloadManager(const FdInfo &info)
FdTable< FdInfo > * fd_table
virtual ~StreamingCacheManager()
CacheManager::Label label
zlib::Algorithms zip_algorithm
virtual std::string Describe()
void SetExtraInfo(const std::string *extra_info)
virtual int Readahead(int fd)
Failures Fetch(JobInfo *info)
QuotaManager * quota_mgr_
StreamingCacheManager(unsigned max_open_fds, CacheManager *cache_mgr, download::DownloadManager *regular_download_mgr, download::DownloadManager *external_download_mgr)
virtual int Open(const LabeledObject &object)
virtual bool DoFreeState(void *data)
virtual int DoRestoreState(void *data)
StreamingSink(void *buf, uint64_t size, uint64_t offset)
int64_t Stream(const FdInfo &info, void *buf, uint64_t size, uint64_t offset)
std::string MakePath() const
virtual void * DoSaveState()
static ClientCtx * GetInstance()