10 #include <sys/socket.h>
39 lock_tls_blocks_ =
reinterpret_cast<pthread_mutex_t *
>(
40 smalloc(
sizeof(pthread_mutex_t)));
41 const int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
47 pthread_mutex_destroy(lock_tls_blocks_);
48 free(lock_tls_blocks_);
50 for (
unsigned i = 0; i < tls_blocks_.size(); ++i) {
51 delete tls_blocks_[i];
54 const int retval = pthread_key_delete(thread_local_storage_);
60 if (instance_ == NULL) {
63 pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
73 pthread_getspecific(thread_local_storage_));
74 if ((tls == NULL) || !tls->
is_set) {
77 *client_instance = NULL;
88 pthread_getspecific(thread_local_storage_));
98 pthread_getspecific(thread_local_storage_));
102 const int retval = pthread_setspecific(thread_local_storage_, tls);
105 tls_blocks_.push_back(tls);
121 for (vector<ThreadLocalStorage *>::iterator
122 i = instance_->tls_blocks_.begin(),
123 iEnd = instance_->tls_blocks_.end();
126 instance_->tls_blocks_.erase(i);
135 pthread_getspecific(thread_local_storage_));
149 : id(id), name(name) {
151 reponame = strdup(tokens[0].c_str());
152 if (tokens.size() > 1)
200 cvmfs::MsgBreadcrumbReply msg_reply;
203 msg_reply.set_req_id(msg_req->req_id());
205 const bool retval = transport->
ParseMsgHash(msg_req->breadcrumb().hash(),
209 "malformed hash received from client");
210 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
212 breadcrumb.
timestamp = msg_req->breadcrumb().timestamp();
213 if (msg_req->breadcrumb().has_revision()) {
214 breadcrumb.
revision = msg_req->breadcrumb().revision();
218 const cvmfs::EnumStatus status =
220 msg_reply.set_status(status);
229 cvmfs::MsgBreadcrumbReply msg_reply;
232 msg_reply.set_req_id(msg_req->req_id());
234 const cvmfs::EnumStatus status =
LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
235 msg_reply.set_status(status);
236 if (status == cvmfs::STATUS_OK) {
238 cvmfs::MsgHash *msg_hash =
new cvmfs::MsgHash();
240 cvmfs::MsgBreadcrumb *msg_breadcrumb =
new cvmfs::MsgBreadcrumb();
241 msg_breadcrumb->set_fqrn(msg_req->fqrn());
242 msg_breadcrumb->set_allocated_hash(msg_hash);
243 msg_breadcrumb->set_timestamp(breadcrumb.
timestamp);
244 msg_breadcrumb->set_revision(breadcrumb.
revision);
245 msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
254 if (msg_req->has_name()) {
258 session_id,
"anonymous client (" +
StringifyInt(session_id) +
")");
260 cvmfs::MsgHandshakeAck msg_ack;
263 msg_ack.set_status(cvmfs::STATUS_OK);
264 msg_ack.set_name(
name_);
267 msg_ack.set_session_id(session_id);
270 msg_ack.set_pid(getpid());
278 cvmfs::MsgInfoReply msg_reply;
281 msg_reply.set_req_id(msg_req->req_id());
283 const cvmfs::EnumStatus status =
GetInfo(&info);
284 if (status != cvmfs::STATUS_OK) {
286 "failed to query cache status");
292 msg_reply.set_status(status);
298 if (!msg_req->has_conncnt_change_by())
300 const int32_t conncnt_change_by = msg_req->conncnt_change_by();
303 "invalid request to drop connection counter below zero");
306 if (conncnt_change_by > 0) {
307 LogSessionInfo(msg_req->session_id(),
"lock session beyond lifetime");
318 cvmfs::MsgListReply msg_reply;
321 msg_reply.set_req_id(msg_req->req_id());
322 int64_t listing_id = msg_req->listing_id();
323 msg_reply.set_listing_id(listing_id);
324 msg_reply.set_is_last_part(
true);
326 cvmfs::EnumStatus status;
327 if (msg_req->listing_id() == 0) {
329 status =
ListingBegin(listing_id, msg_req->object_type());
330 if (status != cvmfs::STATUS_OK) {
332 "failed to start enumeration of objects");
333 msg_reply.set_status(status);
337 msg_reply.set_listing_id(listing_id);
342 unsigned total_size = 0;
343 while ((status =
ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
344 cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
345 cvmfs::MsgHash *msg_hash =
new cvmfs::MsgHash();
347 msg_list_record->set_allocated_hash(msg_hash);
348 msg_list_record->set_pinned(item.
pinned);
349 msg_list_record->set_description(item.
description);
351 total_size +=
sizeof(item) + item.
description.length();
355 if (status == cvmfs::STATUS_OUTOFBOUNDS) {
357 status = cvmfs::STATUS_OK;
359 msg_reply.set_is_last_part(
false);
361 if (status != cvmfs::STATUS_OK) {
362 LogSessionError(msg_req->session_id(), status,
"failed enumerate objects");
364 msg_reply.set_status(status);
372 cvmfs::MsgObjectInfoReply msg_reply;
375 msg_reply.set_req_id(msg_req->req_id());
377 const bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
380 "malformed hash received from client");
381 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
384 const cvmfs::EnumStatus status =
GetObjectInfo(object_id, &info);
385 msg_reply.set_status(status);
386 if (status == cvmfs::STATUS_OK) {
388 msg_reply.set_size(info.
size);
389 }
else if (status != cvmfs::STATUS_NOENTRY) {
391 "failed retrieving object details");
401 cvmfs::MsgReadReply msg_reply;
404 msg_reply.set_req_id(msg_req->req_id());
406 const bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
409 "malformed hash received from client");
410 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
414 unsigned size = msg_req->size();
416 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(smalloc(size));
418 unsigned char buffer[
size];
420 const cvmfs::EnumStatus status =
421 Pread(object_id, msg_req->offset(), &
size, buffer);
422 msg_reply.set_status(status);
423 if (status == cvmfs::STATUS_OK) {
427 "failed to read from object");
439 cvmfs::MsgRefcountReply msg_reply;
442 msg_reply.set_req_id(msg_req->req_id());
444 const bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
447 "malformed hash received from client");
448 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
450 const cvmfs::EnumStatus status =
452 msg_reply.set_status(status);
453 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
455 "failed to open/close object " + object_id.
ToString());
467 const bool retval = transport.
RecvFrame(&frame_recv);
470 "failed to receive request from connection (%d)", errno);
474 google::protobuf::MessageLite *msg_typed = frame_recv.
GetMsgTyped();
476 if (msg_typed->GetTypeName() ==
"cvmfs.MsgHandshake") {
477 cvmfs::MsgHandshake *msg_req =
reinterpret_cast<cvmfs::MsgHandshake *
>(
480 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgQuit") {
481 cvmfs::MsgQuit *msg_req =
reinterpret_cast<cvmfs::MsgQuit *
>(msg_typed);
482 const map<uint64_t, SessionInfo>::const_iterator iter =
485 free(iter->second.reponame);
486 free(iter->second.client_instance);
490 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgIoctl") {
491 HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
492 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgRefcountReq") {
493 cvmfs::MsgRefcountReq *msg_req =
reinterpret_cast<cvmfs::MsgRefcountReq *
>(
496 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgObjectInfoReq") {
497 cvmfs::MsgObjectInfoReq
498 *msg_req =
reinterpret_cast<cvmfs::MsgObjectInfoReq *
>(msg_typed);
500 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgReadReq") {
501 cvmfs::MsgReadReq *msg_req =
reinterpret_cast<cvmfs::MsgReadReq *
>(
504 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgStoreReq") {
505 cvmfs::MsgStoreReq *msg_req =
reinterpret_cast<cvmfs::MsgStoreReq *
>(
508 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgStoreAbortReq") {
509 cvmfs::MsgStoreAbortReq
510 *msg_req =
reinterpret_cast<cvmfs::MsgStoreAbortReq *
>(msg_typed);
512 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgInfoReq") {
513 cvmfs::MsgInfoReq *msg_req =
reinterpret_cast<cvmfs::MsgInfoReq *
>(
516 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgShrinkReq") {
517 cvmfs::MsgShrinkReq *msg_req =
reinterpret_cast<cvmfs::MsgShrinkReq *
>(
520 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgListReq") {
521 cvmfs::MsgListReq *msg_req =
reinterpret_cast<cvmfs::MsgListReq *
>(
524 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgBreadcrumbStoreReq") {
525 cvmfs::MsgBreadcrumbStoreReq
526 *msg_req =
reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *
>(msg_typed);
528 }
else if (msg_typed->GetTypeName() ==
"cvmfs.MsgBreadcrumbLoadReq") {
529 cvmfs::MsgBreadcrumbLoadReq
530 *msg_req =
reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *
>(msg_typed);
534 "unexpected message from client: %s",
535 std::string(msg_typed->GetTypeName()).c_str());
546 cvmfs::MsgShrinkReply msg_reply;
549 msg_reply.set_req_id(msg_req->req_id());
550 uint64_t used_bytes = 0;
551 const cvmfs::EnumStatus status =
Shrink(msg_req->shrink_to(), &used_bytes);
552 msg_reply.set_used_bytes(used_bytes);
553 msg_reply.set_status(status);
554 if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
555 LogSessionError(msg_req->session_id(), status,
"failed to cleanup cache");
564 cvmfs::MsgStoreReply msg_reply;
566 msg_reply.set_req_id(msg_req->req_id());
567 msg_reply.set_part_nr(0);
569 const UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
570 const bool retval =
txn_ids_.Lookup(uniq_req, &txn_id);
573 "malformed transaction id received from client");
574 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
576 const cvmfs::EnumStatus status =
AbortTxn(txn_id);
577 msg_reply.set_status(status);
578 if (status != cvmfs::STATUS_OK) {
580 "failed to abort transaction");
592 cvmfs::MsgStoreReply msg_reply;
594 msg_reply.set_req_id(msg_req->req_id());
595 msg_reply.set_part_nr(msg_req->part_nr());
597 bool retval = transport->
ParseMsgHash(msg_req->object_id(), &object_id);
601 "malformed hash or bad object size received from client");
602 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
607 const UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
609 cvmfs::EnumStatus status = cvmfs::STATUS_OK;
610 if (msg_req->part_nr() == 1) {
613 "invalid attempt to restart running transaction");
614 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
621 if (msg_req->has_expected_size()) {
622 info.
size = msg_req->expected_size();
624 if (msg_req->has_object_type()) {
627 if (msg_req->has_description()) {
630 status =
StartTxn(object_id, txn_id, info);
631 if (status != cvmfs::STATUS_OK) {
633 "failed to start transaction");
634 msg_reply.set_status(status);
640 retval =
txn_ids_.Lookup(uniq_req, &txn_id);
643 "invalid transaction received from client");
644 msg_reply.set_status(cvmfs::STATUS_MALFORMED);
653 reinterpret_cast<unsigned char *>(frame->
attachment()),
655 if (status != cvmfs::STATUS_OK) {
656 LogSessionError(msg_req->session_id(), status,
"failure writing object");
657 msg_reply.set_status(status);
663 if (msg_req->last_part()) {
665 if (status != cvmfs::STATUS_OK) {
667 "failure committing object");
671 msg_reply.set_status(status);
681 if (tokens[0] ==
"unix") {
682 const string lock_path = tokens[1] +
".lock";
686 "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
694 "failed to lock on %s, file is busy", lock_path.c_str());
701 }
else if (tokens[0] ==
"tcp") {
702 vector<string> tcp_address =
SplitString(tokens[1],
':');
703 if (tcp_address.size() != 2) {
712 "unknown endpoint in locator: %s", locator.c_str());
718 if (errno == EADDRINUSE) {
723 "failed to create endpoint %s (%d)", locator.c_str(), errno);
737 string session_str(
"unidentified client (" +
StringifyInt(session_id) +
")");
738 const map<uint64_t, SessionInfo>::const_iterator iter =
741 session_str = iter->second.name;
744 session_str.c_str(), msg.c_str());
749 cvmfs::EnumStatus status,
750 const std::string &msg) {
751 string session_str(
"unidentified client (" +
StringifyInt(session_id) +
")");
752 const map<uint64_t, SessionInfo>::const_iterator iter =
755 session_str = iter->second.name;
758 session_str.c_str(), msg.c_str(), status,
768 vector<struct pollfd> watch_fds;
770 struct pollfd watch_ctrl;
772 watch_ctrl.events = POLLIN | POLLPRI;
773 watch_fds.push_back(watch_ctrl);
774 struct pollfd watch_socket;
776 watch_socket.events = POLLIN | POLLPRI;
777 watch_fds.push_back(watch_socket);
779 bool terminated =
false;
780 while (!terminated) {
781 for (
unsigned i = 0; i < watch_fds.size(); ++i)
782 watch_fds[i].revents = 0;
783 const int retval = poll(&watch_fds[0], watch_fds.size(), -1);
792 if (watch_fds[0].revents) {
794 ReadPipe(watch_fds[0].fd, &signal, 1);
801 if (watch_fds.size() > 2) {
803 "terminating external cache manager with pending connections");
809 if (watch_fds[1].revents) {
810 struct sockaddr_un remote;
811 socklen_t socket_size =
sizeof(remote);
813 accept(watch_fds[1].fd, (
struct sockaddr *)&remote, &socket_size);
816 "failed to establish connection (%d)", errno);
819 struct pollfd watch_con;
820 watch_con.fd = fd_con;
821 watch_con.events = POLLIN | POLLPRI;
822 watch_fds.push_back(watch_con);
827 for (
unsigned i = 2; i < watch_fds.size();) {
828 if (watch_fds[i].revents) {
829 const bool proceed = cache_plugin->
HandleRequest(watch_fds[i].fd);
831 close(watch_fds[i].fd);
833 watch_fds.erase(watch_fds.begin() + i);
838 "stopping cache plugin, no more active clients");
852 for (
unsigned i = 2; i < watch_fds.size(); ++i)
853 close(watch_fds[i].fd);
856 signal(SIGPIPE, save_sigpipe);
866 if (pipe_ready == NULL)
885 const set<int>::const_iterator iter_end =
connections_.end();
886 for (; iter != iter_end; ++iter) {
890 cvmfs::MsgDetach msg_detach;
void Get(uint64_t *id, char **reponame, char **client_instance)
int MakeSocket(const std::string &path, const int mode)
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
atomic_int64 next_session_id_
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
static void CleanupInstance()
void Set(uint64_t id, char *reponame, char *client_instance)
static const uint64_t kSizeUnknown
int MakeTcpEndpoint(const std::string &ipv4_address, int portno)
std::string ToString(const bool with_suffix=false) const
bool is_set
either not yet set or deliberately unset
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
std::set< int > connections_
std::map< uint64_t, SessionInfo > sessions_
uint64_t capabilities() const
void ProcessRequests(unsigned num_workers)
cvmfs::EnumObjectType object_type
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
static SessionCtx * instance_
static const unsigned kDefaultMaxObjectSize
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static const char kReadyNotification
static uint32_t HashUniqueRequest(const UniqueRequest &req)
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
void MakePipe(int pipe_fd[2])
static const uint32_t kFlagSendIgnoreFailure
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
uint64_t num_inlimbo_clients_
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
bool Listen(const std::string &locator)
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
int64_t String2Int64(const string &value)
vector< string > SplitString(const string &str, char delim)
static const unsigned kListingSize
static void TlsDestructor(void *data)
void NotifySupervisor(char signal)
void * attachment() const
unsigned max_object_size_
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int TryLockFile(const std::string &path)
uint32_t att_size() const
static const char kSignalDetach
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
string StringifyInt(const int64_t value)
static const unsigned kPbProtocolVersion
google::protobuf::MessageLite * GetMsgTyped()
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
static const char kFailureNotification
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
void SendDetachRequests()
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
atomic_int64 next_lst_id_
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
void set_attachment(void *attachment, uint32_t att_size)
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
static const char kSignalTerminate
void WritePipe(int fd, const void *buf, size_t nbyte)
void ReadPipe(int fd, void *buf, size_t nbyte)
const char * CacheTransportCode2Ascii(const cvmfs::EnumStatus code)
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
void ClosePipe(int pipe_fd[2])
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
atomic_int64 next_txn_id_
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
static const char * kEnvReadyNotifyFd
void LogSessionInfo(uint64_t session_id, const std::string &msg)
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
static const uint32_t kFlagSendNonBlocking
void UnlockFile(const int filedes)
bool HandleRequest(int fd_con)
static SessionCtx * GetInstance()
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)