10 #include <sys/socket.h>
39 lock_tls_blocks_ =
reinterpret_cast<pthread_mutex_t *
>(
40 smalloc(
sizeof(pthread_mutex_t)));
41 int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
47 pthread_mutex_destroy(lock_tls_blocks_);
48 free(lock_tls_blocks_);
50 for (
unsigned i = 0; i < tls_blocks_.size(); ++i) {
51 delete tls_blocks_[i];
54 int retval = pthread_key_delete(thread_local_storage_);
60 if (instance_ == NULL) {
62 int retval = pthread_key_create(&instance_->thread_local_storage_,
73 pthread_getspecific(thread_local_storage_));
74 if ((tls == NULL) || !tls->
is_set) {
77 *client_instance = NULL;
88 pthread_getspecific(thread_local_storage_));
98 pthread_getspecific(thread_local_storage_));
102 int retval = pthread_setspecific(thread_local_storage_, tls);
105 tls_blocks_.push_back(tls);
121 for (vector<ThreadLocalStorage *>::iterator
122 i = instance_->tls_blocks_.begin(),
123 iEnd = instance_->tls_blocks_.end();
126 instance_->tls_blocks_.erase(i);
135 pthread_getspecific(thread_local_storage_));
149 : id(id), name(name) {
151 reponame = strdup(tokens[0].c_str());
152 if (tokens.size() > 1)
200 cvmfs::MsgBreadcrumbReply msg_reply;
203 msg_reply.set_req_id(msg_req->req_id());
205 bool retval = transport->
ParseMsgHash(msg_req->breadcrumb().hash(),
209 "malformed hash received from client");
210 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
212 breadcrumb.
timestamp = msg_req->breadcrumb().timestamp();
213 if (msg_req->breadcrumb().has_revision()) {
214 breadcrumb.
revision = msg_req->breadcrumb().revision();
218 cvmfs::EnumStatus status =
StoreBreadcrumb(msg_req->breadcrumb().fqrn(),
220 msg_reply.set_status(status);
229 cvmfs::MsgBreadcrumbReply msg_reply;
232 msg_reply.set_req_id(msg_req->req_id());
234 cvmfs::EnumStatus status =
LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
235 msg_reply.set_status(status);
236 if (status == cvmfs::STATUS_OK) {
238 cvmfs::MsgHash *msg_hash =
new cvmfs::MsgHash();
240 cvmfs::MsgBreadcrumb *msg_breadcrumb =
new cvmfs::MsgBreadcrumb();
241 msg_breadcrumb->set_fqrn(msg_req->fqrn());
242 msg_breadcrumb->set_allocated_hash(msg_hash);
243 msg_breadcrumb->set_timestamp(breadcrumb.
timestamp);
244 msg_breadcrumb->set_revision(breadcrumb.
revision);
245 msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
254 if (msg_req->has_name()) {
258 session_id,
"anonymous client (" +
StringifyInt(session_id) +
")");
260 cvmfs::MsgHandshakeAck msg_ack;
263 msg_ack.set_status(cvmfs::STATUS_OK);
264 msg_ack.set_name(
name_);
267 msg_ack.set_session_id(session_id);
270 msg_ack.set_pid(getpid());
278 cvmfs::MsgInfoReply msg_reply;
281 msg_reply.set_req_id(msg_req->req_id());
283 cvmfs::EnumStatus status =
GetInfo(&info);
284 if (status != cvmfs::STATUS_OK) {
286 "failed to query cache status");
292 msg_reply.set_status(status);
298 if (!msg_req->has_conncnt_change_by())
300 int32_t conncnt_change_by = msg_req->conncnt_change_by();
303 "invalid request to drop connection counter below zero");
306 if (conncnt_change_by > 0) {
307 LogSessionInfo(msg_req->session_id(),
"lock session beyond lifetime");
318 cvmfs::MsgListReply msg_reply;
321 msg_reply.set_req_id(msg_req->req_id());
322 int64_t listing_id = msg_req->listing_id();
323 msg_reply.set_listing_id(listing_id);
324 msg_reply.set_is_last_part(
true);
326 cvmfs::EnumStatus status;
327 if (msg_req->listing_id() == 0) {
329 status =
ListingBegin(listing_id, msg_req->object_type());
330 if (status != cvmfs::STATUS_OK) {
332 "failed to start enumeration of objects");
333 msg_reply.set_status(status);
337 msg_reply.set_listing_id(listing_id);
342 unsigned total_size = 0;
343 while ((status =
ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
344 cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
345 cvmfs::MsgHash *msg_hash =
new cvmfs::MsgHash();
347 msg_list_record->set_allocated_hash(msg_hash);
348 msg_list_record->set_pinned(item.
pinned);
349 msg_list_record->set_description(item.
description);
351 total_size +=
sizeof(item) + item.
description.length();
355 if (status == cvmfs::STATUS_OUTOFBOUNDS) {
357 status = cvmfs::STATUS_OK;
359 msg_reply.set_is_last_part(
false);
361 if (status != cvmfs::STATUS_OK) {
362 LogSessionError(msg_req->session_id(), status,
"failed enumerate objects");
364 msg_reply.set_status(status);
372 cvmfs::MsgObjectInfoReply msg_reply;
375 msg_reply.set_req_id(msg_req->req_id());
377 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
380 "malformed hash received from client");
381 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
385 msg_reply.set_status(status);
386 if (status == cvmfs::STATUS_OK) {
388 msg_reply.set_size(info.
size);
389 }
else if (status != cvmfs::STATUS_NOENTRY) {
391 "failed retrieving object details");
401 cvmfs::MsgReadReply msg_reply;
404 msg_reply.set_req_id(msg_req->req_id());
406 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
409 "malformed hash received from client");
410 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
414 unsigned size = msg_req->size();
416 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(smalloc(size));
418 unsigned char buffer[
size];
420 cvmfs::EnumStatus status =
Pread(object_id, msg_req->offset(), &
size, buffer);
421 msg_reply.set_status(status);
422 if (status == cvmfs::STATUS_OK) {
426 "failed to read from object");
438 cvmfs::MsgRefcountReply msg_reply;
441 msg_reply.set_req_id(msg_req->req_id());
443 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
446 "malformed hash received from client");
447 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
449 cvmfs::EnumStatus status =
ChangeRefcount(object_id, msg_req->change_by());
450 msg_reply.set_status(status);
451 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
453 "failed to open/close object " + object_id.
ToString());
465 bool retval = transport.
RecvFrame(&frame_recv);
468 "failed to receive request from connection (%d)", errno);
472 google::protobuf::MessageLite *msg_typed = frame_recv.
GetMsgTyped();
474 if (msg_typed->GetTypeName() ==
"cvmfs.MsgHandshake") {
475 cvmfs::MsgHandshake *msg_req =
reinterpret_cast<cvmfs::MsgHandshake *
>(
478 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgQuit") {
479 cvmfs::MsgQuit *msg_req =
reinterpret_cast<cvmfs::MsgQuit *
>(msg_typed);
480 map<uint64_t, SessionInfo>::const_iterator iter =
sessions_.find(
481 msg_req->session_id());
483 free(iter->second.reponame);
484 free(iter->second.client_instance);
488 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgIoctl") {
489 HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
490 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgRefcountReq") {
491 cvmfs::MsgRefcountReq *msg_req =
reinterpret_cast<cvmfs::MsgRefcountReq *
>(
494 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgObjectInfoReq") {
495 cvmfs::MsgObjectInfoReq
496 *msg_req =
reinterpret_cast<cvmfs::MsgObjectInfoReq *
>(msg_typed);
498 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgReadReq") {
499 cvmfs::MsgReadReq *msg_req =
reinterpret_cast<cvmfs::MsgReadReq *
>(
502 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgStoreReq") {
503 cvmfs::MsgStoreReq *msg_req =
reinterpret_cast<cvmfs::MsgStoreReq *
>(
506 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgStoreAbortReq") {
507 cvmfs::MsgStoreAbortReq
508 *msg_req =
reinterpret_cast<cvmfs::MsgStoreAbortReq *
>(msg_typed);
510 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgInfoReq") {
511 cvmfs::MsgInfoReq *msg_req =
reinterpret_cast<cvmfs::MsgInfoReq *
>(
514 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgShrinkReq") {
515 cvmfs::MsgShrinkReq *msg_req =
reinterpret_cast<cvmfs::MsgShrinkReq *
>(
518 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgListReq") {
519 cvmfs::MsgListReq *msg_req =
reinterpret_cast<cvmfs::MsgListReq *
>(
522 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgBreadcrumbStoreReq") {
523 cvmfs::MsgBreadcrumbStoreReq
524 *msg_req =
reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *
>(msg_typed);
526 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgBreadcrumbLoadReq") {
527 cvmfs::MsgBreadcrumbLoadReq
528 *msg_req =
reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *
>(msg_typed);
532 "unexpected message from client: %s",
533 std::string(msg_typed->GetTypeName()).c_str());
544 cvmfs::MsgShrinkReply msg_reply;
547 msg_reply.set_req_id(msg_req->req_id());
548 uint64_t used_bytes = 0;
549 cvmfs::EnumStatus status =
Shrink(msg_req->shrink_to(), &used_bytes);
550 msg_reply.set_used_bytes(used_bytes);
551 msg_reply.set_status(status);
552 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
553 LogSessionError(msg_req->session_id(), status,
"failed to cleanup cache");
562 cvmfs::MsgStoreReply msg_reply;
564 msg_reply.set_req_id(msg_req->req_id());
565 msg_reply.set_part_nr(0);
567 UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
568 bool retval =
txn_ids_.Lookup(uniq_req, &txn_id);
571 "malformed transaction id received from client");
572 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
574 cvmfs::EnumStatus status =
AbortTxn(txn_id);
575 msg_reply.set_status(status);
576 if (status != cvmfs::STATUS_OK) {
578 "failed to abort transaction");
590 cvmfs::MsgStoreReply msg_reply;
592 msg_reply.set_req_id(msg_req->req_id());
593 msg_reply.set_part_nr(msg_req->part_nr());
595 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
599 "malformed hash or bad object size received from client");
600 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
605 UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
607 cvmfs::EnumStatus status = cvmfs::STATUS_OK;
608 if (msg_req->part_nr() == 1) {
611 "invalid attempt to restart running transaction");
612 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
619 if (msg_req->has_expected_size()) {
620 info.
size = msg_req->expected_size();
622 if (msg_req->has_object_type()) {
625 if (msg_req->has_description()) {
628 status =
StartTxn(object_id, txn_id, info);
629 if (status != cvmfs::STATUS_OK) {
631 "failed to start transaction");
632 msg_reply.set_status(status);
638 retval =
txn_ids_.Lookup(uniq_req, &txn_id);
641 "invalid transaction received from client");
642 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
651 reinterpret_cast<unsigned char *>(frame->
attachment()),
653 if (status != cvmfs::STATUS_OK) {
654 LogSessionError(msg_req->session_id(), status,
"failure writing object");
655 msg_reply.set_status(status);
661 if (msg_req->last_part()) {
663 if (status != cvmfs::STATUS_OK) {
665 "failure committing object");
669 msg_reply.set_status(status);
679 if (tokens[0] ==
"unix") {
680 string lock_path = tokens[1] +
".lock";
684 "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
692 "failed to lock on %s, file is busy", lock_path.c_str());
699 }
else if (tokens[0] ==
"tcp") {
700 vector<string> tcp_address =
SplitString(tokens[1],
':');
701 if (tcp_address.size() != 2) {
710 "unknown endpoint in locator: %s", locator.c_str());
716 if (errno == EADDRINUSE) {
721 "failed to create endpoint %s (%d)", locator.c_str(), errno);
735 string session_str(
"unidentified client (" +
StringifyInt(session_id) +
")");
736 map<uint64_t, SessionInfo>::const_iterator iter =
sessions_.find(session_id);
738 session_str = iter->second.name;
741 session_str.c_str(), msg.c_str());
746 cvmfs::EnumStatus status,
747 const std::string &msg) {
748 string session_str(
"unidentified client (" +
StringifyInt(session_id) +
")");
749 map<uint64_t, SessionInfo>::const_iterator iter =
sessions_.find(session_id);
751 session_str = iter->second.name;
754 session_str.c_str(), msg.c_str(), status,
764 vector<struct pollfd> watch_fds;
766 struct pollfd watch_ctrl;
768 watch_ctrl.events = POLLIN | POLLPRI;
769 watch_fds.push_back(watch_ctrl);
770 struct pollfd watch_socket;
772 watch_socket.events = POLLIN | POLLPRI;
773 watch_fds.push_back(watch_socket);
775 bool terminated =
false;
776 while (!terminated) {
777 for (
unsigned i = 0; i < watch_fds.size(); ++i)
778 watch_fds[i].revents = 0;
779 int retval = poll(&watch_fds[0], watch_fds.size(), -1);
788 if (watch_fds[0].revents) {
790 ReadPipe(watch_fds[0].fd, &signal, 1);
797 if (watch_fds.size() > 2) {
799 "terminating external cache manager with pending connections");
805 if (watch_fds[1].revents) {
806 struct sockaddr_un remote;
807 socklen_t socket_size =
sizeof(remote);
808 int fd_con = accept(watch_fds[1].fd, (
struct sockaddr *)&remote,
812 "failed to establish connection (%d)", errno);
815 struct pollfd watch_con;
816 watch_con.fd = fd_con;
817 watch_con.events = POLLIN | POLLPRI;
818 watch_fds.push_back(watch_con);
823 for (
unsigned i = 2; i < watch_fds.size();) {
824 if (watch_fds[i].revents) {
827 close(watch_fds[i].fd);
829 watch_fds.erase(watch_fds.begin() + i);
834 "stopping cache plugin, no more active clients");
848 for (
unsigned i = 2; i < watch_fds.size(); ++i)
849 close(watch_fds[i].fd);
852 signal(SIGPIPE, save_sigpipe);
862 if (pipe_ready == NULL)
881 for (; iter != iter_end; ++iter) {
885 cvmfs::MsgDetach msg_detach;
void Get(uint64_t *id, char **reponame, char **client_instance)
int MakeSocket(const std::string &path, const int mode)
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
atomic_int64 next_session_id_
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
static void CleanupInstance()
void Set(uint64_t id, char *reponame, char *client_instance)
static const uint64_t kSizeUnknown
int MakeTcpEndpoint(const std::string &ipv4_address, int portno)
std::string ToString(const bool with_suffix=false) const
bool is_set
either not yet set or deliberately unset
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
std::set< int > connections_
std::map< uint64_t, SessionInfo > sessions_
uint64_t capabilities() const
void ProcessRequests(unsigned num_workers)
cvmfs::EnumObjectType object_type
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
static SessionCtx * instance_
static const unsigned kDefaultMaxObjectSize
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static const char kReadyNotification
static uint32_t HashUniqueRequest(const UniqueRequest &req)
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
void MakePipe(int pipe_fd[2])
static const uint32_t kFlagSendIgnoreFailure
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
uint64_t num_inlimbo_clients_
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
bool Listen(const std::string &locator)
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
int64_t String2Int64(const string &value)
vector< string > SplitString(const string &str, char delim)
static const unsigned kListingSize
static void TlsDestructor(void *data)
void NotifySupervisor(char signal)
void * attachment() const
unsigned max_object_size_
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int TryLockFile(const std::string &path)
uint32_t att_size() const
static const char kSignalDetach
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
string StringifyInt(const int64_t value)
static const unsigned kPbProtocolVersion
google::protobuf::MessageLite * GetMsgTyped()
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
static const char kFailureNotification
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
void SendDetachRequests()
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
atomic_int64 next_lst_id_
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
void set_attachment(void *attachment, uint32_t att_size)
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
static const char kSignalTerminate
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
const char * CacheTransportCode2Ascii(const cvmfs::EnumStatus code)
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
void ClosePipe(int pipe_fd[2])
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
atomic_int64 next_txn_id_
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
static const char * kEnvReadyNotifyFd
void LogSessionInfo(uint64_t session_id, const std::string &msg)
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
static const uint32_t kFlagSendNonBlocking
void UnlockFile(const int filedes)
bool HandleRequest(int fd_con)
static SessionCtx * GetInstance()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)