11 #include <sys/socket.h>
43 switch (status_code) {
44 case cvmfs::STATUS_OK:
46 case cvmfs::STATUS_NOSUPPORT:
48 case cvmfs::STATUS_FORBIDDEN:
50 case cvmfs::STATUS_NOSPACE:
52 case cvmfs::STATUS_NOENTRY:
54 case cvmfs::STATUS_MALFORMED:
56 case cvmfs::STATUS_IOERR:
58 case cvmfs::STATUS_CORRUPTED:
60 case cvmfs::STATUS_TIMEOUT:
62 case cvmfs::STATUS_BADCOUNT:
64 case cvmfs::STATUS_OUTOFBOUNDS:
77 int result = Reset(txn);
79 free(reinterpret_cast<Transaction *>(txn)->buffer);
87 quota_mgr_ = quota_mgr;
100 bool retval = transport_.RecvFrame(rpc_job->
frame_recv());
103 google::protobuf::MessageLite *msg_typed =
105 assert(msg_typed->GetTypeName() ==
"cvmfs.MsgDetach");
106 quota_mgr_->BroadcastBackchannels(
"R");
115 inflight_rpcs_.push_back(
RpcInFlight(rpc_job, &signal));
127 cvmfs::MsgHash object_id;
128 transport_.FillMsgHash(
id, &object_id);
129 cvmfs::MsgRefcountReq msg_refcount;
130 msg_refcount.set_session_id(session_id_);
131 msg_refcount.set_req_id(NextRequestId());
132 msg_refcount.set_allocated_object_id(&object_id);
133 msg_refcount.set_change_by(change_by);
134 RpcJob rpc_job(&msg_refcount);
135 CallRemotely(&rpc_job);
136 msg_refcount.release_object_id();
147 handle = fd_table_.GetHandle(fd);
148 if (handle.
id == kInvalidHandle)
150 int retval = fd_table_.CloseFd(fd);
154 return ChangeRefcount(handle.
id, -1);
162 int retval = Flush(
true, transaction);
166 int refcount = transaction->
open_fds - 1;
168 return ChangeRefcount(transaction->
id, refcount);
170 free(transaction->
buffer);
177 const std::string &locator,
bool print_error)
181 if (tokens[0] ==
"unix") {
183 }
else if (tokens[0] ==
"tcp") {
184 vector<string> tcp_address =
SplitString(tokens[1],
':');
185 if (tcp_address.size() != 2)
195 "Failed to connect to socket: %s", strerror(errno));
198 "Failed to connect to socket (unknown error)");
204 "connected to cache plugin at %s", locator.c_str());
211 unsigned max_open_fds,
218 cvmfs::MsgHandshake msg_handshake;
219 msg_handshake.set_protocol_version(kPbProtocolVersion);
220 msg_handshake.set_name(ident);
228 google::protobuf::MessageLite *msg_typed = frame_recv.
GetMsgTyped();
229 if (msg_typed->GetTypeName() !=
"cvmfs.MsgHandshakeAck")
231 cvmfs::MsgHandshakeAck *msg_ack =
232 reinterpret_cast<cvmfs::MsgHandshakeAck *
>(msg_typed);
239 "external cache manager object size too large (%u)",
245 "external cache manager object size too small (%u)",
249 if (msg_ack->has_pid())
262 const std::string &locator,
263 const std::vector<std::string> &cmd_line)
266 unsigned num_attempts = 0;
267 bool try_again =
false;
270 if (num_attempts > 2) {
274 plugin_handle->fd_connection_ = ConnectLocator(locator, num_attempts > 1);
275 if (plugin_handle->
IsValid()) {
277 }
else if (plugin_handle->fd_connection_ == -EINVAL) {
279 "Invalid locator: %s", locator.c_str());
280 plugin_handle->error_msg_ =
"Invalid locator: " + locator;
283 if (num_attempts > 1) {
285 "Failed to connect to external cache manager: %d",
286 plugin_handle->fd_connection_);
288 plugin_handle->error_msg_ =
"Failed to connect to external cache manager";
291 try_again = SpawnPlugin(cmd_line);
294 return plugin_handle.
Release();
304 transaction->
label = label;
310 return "External cache manager\n";
334 int status_refcnt = ChangeRefcount(
id, 1);
335 if (status_refcnt == 0)
339 int retval = fd_table_.CloseFd(fd);
341 return status_refcnt;
348 for (
unsigned i = 1; i < fd_table_.GetMaxFds(); ++i) {
356 cvmfs::MsgIoctl msg_ioctl;
357 msg_ioctl.set_session_id(session_id_);
358 msg_ioctl.set_conncnt_change_by(-1);
360 transport_.SendFrame(&frame);
362 int new_root_fd = -1;
364 new_root_fd = fd_table_.OpenFd(handle_root);
374 cvmfs::MsgIoctl msg_ioctl;
375 msg_ioctl.set_session_id(session_id_);
376 msg_ioctl.set_conncnt_change_by(1);
378 transport_.SendFrame(&frame);
379 return fd_table_.Clone();
385 if (
id == kInvalidHandle)
393 unsigned max_open_fds)
396 , transport_(fd_connection)
398 , max_object_size_(0)
401 , capabilities_(cvmfs::CAP_NONE)
418 cvmfs::MsgQuit msg_quit;
438 cvmfs::MsgHash object_id;
440 cvmfs::MsgStoreReq msg_store;
443 msg_store.set_allocated_object_id(&object_id);
446 msg_store.set_last_part(do_commit);
449 cvmfs::EnumObjectType object_type;
451 msg_store.set_object_type(object_type);
455 RpcJob rpc_job(&msg_store);
459 msg_store.release_object_id();
462 if (msg_reply->status() == cvmfs::STATUS_OK) {
483 cvmfs::MsgHash object_id;
485 cvmfs::MsgObjectInfoReq msg_info;
488 msg_info.set_allocated_object_id(&object_id);
489 RpcJob rpc_job(&msg_info);
491 msg_info.release_object_id();
494 if (msg_reply->status() == cvmfs::STATUS_OK) {
495 assert(msg_reply->has_size());
496 return msg_reply->size();
516 uint64_t part_nr = 0;
517 google::protobuf::MessageLite *msg = frame_recv.
GetMsgTyped();
518 if (msg->GetTypeName() ==
"cvmfs.MsgRefcountReply") {
519 req_id =
reinterpret_cast<cvmfs::MsgRefcountReply *
>(msg)->req_id();
520 }
else if (msg->GetTypeName() ==
"cvmfs.MsgObjectInfoReply") {
521 req_id =
reinterpret_cast<cvmfs::MsgObjectInfoReply *
>(msg)->req_id();
522 }
else if (msg->GetTypeName() ==
"cvmfs.MsgReadReply") {
523 req_id =
reinterpret_cast<cvmfs::MsgReadReply *
>(msg)->req_id();
524 }
else if (msg->GetTypeName() ==
"cvmfs.MsgStoreReply") {
525 req_id =
reinterpret_cast<cvmfs::MsgStoreReply *
>(msg)->req_id();
526 part_nr =
reinterpret_cast<cvmfs::MsgStoreReply *
>(msg)->part_nr();
527 }
else if (msg->GetTypeName() ==
"cvmfs.MsgInfoReply") {
528 req_id =
reinterpret_cast<cvmfs::MsgInfoReply *
>(msg)->req_id();
529 }
else if (msg->GetTypeName() ==
"cvmfs.MsgShrinkReply") {
530 req_id =
reinterpret_cast<cvmfs::MsgShrinkReply *
>(msg)->req_id();
531 }
else if (msg->GetTypeName() ==
"cvmfs.MsgListReply") {
532 req_id =
reinterpret_cast<cvmfs::MsgListReply *
>(msg)->req_id();
533 }
else if (msg->GetTypeName() ==
"cvmfs.MsgBreadcrumbReply") {
534 req_id =
reinterpret_cast<cvmfs::MsgBreadcrumbReply *
>(msg)->req_id();
535 }
else if (msg->GetTypeName() ==
"cvmfs.MsgDetach") {
541 msg->GetTypeName().c_str());
549 if ((rpc_job->
req_id() == req_id) && (rpc_job->
part_nr() == part_nr)) {
557 if (rpc_inflight.
rpc_job == NULL) {
559 "got unmatched rpc reply");
568 "connection to external cache manager broken (%d)", errno);
584 int retval =
Flush(
true, transaction);
613 cvmfs::MsgHash object_id;
616 while (nbytes < size) {
617 uint64_t batch_size =
619 cvmfs::MsgReadReq msg_read;
622 msg_read.set_allocated_object_id(&object_id);
623 msg_read.set_offset(offset + nbytes);
624 msg_read.set_size(batch_size);
625 RpcJob rpc_job(&msg_read);
629 msg_read.release_object_id();
632 if (msg_reply->status() == cvmfs::STATUS_OK) {
657 transaction->
size = 0;
665 cvmfs::MsgHash object_id;
667 cvmfs::MsgStoreAbortReq msg_abort;
670 msg_abort.set_allocated_object_id(&object_id);
671 RpcJob rpc_job(&msg_abort);
673 msg_abort.release_object_id();
682 const std::string &fqrn)
687 cvmfs::MsgBreadcrumbLoadReq msg_breadcrumb_load;
690 msg_breadcrumb_load.set_fqrn(fqrn);
691 RpcJob rpc_job(&msg_breadcrumb_load);
696 if (msg_reply->status() == cvmfs::STATUS_OK) {
697 assert(msg_reply->has_breadcrumb());
698 assert(msg_reply->breadcrumb().fqrn() == fqrn);
703 breadcrumb.
timestamp = msg_reply->breadcrumb().timestamp();
704 if (msg_reply->breadcrumb().has_revision()) {
705 breadcrumb.
revision = msg_reply->breadcrumb().revision();
720 cvmfs::MsgBreadcrumb breadcrumb;
722 breadcrumb.set_allocated_hash(&hash);
724 breadcrumb.set_revision(manifest.
revision());
725 cvmfs::MsgBreadcrumbStoreReq msg_breadcrumb_store;
728 msg_breadcrumb_store.set_allocated_breadcrumb(&breadcrumb);
729 RpcJob rpc_job(&msg_breadcrumb_store);
731 msg_breadcrumb_store.release_breadcrumb();
732 breadcrumb.release_hash();
735 return msg_reply->status() == cvmfs::STATUS_OK;
751 if (cmd_line.empty())
756 set<int> preserve_filedes;
757 preserve_filedes.insert(pipe_ready[1]);
759 int fd_null_read = open(
"/dev/null", O_RDONLY);
760 int fd_null_write = open(
"/dev/null", O_WRONLY);
761 assert((fd_null_read >= 0) && (fd_null_write >= 0));
762 map<int, int> map_fildes;
763 map_fildes[fd_null_read] = 0;
764 map_fildes[fd_null_write] = 1;
765 map_fildes[fd_null_write] = 2;
780 close(fd_null_write);
783 "failed to start cache plugin '%s'",
790 "started cache plugin '%s' (pid %d), waiting for it to become ready",
792 close(pipe_ready[1]);
794 if (read(pipe_ready[0], &buf, 1) != 1) {
795 close(pipe_ready[0]);
797 "cache plugin did not start properly");
800 close(pipe_ready[0]);
805 "cache plugin failed to create an endpoint");
838 "Transaction size (%" PRIu64
") > expected size (%" PRIu64
")",
844 uint64_t written = 0;
845 const unsigned char *read_pos =
reinterpret_cast<const unsigned char *
>(buf);
846 while (written < size) {
848 bool do_commit =
false;
851 int retval =
Flush(do_commit, transaction);
853 transaction->
size += written;
859 uint64_t remaining = size - written;
861 uint64_t batch_size = std::min(remaining, space_in_buffer);
862 memcpy(transaction->
buffer + transaction->
buf_pos, read_pos, batch_size);
863 transaction->
buf_pos += batch_size;
864 written += batch_size;
865 read_pos += batch_size;
875 cvmfs::EnumObjectType
type,
876 vector<cvmfs::MsgListRecord> *result)
881 uint64_t listing_id = 0;
882 bool more_data =
false;
884 cvmfs::MsgListReq msg_list;
887 msg_list.set_listing_id(listing_id);
888 msg_list.set_object_type(type);
893 if (msg_reply->status() != cvmfs::STATUS_OK)
895 more_data = !msg_reply->is_last_part();
896 listing_id = msg_reply->listing_id();
897 for (
int i = 0; i < msg_reply->list_record_size(); ++i) {
898 result->push_back(msg_reply->list_record(i));
910 cvmfs::MsgShrinkReq msg_shrink;
913 msg_shrink.set_shrink_to(leave_size);
918 return msg_reply->status() == cvmfs::STATUS_OK;
935 return Ack2Errno(cvmfs::STATUS_NOSUPPORT);
937 cvmfs::MsgInfoReq msg_info;
944 if (msg_reply->status() == cvmfs::STATUS_OK) {
945 quota_info->
size = msg_reply->size_bytes();
946 quota_info->
used = msg_reply->used_bytes();
947 quota_info->
pinned = msg_reply->pinned_bytes();
948 if (msg_reply->no_shrink() >= 0)
949 quota_info->
no_shrink = msg_reply->no_shrink();
992 switch (capability) {
1010 vector<string> result;
1011 vector<cvmfs::MsgListRecord> raw_list;
1012 bool retval =
DoListing(cvmfs::OBJECT_REGULAR, &raw_list);
1015 for (
unsigned i = 0; i < raw_list.size(); ++i)
1016 result.push_back(raw_list[i].description());
1022 vector<string> result;
1023 vector<cvmfs::MsgListRecord> raw_list;
1024 bool retval =
DoListing(cvmfs::OBJECT_CATALOG, &raw_list);
1027 for (
unsigned i = 0; i < raw_list.size(); ++i)
1028 result.push_back(raw_list[i].description());
1034 vector<string> result;
1035 vector<cvmfs::MsgListRecord> raw_lists[3];
1036 bool retval =
DoListing(cvmfs::OBJECT_REGULAR, &raw_lists[0]);
1039 retval =
DoListing(cvmfs::OBJECT_CATALOG, &raw_lists[1]);
1042 retval =
DoListing(cvmfs::OBJECT_VOLATILE, &raw_lists[2]);
1045 for (
unsigned i = 0; i <
sizeof(raw_lists) /
sizeof(raw_lists[0]); ++i) {
1046 for (
unsigned j = 0; j < raw_lists[i].size(); ++j) {
1047 if (raw_lists[i][j].pinned())
1048 result.push_back(raw_lists[i][j].description());
1056 vector<string> result;
1057 vector<cvmfs::MsgListRecord> raw_list;
1058 bool retval =
DoListing(cvmfs::OBJECT_VOLATILE, &raw_list);
1061 for (
unsigned i = 0; i < raw_list.size(); ++i)
1062 result.push_back(raw_list[i].description());
1068 int back_channel[2],
1069 const string &channel_id)
1081 int back_channel[2],
1082 const string &channel_id)
CacheTransport transport_
int Flush(bool do_commit, Transaction *transaction)
pthread_mutex_t lock_inflight_rpcs_
void AssignFrom(const FdTable< HandleT > &other)
virtual ~ExternalCacheManager()
virtual uint64_t GetSizePinned()
int ChangeRefcount(const shash::Any &id, int change_by)
void set_attachment_recv(void *data, unsigned size)
const manifest::Manifest * manifest() const
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
std::string GetDescription() const
virtual uint64_t GetCleanupRate(uint64_t period_s)
bool DoListing(cvmfs::EnumObjectType type, std::vector< cvmfs::MsgListRecord > *result)
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
int Ack2Errno(cvmfs::EnumStatus status_code)
virtual int Readahead(int fd)
virtual bool Cleanup(const uint64_t leave_size)
void BroadcastBackchannels(const std::string &message)
pthread_rwlock_t rwlock_fd_table_
cvmfs::MsgShrinkReply * msg_shrink_reply()
cvmfs::MsgRefcountReply * msg_refcount_reply()
string JoinStrings(const vector< string > &strings, const string &joint)
std::string ToString(const bool with_suffix=false) const
int ConnectTcpEndpoint(const std::string &ipv4_address, int portno)
cvmfs::MsgReadReply * msg_read_reply()
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
virtual int AbortTxn(void *txn)
virtual std::string Describe()
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
virtual std::vector< std::string > ListCatalogs()
std::vector< RpcInFlight > inflight_rpcs_
static void * MainRead(void *data)
atomic_int64 next_request_id_
virtual int Reset(void *txn)
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void FillObjectType(int object_flags, cvmfs::EnumObjectType *wire_type)
virtual std::vector< std::string > ListPinned()
virtual std::vector< std::string > List()
static const char kReadyNotification
void set_attachment_send(void *data, unsigned size)
void MakePipe(int pipe_fd[2])
cvmfs::MsgListReply * msg_list_reply()
ExternalCacheManager(int fd_connection, unsigned max_open_fds)
virtual int Close(int fd)
uint64_t revision() const
ExternalCacheManager * cache_mgr_
cvmfs::MsgStoreReply * msg_store_reply()
FdTable< ReadOnlyHandle > fd_table_
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
cvmfs::MsgBreadcrumbReply * msg_breadcrumb_reply()
void MergeFrom(const Frame &other)
uint64_t publish_timestamp() const
vector< string > SplitString(const string &str, char delim)
std::string repository_name() const
uint32_t max_object_size_
const char kSuffixCatalog
virtual void * DoSaveState()
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
uint32_t att_size() const
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
CacheTransport::Frame * frame_send()
static const shash::Any kInvalidHandle
cvmfs::MsgObjectInfoReply * msg_object_info_reply()
shash::Any catalog_hash() const
string StringifyInt(const int64_t value)
int GetInfo(QuotaInfo *quota_info)
google::protobuf::MessageLite * GetMsgTyped()
virtual void CtrlTxn(const Label &label, const int flags, void *txn)
std::map< shash::Md5, int > back_channels_
ExternalQuotaManager(ExternalCacheManager *cache_mgr)
static bool SpawnPlugin(const std::vector< std::string > &cmd_line)
cvmfs::MsgInfoReply * msg_info_reply()
int ConnectSocket(const std::string &path)
CacheTransport::Frame * frame_recv()
shash::Any GetHandle(int fd)
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
virtual int64_t GetSize(int fd)
static ExternalCacheManager * Create(int fd_connection, unsigned max_open_fds, const std::string &ident)
QuotaManager * quota_mgr_
pthread_mutex_t lock_send_fd_
static ExternalQuotaManager * Create(ExternalCacheManager *cache_mgr)
virtual bool DoFreeState(void *data)
void SafeSleepMs(const unsigned ms)
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
virtual bool HasCapability(Capabilities capability)
void set_attachment(void *attachment, uint32_t att_size)
void CallRemotely(RpcJob *rpc_job)
virtual std::vector< std::string > ListVolatile()
virtual int OpenFromTxn(void *txn)
static int ConnectLocator(const std::string &locator, bool print_error)
static PluginHandle * CreatePlugin(const std::string &locator, const std::vector< std::string > &cmd_line)
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
virtual uint64_t GetCapacity()
void Reset(uint32_t original_att_size)
virtual int CommitTxn(void *txn)
void ClosePipe(int pipe_fd[2])
virtual int DoRestoreState(void *data)
void UnlockBackChannels()
int fd_connection() const
static const char * kEnvReadyNotifyFd
virtual uint64_t GetSize()
int DoOpen(const shash::Any &id)
static const uint64_t kSizeUnknown
virtual int Open(const LabeledObject &object)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)