10 #include <sys/socket.h>
39 lock_tls_blocks_ =
reinterpret_cast<pthread_mutex_t *
>(
40 smalloc(
sizeof(pthread_mutex_t)));
41 int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
47 pthread_mutex_destroy(lock_tls_blocks_);
48 free(lock_tls_blocks_);
50 for (
unsigned i = 0; i < tls_blocks_.size(); ++i) {
51 delete tls_blocks_[i];
54 int retval = pthread_key_delete(thread_local_storage_);
60 if (instance_ == NULL) {
63 pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
73 pthread_getspecific(thread_local_storage_));
74 if ((tls == NULL) || !tls->
is_set) {
77 *client_instance = NULL;
88 pthread_getspecific(thread_local_storage_));
98 pthread_getspecific(thread_local_storage_));
102 int retval = pthread_setspecific(thread_local_storage_, tls);
105 tls_blocks_.push_back(tls);
121 for (vector<ThreadLocalStorage *>::iterator i =
122 instance_->tls_blocks_.begin(), iEnd = instance_->tls_blocks_.end();
126 instance_->tls_blocks_.erase(i);
135 pthread_getspecific(thread_local_storage_));
153 reponame = strdup(tokens[0].c_str());
154 if (tokens.size() > 1)
201 cvmfs::MsgBreadcrumbStoreReq *msg_req,
205 cvmfs::MsgBreadcrumbReply msg_reply;
208 msg_reply.set_req_id(msg_req->req_id());
210 bool retval = transport->
ParseMsgHash(msg_req->breadcrumb().hash(),
214 "malformed hash received from client");
215 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
217 breadcrumb.
timestamp = msg_req->breadcrumb().timestamp();
218 if (msg_req->breadcrumb().has_revision()) {
219 breadcrumb.
revision = msg_req->breadcrumb().revision();
223 cvmfs::EnumStatus status =
225 msg_reply.set_status(status);
232 cvmfs::MsgBreadcrumbLoadReq *msg_req,
236 cvmfs::MsgBreadcrumbReply msg_reply;
239 msg_reply.set_req_id(msg_req->req_id());
241 cvmfs::EnumStatus status =
243 msg_reply.set_status(status);
244 if (status == cvmfs::STATUS_OK) {
246 cvmfs::MsgHash *msg_hash =
new cvmfs::MsgHash();
248 cvmfs::MsgBreadcrumb *msg_breadcrumb =
new cvmfs::MsgBreadcrumb();
249 msg_breadcrumb->set_fqrn(msg_req->fqrn());
250 msg_breadcrumb->set_allocated_hash(msg_hash);
251 msg_breadcrumb->set_timestamp(breadcrumb.
timestamp);
252 msg_breadcrumb->set_revision(breadcrumb.
revision);
253 msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
260 cvmfs::MsgHandshake *msg_req,
264 if (msg_req->has_name()) {
270 cvmfs::MsgHandshakeAck msg_ack;
273 msg_ack.set_status(cvmfs::STATUS_OK);
274 msg_ack.set_name(
name_);
277 msg_ack.set_session_id(session_id);
280 msg_ack.set_pid(getpid());
286 cvmfs::MsgInfoReq *msg_req,
290 cvmfs::MsgInfoReply msg_reply;
293 msg_reply.set_req_id(msg_req->req_id());
295 cvmfs::EnumStatus status =
GetInfo(&info);
296 if (status != cvmfs::STATUS_OK) {
298 "failed to query cache status");
304 msg_reply.set_status(status);
310 if (!msg_req->has_conncnt_change_by())
312 int32_t conncnt_change_by = msg_req->conncnt_change_by();
315 "invalid request to drop connection counter below zero");
318 if (conncnt_change_by > 0) {
319 LogSessionInfo(msg_req->session_id(),
"lock session beyond lifetime");
328 cvmfs::MsgListReq *msg_req,
332 cvmfs::MsgListReply msg_reply;
335 msg_reply.set_req_id(msg_req->req_id());
336 int64_t listing_id = msg_req->listing_id();
337 msg_reply.set_listing_id(listing_id);
338 msg_reply.set_is_last_part(
true);
340 cvmfs::EnumStatus status;
341 if (msg_req->listing_id() == 0) {
343 status =
ListingBegin(listing_id, msg_req->object_type());
344 if (status != cvmfs::STATUS_OK) {
346 "failed to start enumeration of objects");
347 msg_reply.set_status(status);
351 msg_reply.set_listing_id(listing_id);
356 unsigned total_size = 0;
357 while ((status =
ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
358 cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
359 cvmfs::MsgHash *msg_hash =
new cvmfs::MsgHash();
361 msg_list_record->set_allocated_hash(msg_hash);
362 msg_list_record->set_pinned(item.
pinned);
363 msg_list_record->set_description(item.
description);
365 total_size +=
sizeof(item) + item.
description.length();
369 if (status == cvmfs::STATUS_OUTOFBOUNDS) {
371 status = cvmfs::STATUS_OK;
373 msg_reply.set_is_last_part(
false);
375 if (status != cvmfs::STATUS_OK) {
376 LogSessionError(msg_req->session_id(), status,
"failed enumerate objects");
378 msg_reply.set_status(status);
384 cvmfs::MsgObjectInfoReq *msg_req,
388 cvmfs::MsgObjectInfoReply msg_reply;
391 msg_reply.set_req_id(msg_req->req_id());
393 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
396 "malformed hash received from client");
397 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
401 msg_reply.set_status(status);
402 if (status == cvmfs::STATUS_OK) {
404 msg_reply.set_size(info.
size);
405 }
else if (status != cvmfs::STATUS_NOENTRY) {
407 "failed retrieving object details");
415 cvmfs::MsgReadReq *msg_req,
419 cvmfs::MsgReadReply msg_reply;
422 msg_reply.set_req_id(msg_req->req_id());
424 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
427 "malformed hash received from client");
428 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
432 unsigned size = msg_req->size();
434 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(smalloc(size));
436 unsigned char buffer[
size];
438 cvmfs::EnumStatus status =
Pread(object_id, msg_req->offset(), &
size, buffer);
439 msg_reply.set_status(status);
440 if (status == cvmfs::STATUS_OK) {
444 "failed to read from object");
454 cvmfs::MsgRefcountReq *msg_req,
458 cvmfs::MsgRefcountReply msg_reply;
461 msg_reply.set_req_id(msg_req->req_id());
463 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
466 "malformed hash received from client");
467 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
469 cvmfs::EnumStatus status =
ChangeRefcount(object_id, msg_req->change_by());
470 msg_reply.set_status(status);
471 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
473 "failed to open/close object " + object_id.
ToString());
485 bool retval = transport.
RecvFrame(&frame_recv);
488 "failed to receive request from connection (%d)", errno);
492 google::protobuf::MessageLite *msg_typed = frame_recv.
GetMsgTyped();
494 if (msg_typed->GetTypeName() ==
"cvmfs.MsgHandshake") {
495 cvmfs::MsgHandshake *msg_req =
496 reinterpret_cast<cvmfs::MsgHandshake *
>(msg_typed);
498 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgQuit") {
499 cvmfs::MsgQuit *msg_req =
reinterpret_cast<cvmfs::MsgQuit *
>(msg_typed);
500 map<uint64_t, SessionInfo>::const_iterator iter =
503 free(iter->second.reponame);
504 free(iter->second.client_instance);
508 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgIoctl") {
509 HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
510 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgRefcountReq") {
511 cvmfs::MsgRefcountReq *msg_req =
512 reinterpret_cast<cvmfs::MsgRefcountReq *
>(msg_typed);
514 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgObjectInfoReq") {
515 cvmfs::MsgObjectInfoReq *msg_req =
516 reinterpret_cast<cvmfs::MsgObjectInfoReq *
>(msg_typed);
518 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgReadReq") {
519 cvmfs::MsgReadReq *msg_req =
520 reinterpret_cast<cvmfs::MsgReadReq *
>(msg_typed);
522 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgStoreReq") {
523 cvmfs::MsgStoreReq *msg_req =
524 reinterpret_cast<cvmfs::MsgStoreReq *
>(msg_typed);
526 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgStoreAbortReq") {
527 cvmfs::MsgStoreAbortReq *msg_req =
528 reinterpret_cast<cvmfs::MsgStoreAbortReq *
>(msg_typed);
530 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgInfoReq") {
531 cvmfs::MsgInfoReq *msg_req =
532 reinterpret_cast<cvmfs::MsgInfoReq *
>(msg_typed);
534 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgShrinkReq") {
535 cvmfs::MsgShrinkReq *msg_req =
536 reinterpret_cast<cvmfs::MsgShrinkReq *
>(msg_typed);
538 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgListReq") {
539 cvmfs::MsgListReq *msg_req =
540 reinterpret_cast<cvmfs::MsgListReq *
>(msg_typed);
542 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgBreadcrumbStoreReq") {
543 cvmfs::MsgBreadcrumbStoreReq *msg_req =
544 reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *
>(msg_typed);
546 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgBreadcrumbLoadReq") {
547 cvmfs::MsgBreadcrumbLoadReq *msg_req =
548 reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *
>(msg_typed);
552 "unexpected message from client: %s",
553 msg_typed->GetTypeName().c_str());
562 cvmfs::MsgShrinkReq *msg_req,
566 cvmfs::MsgShrinkReply msg_reply;
569 msg_reply.set_req_id(msg_req->req_id());
570 uint64_t used_bytes = 0;
571 cvmfs::EnumStatus status =
Shrink(msg_req->shrink_to(), &used_bytes);
572 msg_reply.set_used_bytes(used_bytes);
573 msg_reply.set_status(status);
574 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
575 LogSessionError(msg_req->session_id(), status,
"failed to cleanup cache");
582 cvmfs::MsgStoreAbortReq *msg_req,
586 cvmfs::MsgStoreReply msg_reply;
588 msg_reply.set_req_id(msg_req->req_id());
589 msg_reply.set_part_nr(0);
591 UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
592 bool retval =
txn_ids_.Lookup(uniq_req, &txn_id);
595 "malformed transaction id received from client");
596 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
598 cvmfs::EnumStatus status =
AbortTxn(txn_id);
599 msg_reply.set_status(status);
600 if (status != cvmfs::STATUS_OK) {
602 "failed to abort transaction");
611 cvmfs::MsgStoreReq *msg_req,
616 cvmfs::MsgStoreReply msg_reply;
618 msg_reply.set_req_id(msg_req->req_id());
619 msg_reply.set_part_nr(msg_req->part_nr());
621 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
627 "malformed hash or bad object size received from client");
628 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
633 UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
635 cvmfs::EnumStatus status = cvmfs::STATUS_OK;
636 if (msg_req->part_nr() == 1) {
639 "invalid attempt to restart running transaction");
640 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
647 if (msg_req->has_expected_size()) {info.
size = msg_req->expected_size();}
648 if (msg_req->has_object_type()) {info.
object_type = msg_req->object_type();}
649 if (msg_req->has_description()) {info.
description = msg_req->description();}
650 status =
StartTxn(object_id, txn_id, info);
651 if (status != cvmfs::STATUS_OK) {
653 "failed to start transaction");
654 msg_reply.set_status(status);
660 retval =
txn_ids_.Lookup(uniq_req, &txn_id);
663 "invalid transaction received from client");
664 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
673 reinterpret_cast<unsigned char *>(frame->
attachment()),
675 if (status != cvmfs::STATUS_OK) {
676 LogSessionError(msg_req->session_id(), status,
"failure writing object");
677 msg_reply.set_status(status);
683 if (msg_req->last_part()) {
685 if (status != cvmfs::STATUS_OK) {
687 "failure committing object");
691 msg_reply.set_status(status);
697 return atomic_read32(&
running_) != 0;
703 if (tokens[0] ==
"unix") {
704 string lock_path = tokens[1] +
".lock";
708 "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
716 "failed to lock on %s, file is busy", lock_path.c_str());
723 }
else if (tokens[0] ==
"tcp") {
724 vector<string> tcp_address =
SplitString(tokens[1],
':');
725 if (tcp_address.size() != 2) {
727 "invalid locator: %s", locator.c_str());
734 "unknown endpoint in locator: %s", locator.c_str());
740 if (errno == EADDRINUSE) {
745 "failed to create endpoint %s (%d)", locator.c_str(), errno);
759 string session_str(
"unidentified client (" +
StringifyInt(session_id) +
")");
760 map<uint64_t, SessionInfo>::const_iterator iter =
sessions_.find(session_id);
762 session_str = iter->second.name;
765 "session '%s': %s", session_str.c_str(), msg.c_str());
771 cvmfs::EnumStatus status,
772 const std::string &msg)
774 string session_str(
"unidentified client (" +
StringifyInt(session_id) +
")");
775 map<uint64_t, SessionInfo>::const_iterator iter =
sessions_.find(session_id);
777 session_str = iter->second.name;
780 "session '%s': %s (%d - %s)",
781 session_str.c_str(), msg.c_str(), status,
791 vector<struct pollfd> watch_fds;
793 struct pollfd watch_ctrl;
795 watch_ctrl.events = POLLIN | POLLPRI;
796 watch_fds.push_back(watch_ctrl);
797 struct pollfd watch_socket;
799 watch_socket.events = POLLIN | POLLPRI;
800 watch_fds.push_back(watch_socket);
802 bool terminated =
false;
803 while (!terminated) {
804 for (
unsigned i = 0; i < watch_fds.size(); ++i)
805 watch_fds[i].revents = 0;
806 int retval = poll(&watch_fds[0], watch_fds.size(), -1);
815 if (watch_fds[0].revents) {
817 ReadPipe(watch_fds[0].fd, &signal, 1);
824 if (watch_fds.size() > 2) {
826 "terminating external cache manager with pending connections");
832 if (watch_fds[1].revents) {
833 struct sockaddr_un remote;
834 socklen_t socket_size =
sizeof(remote);
836 accept(watch_fds[1].fd, (
struct sockaddr *)&remote, &socket_size);
839 "failed to establish connection (%d)", errno);
842 struct pollfd watch_con;
843 watch_con.fd = fd_con;
844 watch_con.events = POLLIN | POLLPRI;
845 watch_fds.push_back(watch_con);
850 for (
unsigned i = 2; i < watch_fds.size(); ) {
851 if (watch_fds[i].revents) {
854 close(watch_fds[i].fd);
856 watch_fds.erase(watch_fds.begin() + i);
862 "stopping cache plugin, no more active clients");
876 for (
unsigned i = 2; i < watch_fds.size(); ++i)
877 close(watch_fds[i].fd);
880 signal(SIGPIPE, save_sigpipe);
890 if (pipe_ready == NULL)
909 for (; iter != iter_end; ++iter) {
913 cvmfs::MsgDetach msg_detach;
void Get(uint64_t *id, char **reponame, char **client_instance)
int MakeSocket(const std::string &path, const int mode)
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
atomic_int64 next_session_id_
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
static void CleanupInstance()
void Set(uint64_t id, char *reponame, char *client_instance)
static const uint64_t kSizeUnknown
int MakeTcpEndpoint(const std::string &ipv4_address, int portno)
std::string ToString(const bool with_suffix=false) const
bool is_set
either not yet set or deliberately unset
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
std::set< int > connections_
std::map< uint64_t, SessionInfo > sessions_
uint64_t capabilities() const
void ProcessRequests(unsigned num_workers)
cvmfs::EnumObjectType object_type
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
static SessionCtx * instance_
static const unsigned kDefaultMaxObjectSize
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static const char kReadyNotification
static uint32_t HashUniqueRequest(const UniqueRequest &req)
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
void MakePipe(int pipe_fd[2])
static const uint32_t kFlagSendIgnoreFailure
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
uint64_t num_inlimbo_clients_
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
bool Listen(const std::string &locator)
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
int64_t String2Int64(const string &value)
vector< string > SplitString(const string &str, char delim)
static const unsigned kListingSize
static void TlsDestructor(void *data)
void NotifySupervisor(char signal)
void * attachment() const
unsigned max_object_size_
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int TryLockFile(const std::string &path)
uint32_t att_size() const
static const char kSignalDetach
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
string StringifyInt(const int64_t value)
static const unsigned kPbProtocolVersion
google::protobuf::MessageLite * GetMsgTyped()
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
static const char kFailureNotification
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
void SendDetachRequests()
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
atomic_int64 next_lst_id_
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
void set_attachment(void *attachment, uint32_t att_size)
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
static const char kSignalTerminate
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
const char * CacheTransportCode2Ascii(const cvmfs::EnumStatus code)
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
void ClosePipe(int pipe_fd[2])
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
atomic_int64 next_txn_id_
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
static const char * kEnvReadyNotifyFd
void LogSessionInfo(uint64_t session_id, const std::string &msg)
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
static const uint32_t kFlagSendNonBlocking
void UnlockFile(const int filedes)
bool HandleRequest(int fd_con)
static SessionCtx * GetInstance()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)