11 #include <sys/socket.h>
43 switch (status_code) {
44 case cvmfs::STATUS_OK:
46 case cvmfs::STATUS_NOSUPPORT:
48 case cvmfs::STATUS_FORBIDDEN:
50 case cvmfs::STATUS_NOSPACE:
52 case cvmfs::STATUS_NOENTRY:
54 case cvmfs::STATUS_MALFORMED:
56 case cvmfs::STATUS_IOERR:
58 case cvmfs::STATUS_CORRUPTED:
60 case cvmfs::STATUS_TIMEOUT:
62 case cvmfs::STATUS_BADCOUNT:
64 case cvmfs::STATUS_OUTOFBOUNDS:
77 int result = Reset(txn);
79 free(reinterpret_cast<Transaction *>(txn)->buffer);
87 quota_mgr_ = quota_mgr;
100 bool retval = transport_.RecvFrame(rpc_job->
frame_recv());
103 google::protobuf::MessageLite *msg_typed = rpc_job->
frame_recv()
105 assert(msg_typed->GetTypeName() ==
"cvmfs.MsgDetach");
106 quota_mgr_->BroadcastBackchannels(
"R");
115 inflight_rpcs_.push_back(
RpcInFlight(rpc_job, &signal));
127 cvmfs::MsgHash object_id;
128 transport_.FillMsgHash(
id, &object_id);
129 cvmfs::MsgRefcountReq msg_refcount;
130 msg_refcount.set_session_id(session_id_);
131 msg_refcount.set_req_id(NextRequestId());
132 msg_refcount.set_allocated_object_id(&object_id);
133 msg_refcount.set_change_by(change_by);
134 RpcJob rpc_job(&msg_refcount);
135 CallRemotely(&rpc_job);
136 msg_refcount.release_object_id();
147 handle = fd_table_.GetHandle(fd);
148 if (handle.
id == kInvalidHandle)
150 int retval = fd_table_.CloseFd(fd);
154 return ChangeRefcount(handle.
id, -1);
161 std::string(transaction->
id.
ToString()).c_str());
162 int retval = Flush(
true, transaction);
166 int refcount = transaction->
open_fds - 1;
168 return ChangeRefcount(transaction->
id, refcount);
170 free(transaction->
buffer);
180 if (tokens[0] ==
"unix") {
182 }
else if (tokens[0] ==
"tcp") {
183 vector<string> tcp_address =
SplitString(tokens[1],
':');
184 if (tcp_address.size() != 2)
194 "Failed to connect to socket: %s", strerror(errno));
197 "Failed to connect to socket (unknown error)");
209 unsigned max_open_fds,
210 const string &ident) {
215 cvmfs::MsgHandshake msg_handshake;
216 msg_handshake.set_protocol_version(kPbProtocolVersion);
217 msg_handshake.set_name(ident);
225 google::protobuf::MessageLite *msg_typed = frame_recv.
GetMsgTyped();
226 if (msg_typed->GetTypeName() !=
"cvmfs.MsgHandshakeAck")
228 cvmfs::MsgHandshakeAck *msg_ack =
reinterpret_cast<cvmfs::MsgHandshakeAck *
>(
236 "external cache manager object size too large (%u)",
242 "external cache manager object size too small (%u)",
246 if (msg_ack->has_pid())
259 const std::string &locator,
const std::vector<std::string> &cmd_line) {
261 unsigned num_attempts = 0;
262 bool try_again =
false;
265 if (num_attempts > 2) {
269 plugin_handle->fd_connection_ = ConnectLocator(locator, num_attempts > 1);
270 if (plugin_handle->
IsValid()) {
272 }
else if (plugin_handle->fd_connection_ == -EINVAL) {
275 plugin_handle->error_msg_ =
"Invalid locator: " + locator;
278 if (num_attempts > 1) {
280 "Failed to connect to external cache manager: %d",
281 plugin_handle->fd_connection_);
283 plugin_handle->error_msg_ =
"Failed to connect to external cache manager";
286 try_again = SpawnPlugin(cmd_line);
289 return plugin_handle.
Release();
297 transaction->
label = label;
325 int status_refcnt = ChangeRefcount(
id, 1);
326 if (status_refcnt == 0)
330 int retval = fd_table_.CloseFd(fd);
332 return status_refcnt;
339 for (
unsigned i = 1; i < fd_table_.GetMaxFds(); ++i) {
347 cvmfs::MsgIoctl msg_ioctl;
348 msg_ioctl.set_session_id(session_id_);
349 msg_ioctl.set_conncnt_change_by(-1);
351 transport_.SendFrame(&frame);
353 int new_root_fd = -1;
355 new_root_fd = fd_table_.OpenFd(handle_root);
365 cvmfs::MsgIoctl msg_ioctl;
366 msg_ioctl.set_session_id(session_id_);
367 msg_ioctl.set_conncnt_change_by(1);
369 transport_.SendFrame(&frame);
370 return fd_table_.Clone();
376 if (
id == kInvalidHandle)
383 unsigned max_open_fds)
386 , transport_(fd_connection)
388 , max_object_size_(0)
391 , capabilities_(cvmfs::CAP_NONE) {
407 cvmfs::MsgQuit msg_quit;
427 std::string(transaction->
id.
ToString()).c_str());
428 cvmfs::MsgHash object_id;
430 cvmfs::MsgStoreReq msg_store;
433 msg_store.set_allocated_object_id(&object_id);
436 msg_store.set_last_part(do_commit);
439 cvmfs::EnumObjectType object_type;
441 msg_store.set_object_type(object_type);
445 RpcJob rpc_job(&msg_store);
449 msg_store.release_object_id();
452 if (msg_reply->status() == cvmfs::STATUS_OK) {
473 cvmfs::MsgHash object_id;
475 cvmfs::MsgObjectInfoReq msg_info;
478 msg_info.set_allocated_object_id(&object_id);
479 RpcJob rpc_job(&msg_info);
481 msg_info.release_object_id();
484 if (msg_reply->status() == cvmfs::STATUS_OK) {
485 assert(msg_reply->has_size());
486 return msg_reply->size();
506 uint64_t part_nr = 0;
507 google::protobuf::MessageLite *msg = frame_recv.
GetMsgTyped();
508 if (msg->GetTypeName() ==
"cvmfs.MsgRefcountReply") {
509 req_id =
reinterpret_cast<cvmfs::MsgRefcountReply *
>(msg)->req_id();
510 }
else if (msg->GetTypeName() ==
"cvmfs.MsgObjectInfoReply") {
511 req_id =
reinterpret_cast<cvmfs::MsgObjectInfoReply *
>(msg)->req_id();
512 }
else if (msg->GetTypeName() ==
"cvmfs.MsgReadReply") {
513 req_id =
reinterpret_cast<cvmfs::MsgReadReply *
>(msg)->req_id();
514 }
else if (msg->GetTypeName() ==
"cvmfs.MsgStoreReply") {
515 req_id =
reinterpret_cast<cvmfs::MsgStoreReply *
>(msg)->req_id();
516 part_nr =
reinterpret_cast<cvmfs::MsgStoreReply *
>(msg)->part_nr();
517 }
else if (msg->GetTypeName() ==
"cvmfs.MsgInfoReply") {
518 req_id =
reinterpret_cast<cvmfs::MsgInfoReply *
>(msg)->req_id();
519 }
else if (msg->GetTypeName() ==
"cvmfs.MsgShrinkReply") {
520 req_id =
reinterpret_cast<cvmfs::MsgShrinkReply *
>(msg)->req_id();
521 }
else if (msg->GetTypeName() ==
"cvmfs.MsgListReply") {
522 req_id =
reinterpret_cast<cvmfs::MsgListReply *
>(msg)->req_id();
523 }
else if (msg->GetTypeName() ==
"cvmfs.MsgBreadcrumbReply") {
524 req_id =
reinterpret_cast<cvmfs::MsgBreadcrumbReply *
>(msg)->req_id();
525 }
else if (msg->GetTypeName() ==
"cvmfs.MsgDetach") {
531 std::string(msg->GetTypeName()).c_str());
539 if ((rpc_job->
req_id() == req_id) && (rpc_job->
part_nr() == part_nr)) {
547 if (rpc_inflight.
rpc_job == NULL) {
549 "got unmatched rpc reply");
558 "connection to external cache manager broken (%d)", errno);
573 std::string(transaction->
id.
ToString()).c_str());
574 int retval =
Flush(
true, transaction);
601 cvmfs::MsgHash object_id;
604 while (nbytes < size) {
605 uint64_t batch_size = std::min(size - nbytes,
607 cvmfs::MsgReadReq msg_read;
610 msg_read.set_allocated_object_id(&object_id);
611 msg_read.set_offset(offset + nbytes);
612 msg_read.set_size(batch_size);
613 RpcJob rpc_job(&msg_read);
617 msg_read.release_object_id();
620 if (msg_reply->status() == cvmfs::STATUS_OK) {
645 transaction->
size = 0;
653 cvmfs::MsgHash object_id;
655 cvmfs::MsgStoreAbortReq msg_abort;
658 msg_abort.set_allocated_object_id(&object_id);
659 RpcJob rpc_job(&msg_abort);
661 msg_abort.release_object_id();
670 const std::string &fqrn) {
674 cvmfs::MsgBreadcrumbLoadReq msg_breadcrumb_load;
677 msg_breadcrumb_load.set_fqrn(fqrn);
678 RpcJob rpc_job(&msg_breadcrumb_load);
683 if (msg_reply->status() == cvmfs::STATUS_OK) {
684 assert(msg_reply->has_breadcrumb());
685 assert(msg_reply->breadcrumb().fqrn() == fqrn);
690 breadcrumb.
timestamp = msg_reply->breadcrumb().timestamp();
691 if (msg_reply->breadcrumb().has_revision()) {
692 breadcrumb.
revision = msg_reply->breadcrumb().revision();
707 cvmfs::MsgBreadcrumb breadcrumb;
709 breadcrumb.set_allocated_hash(&hash);
711 breadcrumb.set_revision(manifest.
revision());
712 cvmfs::MsgBreadcrumbStoreReq msg_breadcrumb_store;
715 msg_breadcrumb_store.set_allocated_breadcrumb(&breadcrumb);
716 RpcJob rpc_job(&msg_breadcrumb_store);
718 msg_breadcrumb_store.release_breadcrumb();
719 breadcrumb.release_hash();
722 return msg_reply->status() == cvmfs::STATUS_OK;
738 if (cmd_line.empty())
743 set<int> preserve_filedes;
744 preserve_filedes.insert(pipe_ready[1]);
746 int fd_null_read = open(
"/dev/null", O_RDONLY);
747 int fd_null_write = open(
"/dev/null", O_WRONLY);
748 assert((fd_null_read >= 0) && (fd_null_write >= 0));
749 map<int, int> map_fildes;
750 map_fildes[fd_null_read] = 0;
751 map_fildes[fd_null_write] = 1;
752 map_fildes[fd_null_write] = 2;
767 close(fd_null_write);
770 "failed to start cache plugin '%s'",
777 "started cache plugin '%s' (pid %d), waiting for it to become ready",
779 close(pipe_ready[1]);
781 if (read(pipe_ready[0], &buf, 1) != 1) {
782 close(pipe_ready[0]);
784 "cache plugin did not start properly");
787 close(pipe_ready[0]);
792 "cache plugin failed to create an endpoint");
807 transaction->
buffer =
reinterpret_cast<unsigned char *
>(
823 "Transaction size (%" PRIu64
") > expected size (%" PRIu64
")",
829 uint64_t written = 0;
830 const unsigned char *read_pos =
reinterpret_cast<const unsigned char *
>(buf);
831 while (written < size) {
833 bool do_commit =
false;
836 int retval =
Flush(do_commit, transaction);
838 transaction->
size += written;
844 uint64_t remaining = size - written;
846 uint64_t batch_size = std::min(remaining, space_in_buffer);
847 memcpy(transaction->
buffer + transaction->
buf_pos, read_pos, batch_size);
848 transaction->
buf_pos += batch_size;
849 written += batch_size;
850 read_pos += batch_size;
860 vector<cvmfs::MsgListRecord> *result) {
864 uint64_t listing_id = 0;
865 bool more_data =
false;
867 cvmfs::MsgListReq msg_list;
870 msg_list.set_listing_id(listing_id);
871 msg_list.set_object_type(type);
876 if (msg_reply->status() != cvmfs::STATUS_OK)
878 more_data = !msg_reply->is_last_part();
879 listing_id = msg_reply->listing_id();
880 for (
int i = 0; i < msg_reply->list_record_size(); ++i) {
881 result->push_back(msg_reply->list_record(i));
893 cvmfs::MsgShrinkReq msg_shrink;
896 msg_shrink.set_shrink_to(leave_size);
901 return msg_reply->status() == cvmfs::STATUS_OK;
917 return Ack2Errno(cvmfs::STATUS_NOSUPPORT);
919 cvmfs::MsgInfoReq msg_info;
926 if (msg_reply->status() == cvmfs::STATUS_OK) {
927 quota_info->
size = msg_reply->size_bytes();
928 quota_info->
used = msg_reply->used_bytes();
929 quota_info->
pinned = msg_reply->pinned_bytes();
930 if (msg_reply->no_shrink() >= 0)
931 quota_info->
no_shrink = msg_reply->no_shrink();
974 switch (capability) {
992 vector<string> result;
993 vector<cvmfs::MsgListRecord> raw_list;
994 bool retval =
DoListing(cvmfs::OBJECT_REGULAR, &raw_list);
997 for (
unsigned i = 0; i < raw_list.size(); ++i)
998 result.push_back(raw_list[i].description());
1004 vector<string> result;
1005 vector<cvmfs::MsgListRecord> raw_list;
1006 bool retval =
DoListing(cvmfs::OBJECT_CATALOG, &raw_list);
1009 for (
unsigned i = 0; i < raw_list.size(); ++i)
1010 result.push_back(raw_list[i].description());
1016 vector<string> result;
1017 vector<cvmfs::MsgListRecord> raw_lists[3];
1018 bool retval =
DoListing(cvmfs::OBJECT_REGULAR, &raw_lists[0]);
1021 retval =
DoListing(cvmfs::OBJECT_CATALOG, &raw_lists[1]);
1024 retval =
DoListing(cvmfs::OBJECT_VOLATILE, &raw_lists[2]);
1027 for (
unsigned i = 0; i <
sizeof(raw_lists) /
sizeof(raw_lists[0]); ++i) {
1028 for (
unsigned j = 0; j < raw_lists[i].size(); ++j) {
1029 if (raw_lists[i][j].pinned())
1030 result.push_back(raw_lists[i][j].description());
1038 vector<string> result;
1039 vector<cvmfs::MsgListRecord> raw_list;
1040 bool retval =
DoListing(cvmfs::OBJECT_VOLATILE, &raw_list);
1043 for (
unsigned i = 0; i < raw_list.size(); ++i)
1044 result.push_back(raw_list[i].description());
1050 const string &channel_id) {
1061 const string &channel_id) {
CacheTransport transport_
int Flush(bool do_commit, Transaction *transaction)
pthread_mutex_t lock_inflight_rpcs_
void AssignFrom(const FdTable< HandleT > &other)
virtual ~ExternalCacheManager()
virtual uint64_t GetSizePinned()
int ChangeRefcount(const shash::Any &id, int change_by)
void set_attachment_recv(void *data, unsigned size)
const manifest::Manifest * manifest() const
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
std::string GetDescription() const
virtual uint64_t GetCleanupRate(uint64_t period_s)
bool DoListing(cvmfs::EnumObjectType type, std::vector< cvmfs::MsgListRecord > *result)
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
int Ack2Errno(cvmfs::EnumStatus status_code)
virtual int Readahead(int fd)
virtual bool Cleanup(const uint64_t leave_size)
void BroadcastBackchannels(const std::string &message)
pthread_rwlock_t rwlock_fd_table_
cvmfs::MsgShrinkReply * msg_shrink_reply()
cvmfs::MsgRefcountReply * msg_refcount_reply()
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
int ConnectTcpEndpoint(const std::string &ipv4_address, int portno)
cvmfs::MsgReadReply * msg_read_reply()
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
virtual int AbortTxn(void *txn)
virtual std::string Describe()
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
virtual std::vector< std::string > ListCatalogs()
std::vector< RpcInFlight > inflight_rpcs_
static void * MainRead(void *data)
atomic_int64 next_request_id_
virtual int Reset(void *txn)
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void FillObjectType(int object_flags, cvmfs::EnumObjectType *wire_type)
virtual std::vector< std::string > ListPinned()
virtual std::vector< std::string > List()
static const char kReadyNotification
void set_attachment_send(void *data, unsigned size)
void MakePipe(int pipe_fd[2])
cvmfs::MsgListReply * msg_list_reply()
ExternalCacheManager(int fd_connection, unsigned max_open_fds)
virtual int Close(int fd)
uint64_t revision() const
ExternalCacheManager * cache_mgr_
cvmfs::MsgStoreReply * msg_store_reply()
FdTable< ReadOnlyHandle > fd_table_
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
cvmfs::MsgBreadcrumbReply * msg_breadcrumb_reply()
void MergeFrom(const Frame &other)
uint64_t publish_timestamp() const
vector< string > SplitString(const string &str, char delim)
std::string repository_name() const
uint32_t max_object_size_
const char kSuffixCatalog
virtual void * DoSaveState()
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
uint32_t att_size() const
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
CacheTransport::Frame * frame_send()
static const shash::Any kInvalidHandle
cvmfs::MsgObjectInfoReply * msg_object_info_reply()
shash::Any catalog_hash() const
string StringifyInt(const int64_t value)
int GetInfo(QuotaInfo *quota_info)
google::protobuf::MessageLite * GetMsgTyped()
virtual void CtrlTxn(const Label &label, const int flags, void *txn)
std::map< shash::Md5, int > back_channels_
ExternalQuotaManager(ExternalCacheManager *cache_mgr)
static bool SpawnPlugin(const std::vector< std::string > &cmd_line)
cvmfs::MsgInfoReply * msg_info_reply()
int ConnectSocket(const std::string &path)
CacheTransport::Frame * frame_recv()
shash::Any GetHandle(int fd)
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
virtual int64_t GetSize(int fd)
static ExternalCacheManager * Create(int fd_connection, unsigned max_open_fds, const std::string &ident)
QuotaManager * quota_mgr_
pthread_mutex_t lock_send_fd_
static ExternalQuotaManager * Create(ExternalCacheManager *cache_mgr)
virtual bool DoFreeState(void *data)
void SafeSleepMs(const unsigned ms)
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
virtual bool HasCapability(Capabilities capability)
void set_attachment(void *attachment, uint32_t att_size)
void CallRemotely(RpcJob *rpc_job)
virtual std::vector< std::string > ListVolatile()
virtual int OpenFromTxn(void *txn)
static int ConnectLocator(const std::string &locator, bool print_error)
static PluginHandle * CreatePlugin(const std::string &locator, const std::vector< std::string > &cmd_line)
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
virtual uint64_t GetCapacity()
void Reset(uint32_t original_att_size)
virtual int CommitTxn(void *txn)
void ClosePipe(int pipe_fd[2])
virtual int DoRestoreState(void *data)
void UnlockBackChannels()
int fd_connection() const
static const char * kEnvReadyNotifyFd
virtual uint64_t GetSize()
int DoOpen(const shash::Any &id)
static const uint64_t kSizeUnknown
virtual int Open(const LabeledObject &object)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)