4 #include "cvmfs_config.h"
10 #include <sys/socket.h>
39 lock_tls_blocks_ =
reinterpret_cast<pthread_mutex_t *
>(
40 smalloc(
sizeof(pthread_mutex_t)));
41 int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
47 pthread_mutex_destroy(lock_tls_blocks_);
48 free(lock_tls_blocks_);
50 for (
unsigned i = 0; i < tls_blocks_.size(); ++i) {
51 delete tls_blocks_[i];
54 int retval = pthread_key_delete(thread_local_storage_);
60 if (instance_ == NULL) {
63 pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
73 pthread_getspecific(thread_local_storage_));
74 if ((tls == NULL) || !tls->
is_set) {
77 *client_instance = NULL;
88 pthread_getspecific(thread_local_storage_));
98 pthread_getspecific(thread_local_storage_));
102 int retval = pthread_setspecific(thread_local_storage_, tls);
105 tls_blocks_.push_back(tls);
121 for (vector<ThreadLocalStorage *>::iterator i =
122 instance_->tls_blocks_.begin(), iEnd = instance_->tls_blocks_.end();
126 instance_->tls_blocks_.erase(i);
135 pthread_getspecific(thread_local_storage_));
153 reponame = strdup(tokens[0].c_str());
154 if (tokens.size() > 1)
201 cvmfs::MsgBreadcrumbStoreReq *msg_req,
205 cvmfs::MsgBreadcrumbReply msg_reply;
208 msg_reply.set_req_id(msg_req->req_id());
210 bool retval = transport->
ParseMsgHash(msg_req->breadcrumb().hash(),
214 "malformed hash received from client");
215 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
217 breadcrumb.
timestamp = msg_req->breadcrumb().timestamp();
218 cvmfs::EnumStatus status =
220 msg_reply.set_status(status);
227 cvmfs::MsgBreadcrumbLoadReq *msg_req,
231 cvmfs::MsgBreadcrumbReply msg_reply;
234 msg_reply.set_req_id(msg_req->req_id());
236 cvmfs::EnumStatus status =
238 msg_reply.set_status(status);
239 if (status == cvmfs::STATUS_OK) {
241 cvmfs::MsgHash *msg_hash =
new cvmfs::MsgHash();
243 cvmfs::MsgBreadcrumb *msg_breadcrumb =
new cvmfs::MsgBreadcrumb();
244 msg_breadcrumb->set_fqrn(msg_req->fqrn());
245 msg_breadcrumb->set_allocated_hash(msg_hash);
246 msg_breadcrumb->set_timestamp(breadcrumb.
timestamp);
247 msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
254 cvmfs::MsgHandshake *msg_req,
258 if (msg_req->has_name()) {
264 cvmfs::MsgHandshakeAck msg_ack;
267 msg_ack.set_status(cvmfs::STATUS_OK);
268 msg_ack.set_name(
name_);
271 msg_ack.set_session_id(session_id);
274 msg_ack.set_pid(getpid());
280 cvmfs::MsgInfoReq *msg_req,
284 cvmfs::MsgInfoReply msg_reply;
287 msg_reply.set_req_id(msg_req->req_id());
289 cvmfs::EnumStatus status =
GetInfo(&info);
290 if (status != cvmfs::STATUS_OK) {
292 "failed to query cache status");
298 msg_reply.set_status(status);
304 if (!msg_req->has_conncnt_change_by())
306 int32_t conncnt_change_by = msg_req->conncnt_change_by();
309 "invalid request to drop connection counter below zero");
312 if (conncnt_change_by > 0) {
313 LogSessionInfo(msg_req->session_id(),
"lock session beyond lifetime");
322 cvmfs::MsgListReq *msg_req,
326 cvmfs::MsgListReply msg_reply;
329 msg_reply.set_req_id(msg_req->req_id());
330 int64_t listing_id = msg_req->listing_id();
331 msg_reply.set_listing_id(listing_id);
332 msg_reply.set_is_last_part(
true);
334 cvmfs::EnumStatus status;
335 if (msg_req->listing_id() == 0) {
337 status =
ListingBegin(listing_id, msg_req->object_type());
338 if (status != cvmfs::STATUS_OK) {
340 "failed to start enumeration of objects");
341 msg_reply.set_status(status);
345 msg_reply.set_listing_id(listing_id);
350 unsigned total_size = 0;
351 while ((status =
ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
352 cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
353 cvmfs::MsgHash *msg_hash =
new cvmfs::MsgHash();
355 msg_list_record->set_allocated_hash(msg_hash);
356 msg_list_record->set_pinned(item.
pinned);
357 msg_list_record->set_description(item.
description);
359 total_size +=
sizeof(item) + item.
description.length();
363 if (status == cvmfs::STATUS_OUTOFBOUNDS) {
365 status = cvmfs::STATUS_OK;
367 msg_reply.set_is_last_part(
false);
369 if (status != cvmfs::STATUS_OK) {
370 LogSessionError(msg_req->session_id(), status,
"failed enumerate objects");
372 msg_reply.set_status(status);
378 cvmfs::MsgObjectInfoReq *msg_req,
382 cvmfs::MsgObjectInfoReply msg_reply;
385 msg_reply.set_req_id(msg_req->req_id());
387 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
390 "malformed hash received from client");
391 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
395 msg_reply.set_status(status);
396 if (status == cvmfs::STATUS_OK) {
398 msg_reply.set_size(info.
size);
399 }
else if (status != cvmfs::STATUS_NOENTRY) {
401 "failed retrieving object details");
409 cvmfs::MsgReadReq *msg_req,
413 cvmfs::MsgReadReply msg_reply;
416 msg_reply.set_req_id(msg_req->req_id());
418 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
421 "malformed hash received from client");
422 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
426 unsigned size = msg_req->size();
428 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(smalloc(size));
430 unsigned char buffer[
size];
432 cvmfs::EnumStatus status =
Pread(object_id, msg_req->offset(), &
size, buffer);
433 msg_reply.set_status(status);
434 if (status == cvmfs::STATUS_OK) {
438 "failed to read from object");
448 cvmfs::MsgRefcountReq *msg_req,
452 cvmfs::MsgRefcountReply msg_reply;
455 msg_reply.set_req_id(msg_req->req_id());
457 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
460 "malformed hash received from client");
461 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
463 cvmfs::EnumStatus status =
ChangeRefcount(object_id, msg_req->change_by());
464 msg_reply.set_status(status);
465 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
467 "failed to open/close object " + object_id.
ToString());
479 bool retval = transport.
RecvFrame(&frame_recv);
482 "failed to receive request from connection (%d)", errno);
486 google::protobuf::MessageLite *msg_typed = frame_recv.
GetMsgTyped();
488 if (msg_typed->GetTypeName() ==
"cvmfs.MsgHandshake") {
489 cvmfs::MsgHandshake *msg_req =
490 reinterpret_cast<cvmfs::MsgHandshake *
>(msg_typed);
492 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgQuit") {
493 cvmfs::MsgQuit *msg_req =
reinterpret_cast<cvmfs::MsgQuit *
>(msg_typed);
494 map<uint64_t, SessionInfo>::const_iterator iter =
497 free(iter->second.reponame);
498 free(iter->second.client_instance);
502 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgIoctl") {
503 HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
504 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgRefcountReq") {
505 cvmfs::MsgRefcountReq *msg_req =
506 reinterpret_cast<cvmfs::MsgRefcountReq *
>(msg_typed);
508 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgObjectInfoReq") {
509 cvmfs::MsgObjectInfoReq *msg_req =
510 reinterpret_cast<cvmfs::MsgObjectInfoReq *
>(msg_typed);
512 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgReadReq") {
513 cvmfs::MsgReadReq *msg_req =
514 reinterpret_cast<cvmfs::MsgReadReq *
>(msg_typed);
516 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgStoreReq") {
517 cvmfs::MsgStoreReq *msg_req =
518 reinterpret_cast<cvmfs::MsgStoreReq *
>(msg_typed);
520 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgStoreAbortReq") {
521 cvmfs::MsgStoreAbortReq *msg_req =
522 reinterpret_cast<cvmfs::MsgStoreAbortReq *
>(msg_typed);
524 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgInfoReq") {
525 cvmfs::MsgInfoReq *msg_req =
526 reinterpret_cast<cvmfs::MsgInfoReq *
>(msg_typed);
528 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgShrinkReq") {
529 cvmfs::MsgShrinkReq *msg_req =
530 reinterpret_cast<cvmfs::MsgShrinkReq *
>(msg_typed);
532 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgListReq") {
533 cvmfs::MsgListReq *msg_req =
534 reinterpret_cast<cvmfs::MsgListReq *
>(msg_typed);
536 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgBreadcrumbStoreReq") {
537 cvmfs::MsgBreadcrumbStoreReq *msg_req =
538 reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *
>(msg_typed);
540 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgBreadcrumbLoadReq") {
541 cvmfs::MsgBreadcrumbLoadReq *msg_req =
542 reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *
>(msg_typed);
546 "unexpected message from client: %s",
547 msg_typed->GetTypeName().c_str());
556 cvmfs::MsgShrinkReq *msg_req,
560 cvmfs::MsgShrinkReply msg_reply;
563 msg_reply.set_req_id(msg_req->req_id());
564 uint64_t used_bytes = 0;
565 cvmfs::EnumStatus status =
Shrink(msg_req->shrink_to(), &used_bytes);
566 msg_reply.set_used_bytes(used_bytes);
567 msg_reply.set_status(status);
568 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
569 LogSessionError(msg_req->session_id(), status,
"failed to cleanup cache");
576 cvmfs::MsgStoreAbortReq *msg_req,
580 cvmfs::MsgStoreReply msg_reply;
582 msg_reply.set_req_id(msg_req->req_id());
583 msg_reply.set_part_nr(0);
585 UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
586 bool retval =
txn_ids_.Lookup(uniq_req, &txn_id);
589 "malformed transaction id received from client");
590 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
592 cvmfs::EnumStatus status =
AbortTxn(txn_id);
593 msg_reply.set_status(status);
594 if (status != cvmfs::STATUS_OK) {
596 "failed to abort transaction");
605 cvmfs::MsgStoreReq *msg_req,
610 cvmfs::MsgStoreReply msg_reply;
612 msg_reply.set_req_id(msg_req->req_id());
613 msg_reply.set_part_nr(msg_req->part_nr());
615 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
621 "malformed hash or bad object size received from client");
622 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
627 UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
629 cvmfs::EnumStatus status = cvmfs::STATUS_OK;
630 if (msg_req->part_nr() == 1) {
633 "invalid attempt to restart running transaction");
634 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
641 if (msg_req->has_expected_size()) {info.
size = msg_req->expected_size();}
642 if (msg_req->has_object_type()) {info.
object_type = msg_req->object_type();}
643 if (msg_req->has_description()) {info.
description = msg_req->description();}
644 status =
StartTxn(object_id, txn_id, info);
645 if (status != cvmfs::STATUS_OK) {
647 "failed to start transaction");
648 msg_reply.set_status(status);
654 retval =
txn_ids_.Lookup(uniq_req, &txn_id);
657 "invalid transaction received from client");
658 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
667 reinterpret_cast<unsigned char *>(frame->
attachment()),
669 if (status != cvmfs::STATUS_OK) {
670 LogSessionError(msg_req->session_id(), status,
"failure writing object");
671 msg_reply.set_status(status);
677 if (msg_req->last_part()) {
679 if (status != cvmfs::STATUS_OK) {
681 "failure committing object");
685 msg_reply.set_status(status);
691 return atomic_read32(&
running_) != 0;
697 if (tokens[0] ==
"unix") {
698 string lock_path = tokens[1] +
".lock";
702 "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
710 "failed to lock on %s, file is busy", lock_path.c_str());
717 }
else if (tokens[0] ==
"tcp") {
718 vector<string> tcp_address =
SplitString(tokens[1],
':');
719 if (tcp_address.size() != 2) {
721 "invalid locator: %s", locator.c_str());
728 "unknown endpoint in locator: %s", locator.c_str());
734 if (errno == EADDRINUSE) {
739 "failed to create endpoint %s (%d)", locator.c_str(), errno);
753 string session_str(
"unidentified client (" +
StringifyInt(session_id) +
")");
754 map<uint64_t, SessionInfo>::const_iterator iter =
sessions_.find(session_id);
756 session_str = iter->second.name;
759 "session '%s': %s", session_str.c_str(), msg.c_str());
765 cvmfs::EnumStatus status,
766 const std::string &msg)
768 string session_str(
"unidentified client (" +
StringifyInt(session_id) +
")");
769 map<uint64_t, SessionInfo>::const_iterator iter =
sessions_.find(session_id);
771 session_str = iter->second.name;
774 "session '%s': %s (%d - %s)",
775 session_str.c_str(), msg.c_str(), status,
785 vector<struct pollfd> watch_fds;
787 struct pollfd watch_ctrl;
789 watch_ctrl.events = POLLIN | POLLPRI;
790 watch_fds.push_back(watch_ctrl);
791 struct pollfd watch_socket;
793 watch_socket.events = POLLIN | POLLPRI;
794 watch_fds.push_back(watch_socket);
796 bool terminated =
false;
797 while (!terminated) {
798 for (
unsigned i = 0; i < watch_fds.size(); ++i)
799 watch_fds[i].revents = 0;
800 int retval = poll(&watch_fds[0], watch_fds.size(), -1);
809 if (watch_fds[0].revents) {
811 ReadPipe(watch_fds[0].fd, &signal, 1);
818 if (watch_fds.size() > 2) {
820 "terminating external cache manager with pending connections");
826 if (watch_fds[1].revents) {
827 struct sockaddr_un remote;
828 socklen_t socket_size =
sizeof(remote);
830 accept(watch_fds[1].fd, (
struct sockaddr *)&remote, &socket_size);
833 "failed to establish connection (%d)", errno);
836 struct pollfd watch_con;
837 watch_con.fd = fd_con;
838 watch_con.events = POLLIN | POLLPRI;
839 watch_fds.push_back(watch_con);
844 for (
unsigned i = 2; i < watch_fds.size(); ) {
845 if (watch_fds[i].revents) {
848 close(watch_fds[i].fd);
850 watch_fds.erase(watch_fds.begin() + i);
856 "stopping cache plugin, no more active clients");
870 for (
unsigned i = 2; i < watch_fds.size(); ++i)
871 close(watch_fds[i].fd);
874 signal(SIGPIPE, save_sigpipe);
884 if (pipe_ready == NULL)
903 for (; iter != iter_end; ++iter) {
907 cvmfs::MsgDetach msg_detach;
void Get(uint64_t *id, char **reponame, char **client_instance)
#define LogCvmfs(source, mask,...)
int MakeSocket(const std::string &path, const int mode)
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
atomic_int64 next_session_id_
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
static void CleanupInstance()
void Set(uint64_t id, char *reponame, char *client_instance)
static const uint64_t kSizeUnknown
int MakeTcpEndpoint(const std::string &ipv4_address, int portno)
std::string ToString(const bool with_suffix=false) const
bool is_set
either not yet set or deliberately unset
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
std::set< int > connections_
std::map< uint64_t, SessionInfo > sessions_
uint64_t capabilities() const
void ProcessRequests(unsigned num_workers)
cvmfs::EnumObjectType object_type
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
static SessionCtx * instance_
static const unsigned kDefaultMaxObjectSize
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static const char kReadyNotification
static uint32_t HashUniqueRequest(const UniqueRequest &req)
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
void MakePipe(int pipe_fd[2])
static const uint32_t kFlagSendIgnoreFailure
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
uint64_t num_inlimbo_clients_
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
bool Listen(const std::string &locator)
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
int64_t String2Int64(const string &value)
vector< string > SplitString(const string &str, char delim)
static const unsigned kListingSize
static void TlsDestructor(void *data)
void NotifySupervisor(char signal)
void * attachment() const
unsigned max_object_size_
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int TryLockFile(const std::string &path)
uint32_t att_size() const
static const char kSignalDetach
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
string StringifyInt(const int64_t value)
static const unsigned kPbProtocolVersion
google::protobuf::MessageLite * GetMsgTyped()
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
static const char kFailureNotification
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
void SendDetachRequests()
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
atomic_int64 next_lst_id_
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
void set_attachment(void *attachment, uint32_t att_size)
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
static const char kSignalTerminate
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
const char * CacheTransportCode2Ascii(const cvmfs::EnumStatus code)
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
void ClosePipe(int pipe_fd[2])
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
atomic_int64 next_txn_id_
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
static const char * kEnvReadyNotifyFd
void LogSessionInfo(uint64_t session_id, const std::string &msg)
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
static const uint32_t kFlagSendNonBlocking
void UnlockFile(const int filedes)
bool HandleRequest(int fd_con)
static SessionCtx * GetInstance()