CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_extern.cc
Go to the documentation of this file.
1 
4 #include "cvmfs_config.h"
5 #include "cache_extern.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <stdint.h>
11 #include <sys/socket.h>
12 #include <unistd.h>
13 
14 #include <algorithm>
15 #include <cassert>
16 #ifdef __APPLE__
17 #include <cstdlib>
18 #endif
19 #include <cstring>
20 #include <map>
21 #include <new>
22 #include <set>
23 #include <string>
24 
25 #include "atomic.h"
26 #include "cache.pb.h"
27 #include "hash.h"
28 #include "logging.h"
29 #ifdef __APPLE__
30 #include "smalloc.h"
31 #endif
32 #include "util/exception.h"
33 #include "util/pointer.h"
34 #include "util/posix.h"
35 #include "util/string.h"
36 #include "util_concurrency.h"
37 
38 using namespace std; // NOLINT
39 
40 namespace {
41 
42 int Ack2Errno(cvmfs::EnumStatus status_code) {
43  switch (status_code) {
44  case cvmfs::STATUS_OK:
45  return 0;
46  case cvmfs::STATUS_NOSUPPORT:
47  return -EOPNOTSUPP;
48  case cvmfs::STATUS_FORBIDDEN:
49  return -EPERM;
50  case cvmfs::STATUS_NOSPACE:
51  return -ENOSPC;
52  case cvmfs::STATUS_NOENTRY:
53  return -ENOENT;
54  case cvmfs::STATUS_MALFORMED:
55  return -EINVAL;
56  case cvmfs::STATUS_IOERR:
57  return -EIO;
58  case cvmfs::STATUS_CORRUPTED:
59  return -EIO;
60  case cvmfs::STATUS_TIMEOUT:
61  return -EIO;
62  case cvmfs::STATUS_BADCOUNT:
63  return -EINVAL;
64  case cvmfs::STATUS_OUTOFBOUNDS:
65  return -EINVAL;
66  default:
67  return -EIO;
68  }
69 }
70 
71 } // anonymous namespace
72 
74 
75 
77  int result = Reset(txn);
78 #ifdef __APPLE__
79  free(reinterpret_cast<Transaction *>(txn)->buffer);
80 #endif
81  return result;
82 }
83 
84 
86  assert(quota_mgr != NULL);
87  quota_mgr_ = quota_mgr;
88  LogCvmfs(kLogCache, kLogDebug, "set quota manager");
89  return true;
90 }
91 
92 
94  if (!spawned_) {
95  transport_.SendFrame(rpc_job->frame_send());
96  uint32_t save_att_size = rpc_job->frame_recv()->att_size();
97  bool again;
98  do {
99  again = false;
100  bool retval = transport_.RecvFrame(rpc_job->frame_recv());
101  assert(retval);
102  if (rpc_job->frame_recv()->IsMsgOutOfBand()) {
103  google::protobuf::MessageLite *msg_typed =
104  rpc_job->frame_recv()->GetMsgTyped();
105  assert(msg_typed->GetTypeName() == "cvmfs.MsgDetach");
106  quota_mgr_->BroadcastBackchannels("R"); // release pinned catalogs
107  rpc_job->frame_recv()->Reset(save_att_size);
108  again = true;
109  }
110  } while (again);
111  } else {
112  Signal signal;
113  {
114  MutexLockGuard guard(lock_inflight_rpcs_);
115  inflight_rpcs_.push_back(RpcInFlight(rpc_job, &signal));
116  }
117  {
118  MutexLockGuard guard(lock_send_fd_);
119  transport_.SendFrame(rpc_job->frame_send());
120  }
121  signal.Wait();
122  }
123 }
124 
125 
126 int ExternalCacheManager::ChangeRefcount(const shash::Any &id, int change_by) {
127  cvmfs::MsgHash object_id;
128  transport_.FillMsgHash(id, &object_id);
129  cvmfs::MsgRefcountReq msg_refcount;
130  msg_refcount.set_session_id(session_id_);
131  msg_refcount.set_req_id(NextRequestId());
132  msg_refcount.set_allocated_object_id(&object_id);
133  msg_refcount.set_change_by(change_by);
134  RpcJob rpc_job(&msg_refcount);
135  CallRemotely(&rpc_job);
136  msg_refcount.release_object_id();
137 
138  cvmfs::MsgRefcountReply *msg_reply = rpc_job.msg_refcount_reply();
139  return Ack2Errno(msg_reply->status());
140 }
141 
142 
144  ReadOnlyHandle handle;
145  {
146  WriteLockGuard guard(rwlock_fd_table_);
147  handle = fd_table_.GetHandle(fd);
148  if (handle.id == kInvalidHandle)
149  return -EBADF;
150  int retval = fd_table_.CloseFd(fd);
151  assert(retval == 0);
152  }
153 
154  return ChangeRefcount(handle.id, -1);
155 }
156 
157 
159  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
160  LogCvmfs(kLogCache, kLogDebug, "committing %s",
161  transaction->id.ToString().c_str());
162  int retval = Flush(true, transaction);
163  if (retval != 0)
164  return retval;
165 
166  int refcount = transaction->open_fds - 1;
167  if (refcount != 0)
168  return ChangeRefcount(transaction->id, refcount);
169 #ifdef __APPLE__
170  free(transaction->buffer);
171 #endif
172  return 0;
173 }
174 
175 
177  const std::string &locator, bool print_error)
178 {
179  vector<string> tokens = SplitString(locator, '=');
180  int result = -1;
181  if (tokens[0] == "unix") {
182  result = ConnectSocket(tokens[1]);
183  } else if (tokens[0] == "tcp") {
184  vector<string> tcp_address = SplitString(tokens[1], ':');
185  if (tcp_address.size() != 2)
186  return -EINVAL;
187  result = ConnectTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
188  } else {
189  return -EINVAL;
190  }
191  if (result < 0) {
192  if (print_error) {
193  if (errno) {
195  "Failed to connect to socket: %s", strerror(errno));
196  } else {
198  "Failed to connect to socket (unknown error)");
199  }
200  }
201  return -EIO;
202  }
204  "connected to cache plugin at %s", locator.c_str());
205  return result;
206 }
207 
208 
210  int fd_connection,
211  unsigned max_open_fds,
212  const string &ident)
213 {
215  new ExternalCacheManager(fd_connection, max_open_fds));
216  assert(cache_mgr.IsValid());
217 
218  cvmfs::MsgHandshake msg_handshake;
219  msg_handshake.set_protocol_version(kPbProtocolVersion);
220  msg_handshake.set_name(ident);
221  CacheTransport::Frame frame_send(&msg_handshake);
222  cache_mgr->transport_.SendFrame(&frame_send);
223 
224  CacheTransport::Frame frame_recv;
225  bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
226  if (!retval)
227  return NULL;
228  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
229  if (msg_typed->GetTypeName() != "cvmfs.MsgHandshakeAck")
230  return NULL;
231  cvmfs::MsgHandshakeAck *msg_ack =
232  reinterpret_cast<cvmfs::MsgHandshakeAck *>(msg_typed);
233  cache_mgr->session_id_ = msg_ack->session_id();
234  cache_mgr->capabilities_ = msg_ack->capabilities();
235  cache_mgr->max_object_size_ = msg_ack->max_object_size();
236  assert(cache_mgr->max_object_size_ > 0);
237  if (cache_mgr->max_object_size_ > kMaxSupportedObjectSize) {
239  "external cache manager object size too large (%u)",
240  cache_mgr->max_object_size_);
241  return NULL;
242  }
243  if (cache_mgr->max_object_size_ < kMinSupportedObjectSize) {
245  "external cache manager object size too small (%u)",
246  cache_mgr->max_object_size_);
247  return NULL;
248  }
249  if (msg_ack->has_pid())
250  cache_mgr->pid_plugin_ = msg_ack->pid();
251  return cache_mgr.Release();
252 }
253 
254 
262  const std::string &locator,
263  const std::vector<std::string> &cmd_line)
264 {
265  UniquePtr<PluginHandle> plugin_handle(new PluginHandle());
266  unsigned num_attempts = 0;
267  bool try_again = false;
268  do {
269  num_attempts++;
270  if (num_attempts > 2) {
271  // Prevent violate busy loops
272  SafeSleepMs(1000);
273  }
274  plugin_handle->fd_connection_ = ConnectLocator(locator, num_attempts > 1);
275  if (plugin_handle->IsValid()) {
276  break;
277  } else if (plugin_handle->fd_connection_ == -EINVAL) {
279  "Invalid locator: %s", locator.c_str());
280  plugin_handle->error_msg_ = "Invalid locator: " + locator;
281  break;
282  } else {
283  if (num_attempts > 1) {
285  "Failed to connect to external cache manager: %d",
286  plugin_handle->fd_connection_);
287  }
288  plugin_handle->error_msg_ = "Failed to connect to external cache manager";
289  }
290 
291  try_again = SpawnPlugin(cmd_line);
292  } while (try_again);
293 
294  return plugin_handle.Release();
295 }
296 
297 
299  const ObjectInfo &object_info,
300  const int flags,
301  void *txn)
302 {
303  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
304  transaction->object_info = object_info;
305  transaction->object_info_modified = true;
306 }
307 
308 
310  return "External cache manager\n";
311 }
312 
313 
315  FdTable<ReadOnlyHandle> *fd_table =
316  reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
317  delete fd_table;
318  return true;
319 }
320 
321 
323  int fd = -1;
324  {
325  WriteLockGuard guard(rwlock_fd_table_);
326  fd = fd_table_.OpenFd(ReadOnlyHandle(id));
327  if (fd < 0) {
328  LogCvmfs(kLogCache, kLogDebug, "error while creating new fd",
329  strerror(-fd));
330  return fd;
331  }
332  }
333 
334  int status_refcnt = ChangeRefcount(id, 1);
335  if (status_refcnt == 0)
336  return fd;
337 
338  WriteLockGuard guard(rwlock_fd_table_);
339  int retval = fd_table_.CloseFd(fd);
340  assert(retval == 0);
341  return status_refcnt;
342 }
343 
344 
346  // When DoRestoreState is called, we have fd 0 assigned to the root file
347  // catalog unless this is a lower layer cache in a tiered setup
348  for (unsigned i = 1; i < fd_table_.GetMaxFds(); ++i) {
349  assert(fd_table_.GetHandle(i) == ReadOnlyHandle());
350  }
351  ReadOnlyHandle handle_root = fd_table_.GetHandle(0);
352 
353  FdTable<ReadOnlyHandle> *other =
354  reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
355  fd_table_.AssignFrom(*other);
356  cvmfs::MsgIoctl msg_ioctl;
357  msg_ioctl.set_session_id(session_id_);
358  msg_ioctl.set_conncnt_change_by(-1);
359  CacheTransport::Frame frame(&msg_ioctl);
360  transport_.SendFrame(&frame);
361 
362  int new_root_fd = -1;
363  if (handle_root != ReadOnlyHandle()) {
364  new_root_fd = fd_table_.OpenFd(handle_root);
365  // There must be a free file descriptor because the root file catalog gets
366  // closed before a reload
367  assert(new_root_fd >= 0);
368  }
369  return new_root_fd;
370 }
371 
372 
374  cvmfs::MsgIoctl msg_ioctl;
375  msg_ioctl.set_session_id(session_id_);
376  msg_ioctl.set_conncnt_change_by(1);
377  CacheTransport::Frame frame(&msg_ioctl);
378  transport_.SendFrame(&frame);
379  return fd_table_.Clone();
380 }
381 
382 
384  shash::Any id = GetHandle(fd);
385  if (id == kInvalidHandle)
386  return -EBADF;
387  return DoOpen(id);
388 }
389 
390 
392  int fd_connection,
393  unsigned max_open_fds)
394  : pid_plugin_(0)
395  , fd_table_(max_open_fds, ReadOnlyHandle())
396  , transport_(fd_connection)
397  , session_id_(-1)
398  , max_object_size_(0)
399  , spawned_(false)
400  , terminated_(false)
401  , capabilities_(cvmfs::CAP_NONE)
402 {
403  int retval = pthread_rwlock_init(&rwlock_fd_table_, NULL);
404  assert(retval == 0);
405  retval = pthread_mutex_init(&lock_send_fd_, NULL);
406  assert(retval == 0);
407  retval = pthread_mutex_init(&lock_inflight_rpcs_, NULL);
408  assert(retval == 0);
409  memset(&thread_read_, 0, sizeof(thread_read_));
410  atomic_init64(&next_request_id_);
411 }
412 
413 
415  terminated_ = true;
416  MemoryFence();
417  if (session_id_ >= 0) {
418  cvmfs::MsgQuit msg_quit;
419  msg_quit.set_session_id(session_id_);
420  CacheTransport::Frame frame(&msg_quit);
421  transport_.SendFrame(&frame);
422  }
423  shutdown(transport_.fd_connection(), SHUT_RDWR);
424  if (spawned_)
425  pthread_join(thread_read_, NULL);
426  close(transport_.fd_connection());
427  pthread_rwlock_destroy(&rwlock_fd_table_);
428  pthread_mutex_destroy(&lock_send_fd_);
429  pthread_mutex_destroy(&lock_inflight_rpcs_);
430 }
431 
432 
433 int ExternalCacheManager::Flush(bool do_commit, Transaction *transaction) {
434  if (transaction->committed)
435  return 0;
436  LogCvmfs(kLogCache, kLogDebug, "flushing %u bytes for %s",
437  transaction->buf_pos, transaction->id.ToString().c_str());
438  cvmfs::MsgHash object_id;
439  transport_.FillMsgHash(transaction->id, &object_id);
440  cvmfs::MsgStoreReq msg_store;
441  msg_store.set_session_id(session_id_);
442  msg_store.set_req_id(transaction->transaction_id);
443  msg_store.set_allocated_object_id(&object_id);
444  msg_store.set_part_nr((transaction->size / max_object_size_) + 1);
445  msg_store.set_expected_size(transaction->expected_size);
446  msg_store.set_last_part(do_commit);
447 
448  if (transaction->object_info_modified) {
449  cvmfs::EnumObjectType object_type;
450  transport_.FillObjectType(transaction->object_info.type, &object_type);
451  msg_store.set_object_type(object_type);
452  msg_store.set_description(transaction->object_info.description);
453  }
454 
455  RpcJob rpc_job(&msg_store);
456  rpc_job.set_attachment_send(transaction->buffer, transaction->buf_pos);
457  // TODO(jblomer): allow for out of order chunk upload
458  CallRemotely(&rpc_job);
459  msg_store.release_object_id();
460 
461  cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
462  if (msg_reply->status() == cvmfs::STATUS_OK) {
463  transaction->flushed = true;
464  if (do_commit)
465  transaction->committed = true;
466  }
467  return Ack2Errno(msg_reply->status());
468 }
469 
470 
473  ReadOnlyHandle handle = fd_table_.GetHandle(fd);
474  return handle.id;
475 }
476 
477 
479  shash::Any id = GetHandle(fd);
480  if (id == kInvalidHandle)
481  return -EBADF;
482 
483  cvmfs::MsgHash object_id;
484  transport_.FillMsgHash(id, &object_id);
485  cvmfs::MsgObjectInfoReq msg_info;
486  msg_info.set_session_id(session_id_);
487  msg_info.set_req_id(NextRequestId());
488  msg_info.set_allocated_object_id(&object_id);
489  RpcJob rpc_job(&msg_info);
490  CallRemotely(&rpc_job);
491  msg_info.release_object_id();
492 
493  cvmfs::MsgObjectInfoReply *msg_reply = rpc_job.msg_object_info_reply();
494  if (msg_reply->status() == cvmfs::STATUS_OK) {
495  assert(msg_reply->has_size());
496  return msg_reply->size();
497  }
498  return Ack2Errno(msg_reply->status());
499 }
500 
501 
503  ExternalCacheManager *cache_mgr =
504  reinterpret_cast<ExternalCacheManager *>(data);
505  LogCvmfs(kLogCache, kLogDebug, "starting external cache reader thread");
506 
507  unsigned char buffer[cache_mgr->max_object_size_];
508  while (true) {
509  CacheTransport::Frame frame_recv;
510  frame_recv.set_attachment(buffer, cache_mgr->max_object_size_);
511  bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
512  if (!retval)
513  break;
514 
515  uint64_t req_id;
516  uint64_t part_nr = 0;
517  google::protobuf::MessageLite *msg = frame_recv.GetMsgTyped();
518  if (msg->GetTypeName() == "cvmfs.MsgRefcountReply") {
519  req_id = reinterpret_cast<cvmfs::MsgRefcountReply *>(msg)->req_id();
520  } else if (msg->GetTypeName() == "cvmfs.MsgObjectInfoReply") {
521  req_id = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg)->req_id();
522  } else if (msg->GetTypeName() == "cvmfs.MsgReadReply") {
523  req_id = reinterpret_cast<cvmfs::MsgReadReply *>(msg)->req_id();
524  } else if (msg->GetTypeName() == "cvmfs.MsgStoreReply") {
525  req_id = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->req_id();
526  part_nr = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->part_nr();
527  } else if (msg->GetTypeName() == "cvmfs.MsgInfoReply") {
528  req_id = reinterpret_cast<cvmfs::MsgInfoReply *>(msg)->req_id();
529  } else if (msg->GetTypeName() == "cvmfs.MsgShrinkReply") {
530  req_id = reinterpret_cast<cvmfs::MsgShrinkReply *>(msg)->req_id();
531  } else if (msg->GetTypeName() == "cvmfs.MsgListReply") {
532  req_id = reinterpret_cast<cvmfs::MsgListReply *>(msg)->req_id();
533  } else if (msg->GetTypeName() == "cvmfs.MsgBreadcrumbReply") {
534  req_id = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(msg)->req_id();
535  } else if (msg->GetTypeName() == "cvmfs.MsgDetach") {
536  // Release pinned catalogs
537  cache_mgr->quota_mgr_->BroadcastBackchannels("R");
538  continue;
539  } else {
540  PANIC(kLogSyslogErr | kLogDebug, "unexpected message %s",
541  msg->GetTypeName().c_str());
542  }
543 
544  RpcInFlight rpc_inflight;
545  {
546  MutexLockGuard guard(cache_mgr->lock_inflight_rpcs_);
547  for (unsigned i = 0; i < cache_mgr->inflight_rpcs_.size(); ++i) {
548  RpcJob *rpc_job = cache_mgr->inflight_rpcs_[i].rpc_job;
549  if ((rpc_job->req_id() == req_id) && (rpc_job->part_nr() == part_nr)) {
550  rpc_inflight = cache_mgr->inflight_rpcs_[i];
551  cache_mgr->inflight_rpcs_.erase(
552  cache_mgr->inflight_rpcs_.begin() + i);
553  break;
554  }
555  }
556  }
557  if (rpc_inflight.rpc_job == NULL) {
559  "got unmatched rpc reply");
560  continue;
561  }
562  rpc_inflight.rpc_job->frame_recv()->MergeFrom(frame_recv);
563  rpc_inflight.signal->Wakeup();
564  }
565 
566  if (!cache_mgr->terminated_) {
568  "connection to external cache manager broken (%d)", errno);
569  }
570  LogCvmfs(kLogCache, kLogDebug, "stopping external cache reader thread");
571  return NULL;
572 }
573 
574 
576  return DoOpen(object.id);
577 }
578 
579 
581  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
582  LogCvmfs(kLogCache, kLogDebug, "open fd for transaction %s",
583  transaction->id.ToString().c_str());
584  int retval = Flush(true, transaction);
585  if (retval != 0)
586  return retval;
587 
588  int fd = -1;
589  {
591  fd = fd_table_.OpenFd(ReadOnlyHandle(transaction->id));
592  if (fd < 0) {
593  LogCvmfs(kLogCache, kLogDebug, "error while creating new fd",
594  strerror(-fd));
595  return fd;
596  }
597  }
598  transaction->open_fds++;
599  return fd;
600 }
601 
602 
604  int fd,
605  void *buf,
606  uint64_t size,
607  uint64_t offset)
608 {
609  shash::Any id = GetHandle(fd);
610  if (id == kInvalidHandle)
611  return -EBADF;
612 
613  cvmfs::MsgHash object_id;
614  transport_.FillMsgHash(id, &object_id);
615  uint64_t nbytes = 0;
616  while (nbytes < size) {
617  uint64_t batch_size =
618  std::min(size - nbytes, static_cast<uint64_t>(max_object_size_));
619  cvmfs::MsgReadReq msg_read;
620  msg_read.set_session_id(session_id_);
621  msg_read.set_req_id(NextRequestId());
622  msg_read.set_allocated_object_id(&object_id);
623  msg_read.set_offset(offset + nbytes);
624  msg_read.set_size(batch_size);
625  RpcJob rpc_job(&msg_read);
626  rpc_job.set_attachment_recv(reinterpret_cast<char *>(buf) + nbytes,
627  batch_size);
628  CallRemotely(&rpc_job);
629  msg_read.release_object_id();
630 
631  cvmfs::MsgReadReply *msg_reply = rpc_job.msg_read_reply();
632  if (msg_reply->status() == cvmfs::STATUS_OK) {
633  nbytes += rpc_job.frame_recv()->att_size();
634  // Fuse sends in rounded up buffers, so short reads are expected
635  if (rpc_job.frame_recv()->att_size() < batch_size)
636  return nbytes;
637  } else {
638  return Ack2Errno(msg_reply->status());
639  }
640  }
641  return size;
642 }
643 
644 
646  shash::Any id = GetHandle(fd);
647  if (id == kInvalidHandle)
648  return -EBADF;
649  // No-op
650  return 0;
651 }
652 
653 
655  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
656  transaction->buf_pos = 0;
657  transaction->size = 0;
658  transaction->open_fds = 0;
659  transaction->committed = false;
660  transaction->object_info_modified = true;
661 
662  if (!transaction->flushed)
663  return 0;
664 
665  cvmfs::MsgHash object_id;
666  transport_.FillMsgHash(transaction->id, &object_id);
667  cvmfs::MsgStoreAbortReq msg_abort;
668  msg_abort.set_session_id(session_id_);
669  msg_abort.set_req_id(transaction->transaction_id);
670  msg_abort.set_allocated_object_id(&object_id);
671  RpcJob rpc_job(&msg_abort);
672  CallRemotely(&rpc_job);
673  msg_abort.release_object_id();
674  cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
675  transaction->transaction_id = NextRequestId();
676  transaction->flushed = false;
677  return Ack2Errno(msg_reply->status());
678 }
679 
680 
682  const std::string &fqrn)
683 {
684  if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
685  return manifest::Breadcrumb();
686 
687  cvmfs::MsgBreadcrumbLoadReq msg_breadcrumb_load;
688  msg_breadcrumb_load.set_session_id(session_id_);
689  msg_breadcrumb_load.set_req_id(NextRequestId());
690  msg_breadcrumb_load.set_fqrn(fqrn);
691  RpcJob rpc_job(&msg_breadcrumb_load);
692  CallRemotely(&rpc_job);
693 
694  manifest::Breadcrumb breadcrumb;
695  cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
696  if (msg_reply->status() == cvmfs::STATUS_OK) {
697  assert(msg_reply->has_breadcrumb());
698  assert(msg_reply->breadcrumb().fqrn() == fqrn);
699  bool rv = transport_.ParseMsgHash(msg_reply->breadcrumb().hash(),
700  &breadcrumb.catalog_hash);
701  assert(rv);
703  breadcrumb.timestamp = msg_reply->breadcrumb().timestamp();
704  }
705  return breadcrumb;
706 }
707 
708 
710  if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
711  return false;
712 
713  cvmfs::MsgHash hash;
714  transport_.FillMsgHash(manifest.catalog_hash(), &hash);
715  cvmfs::MsgBreadcrumb breadcrumb;
716  breadcrumb.set_fqrn(manifest.repository_name());
717  breadcrumb.set_allocated_hash(&hash);
718  breadcrumb.set_timestamp(manifest.publish_timestamp());
719  cvmfs::MsgBreadcrumbStoreReq msg_breadcrumb_store;
720  msg_breadcrumb_store.set_session_id(session_id_);
721  msg_breadcrumb_store.set_req_id(NextRequestId());
722  msg_breadcrumb_store.set_allocated_breadcrumb(&breadcrumb);
723  RpcJob rpc_job(&msg_breadcrumb_store);
724  CallRemotely(&rpc_job);
725  msg_breadcrumb_store.release_breadcrumb();
726  breadcrumb.release_hash();
727 
728  cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
729  return msg_reply->status() == cvmfs::STATUS_OK;
730 }
731 
732 
734  int retval = pthread_create(&thread_read_, NULL, MainRead, this);
735  assert(retval == 0);
736  spawned_ = true;
737 }
738 
739 
744 bool ExternalCacheManager::SpawnPlugin(const vector<string> &cmd_line) {
745  if (cmd_line.empty())
746  return false;
747 
748  int pipe_ready[2];
749  MakePipe(pipe_ready);
750  set<int> preserve_filedes;
751  preserve_filedes.insert(pipe_ready[1]);
752 
753  int fd_null_read = open("/dev/null", O_RDONLY);
754  int fd_null_write = open("/dev/null", O_WRONLY);
755  assert((fd_null_read >= 0) && (fd_null_write >= 0));
756  map<int, int> map_fildes;
757  map_fildes[fd_null_read] = 0;
758  map_fildes[fd_null_write] = 1;
759  map_fildes[fd_null_write] = 2;
760 
761  pid_t child_pid;
762  int retval = setenv(CacheTransport::kEnvReadyNotifyFd,
763  StringifyInt(pipe_ready[1]).c_str(), 1);
764  assert(retval == 0);
765  retval = ManagedExec(cmd_line,
766  preserve_filedes,
767  map_fildes,
768  false, // drop_credentials
769  false, // clear_env
770  true, // double fork
771  &child_pid);
773  close(fd_null_read);
774  close(fd_null_write);
775  if (!retval) {
777  "failed to start cache plugin '%s'",
778  JoinStrings(cmd_line, " ").c_str());
779  ClosePipe(pipe_ready);
780  return false;
781  }
782 
784  "started cache plugin '%s' (pid %d), waiting for it to become ready",
785  JoinStrings(cmd_line, " ").c_str(), child_pid);
786  close(pipe_ready[1]);
787  char buf;
788  if (read(pipe_ready[0], &buf, 1) != 1) {
789  close(pipe_ready[0]);
791  "cache plugin did not start properly");
792  return false;
793  }
794  close(pipe_ready[0]);
795 
797  return true;
799  "cache plugin failed to create an endpoint");
800  return false;
801 }
802 
803 
805  const shash::Any &id,
806  uint64_t size,
807  void *txn)
808 {
809  if (!(capabilities_ & cvmfs::CAP_WRITE))
810  return -EROFS;
811 
812  Transaction *transaction = new (txn) Transaction(id);
813  transaction->expected_size = size;
814  transaction->transaction_id = NextRequestId();
815 #ifdef __APPLE__
816  transaction->buffer =
817  reinterpret_cast<unsigned char *>(smalloc(max_object_size_));
818 #endif
819  return 0;
820 }
821 
822 
823 int64_t ExternalCacheManager::Write(const void *buf, uint64_t size, void *txn) {
824  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
825  assert(!transaction->committed);
826  LogCvmfs(kLogCache, kLogDebug, "writing %" PRIu64 " bytes for %s",
827  size, transaction->id.ToString().c_str());
828 
829  if (transaction->expected_size != kSizeUnknown) {
830  if (transaction->size + size > transaction->expected_size) {
832  "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
833  transaction->size + size, transaction->expected_size);
834  return -EFBIG;
835  }
836  }
837 
838  uint64_t written = 0;
839  const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
840  while (written < size) {
841  if (transaction->buf_pos == max_object_size_) {
842  bool do_commit = false;
843  if (transaction->expected_size != kSizeUnknown)
844  do_commit = (transaction->size + written) == transaction->expected_size;
845  int retval = Flush(do_commit, transaction);
846  if (retval != 0) {
847  transaction->size += written;
848  return retval;
849  }
850  transaction->size += transaction->buf_pos;
851  transaction->buf_pos = 0;
852  }
853  uint64_t remaining = size - written;
854  uint64_t space_in_buffer = max_object_size_ - transaction->buf_pos;
855  uint64_t batch_size = std::min(remaining, space_in_buffer);
856  memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
857  transaction->buf_pos += batch_size;
858  written += batch_size;
859  read_pos += batch_size;
860  }
861  return written;
862 }
863 
864 
865 //------------------------------------------------------------------------------
866 
867 
869  cvmfs::EnumObjectType type,
870  vector<cvmfs::MsgListRecord> *result)
871 {
872  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_LIST))
873  return false;
874 
875  uint64_t listing_id = 0;
876  bool more_data = false;
877  do {
878  cvmfs::MsgListReq msg_list;
879  msg_list.set_session_id(cache_mgr_->session_id_);
880  msg_list.set_req_id(cache_mgr_->NextRequestId());
881  msg_list.set_listing_id(listing_id);
882  msg_list.set_object_type(type);
883  ExternalCacheManager::RpcJob rpc_job(&msg_list);
884  cache_mgr_->CallRemotely(&rpc_job);
885 
886  cvmfs::MsgListReply *msg_reply = rpc_job.msg_list_reply();
887  if (msg_reply->status() != cvmfs::STATUS_OK)
888  return false;
889  more_data = !msg_reply->is_last_part();
890  listing_id = msg_reply->listing_id();
891  for (int i = 0; i < msg_reply->list_record_size(); ++i) {
892  result->push_back(msg_reply->list_record(i));
893  }
894  } while (more_data);
895 
896  return true;
897 }
898 
899 
900 bool ExternalQuotaManager::Cleanup(const uint64_t leave_size) {
901  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK))
902  return false;
903 
904  cvmfs::MsgShrinkReq msg_shrink;
905  msg_shrink.set_session_id(cache_mgr_->session_id_);
906  msg_shrink.set_req_id(cache_mgr_->NextRequestId());
907  msg_shrink.set_shrink_to(leave_size);
908  ExternalCacheManager::RpcJob rpc_job(&msg_shrink);
909  cache_mgr_->CallRemotely(&rpc_job);
910 
911  cvmfs::MsgShrinkReply *msg_reply = rpc_job.msg_shrink_reply();
912  return msg_reply->status() == cvmfs::STATUS_OK;
913 }
914 
915 
917  ExternalCacheManager *cache_mgr)
918 {
920  new ExternalQuotaManager(cache_mgr));
921  assert(quota_mgr.IsValid());
922 
923  return quota_mgr.Release();
924 }
925 
926 
928  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_INFO))
929  return Ack2Errno(cvmfs::STATUS_NOSUPPORT);
930 
931  cvmfs::MsgInfoReq msg_info;
932  msg_info.set_session_id(cache_mgr_->session_id_);
933  msg_info.set_req_id(cache_mgr_->NextRequestId());
934  ExternalCacheManager::RpcJob rpc_job(&msg_info);
935  cache_mgr_->CallRemotely(&rpc_job);
936 
937  cvmfs::MsgInfoReply *msg_reply = rpc_job.msg_info_reply();
938  if (msg_reply->status() == cvmfs::STATUS_OK) {
939  quota_info->size = msg_reply->size_bytes();
940  quota_info->used = msg_reply->used_bytes();
941  quota_info->pinned = msg_reply->pinned_bytes();
942  if (msg_reply->no_shrink() >= 0)
943  quota_info->no_shrink = msg_reply->no_shrink();
944  }
945  return Ack2Errno(msg_reply->status());
946 }
947 
948 
950  QuotaInfo info;
951  int retval = GetInfo(&info);
952  if (retval != 0)
953  return uint64_t(-1);
954  return info.size;
955 }
956 
957 
958 uint64_t ExternalQuotaManager::GetCleanupRate(uint64_t period_s) {
959  QuotaInfo info;
960  int retval = GetInfo(&info);
961  if (retval != 0)
962  return 0;
963  return info.no_shrink;
964 }
965 
966 
968  QuotaInfo info;
969  int retval = GetInfo(&info);
970  if (retval != 0)
971  return 0;
972  return info.used;
973 }
974 
975 
977  QuotaInfo info;
978  int retval = GetInfo(&info);
979  if (retval != 0)
980  return 0;
981  return info.pinned;
982 }
983 
984 
986  switch (capability) {
987  case kCapIntrospectSize:
988  return cache_mgr_->capabilities_ & cvmfs::CAP_INFO;
990  return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK_RATE;
991  case kCapList:
992  return cache_mgr_->capabilities_ & cvmfs::CAP_LIST;
993  case kCapShrink:
994  return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK;
995  case kCapListeners:
996  return true;
997  default:
998  return false;
999  }
1000 }
1001 
1002 
1003 vector<string> ExternalQuotaManager::List() {
1004  vector<string> result;
1005  vector<cvmfs::MsgListRecord> raw_list;
1006  bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_list);
1007  if (!retval)
1008  return result;
1009  for (unsigned i = 0; i < raw_list.size(); ++i)
1010  result.push_back(raw_list[i].description());
1011  return result;
1012 }
1013 
1014 
1016  vector<string> result;
1017  vector<cvmfs::MsgListRecord> raw_list;
1018  bool retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_list);
1019  if (!retval)
1020  return result;
1021  for (unsigned i = 0; i < raw_list.size(); ++i)
1022  result.push_back(raw_list[i].description());
1023  return result;
1024 }
1025 
1026 
1028  vector<string> result;
1029  vector<cvmfs::MsgListRecord> raw_lists[3];
1030  bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_lists[0]);
1031  if (!retval)
1032  return result;
1033  retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_lists[1]);
1034  if (!retval)
1035  return result;
1036  retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_lists[2]);
1037  if (!retval)
1038  return result;
1039  for (unsigned i = 0; i < sizeof(raw_lists) / sizeof(raw_lists[0]); ++i) {
1040  for (unsigned j = 0; j < raw_lists[i].size(); ++j) {
1041  if (raw_lists[i][j].pinned())
1042  result.push_back(raw_lists[i][j].description());
1043  }
1044  }
1045  return result;
1046 }
1047 
1048 
1050  vector<string> result;
1051  vector<cvmfs::MsgListRecord> raw_list;
1052  bool retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_list);
1053  if (!retval)
1054  return result;
1055  for (unsigned i = 0; i < raw_list.size(); ++i)
1056  result.push_back(raw_list[i].description());
1057  return result;
1058 }
1059 
1060 
1062  int back_channel[2],
1063  const string &channel_id)
1064 {
1065  shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1066  MakePipe(back_channel);
1067  LockBackChannels();
1068  assert(back_channels_.find(hash_id) == back_channels_.end());
1069  back_channels_[hash_id] = back_channel[1];
1071 }
1072 
1073 
1075  int back_channel[2],
1076  const string &channel_id)
1077 {
1078  shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1079  LockBackChannels();
1080  back_channels_.erase(hash_id);
1082  ClosePipe(back_channel);
1083 }
CacheTransport transport_
Definition: cache_extern.h:290
int Flush(bool do_commit, Transaction *transaction)
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
pthread_mutex_t lock_inflight_rpcs_
Definition: cache_extern.h:303
void AssignFrom(const FdTable< HandleT > &other)
Definition: fd_table.h:48
virtual ~ExternalCacheManager()
virtual uint64_t GetSizePinned()
int ChangeRefcount(const shash::Any &id, int change_by)
void set_attachment_recv(void *data, unsigned size)
Definition: cache_extern.h:197
const manifest::Manifest * manifest() const
Definition: repository.h:123
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
virtual uint64_t GetCleanupRate(uint64_t period_s)
bool DoListing(cvmfs::EnumObjectType type, std::vector< cvmfs::MsgListRecord > *result)
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
int Ack2Errno(cvmfs::EnumStatus status_code)
Definition: cache_extern.cc:42
uint64_t timestamp
Definition: manifest.h:33
virtual int Readahead(int fd)
virtual bool Cleanup(const uint64_t leave_size)
void BroadcastBackchannels(const std::string &message)
Definition: quota.cc:21
vector< string > SplitString(const string &str, const char delim, const unsigned max_chunks)
Definition: string.cc:288
shash::Any catalog_hash
Definition: manifest.h:32
pthread_rwlock_t rwlock_fd_table_
Definition: cache_extern.h:295
cvmfs::MsgShrinkReply * msg_shrink_reply()
Definition: cache_extern.h:235
cvmfs::MsgRefcountReply * msg_refcount_reply()
Definition: cache_extern.h:203
#define PANIC(...)
Definition: exception.h:26
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:318
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:248
int ConnectTcpEndpoint(const std::string &ipv4_address, int portno)
Definition: posix.cc:494
Capabilities
Definition: quota.h:47
cvmfs::MsgReadReply * msg_read_reply()
Definition: cache_extern.h:216
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
Definition: posix.cc:1784
virtual int AbortTxn(void *txn)
Definition: cache_extern.cc:76
virtual std::string Describe()
void Wakeup()
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
Definition: cache_extern.cc:85
virtual std::vector< std::string > ListCatalogs()
std::vector< RpcInFlight > inflight_rpcs_
Definition: cache_extern.h:302
static void * MainRead(void *data)
atomic_int64 next_request_id_
Definition: cache_extern.h:296
virtual int Reset(void *txn)
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
virtual std::vector< std::string > ListPinned()
virtual std::vector< std::string > List()
static const char kReadyNotification
void set_attachment_send(void *data, unsigned size)
Definition: cache_extern.h:193
std::string description
Definition: cache.h:97
void MakePipe(int pipe_fd[2])
Definition: posix.cc:525
cvmfs::MsgListReply * msg_list_reply()
Definition: cache_extern.h:241
ExternalCacheManager(int fd_connection, unsigned max_open_fds)
virtual int Close(int fd)
ExternalCacheManager * cache_mgr_
Definition: cache_extern.h:365
cvmfs::MsgStoreReply * msg_store_reply()
Definition: cache_extern.h:222
FdTable< ReadOnlyHandle > fd_table_
Definition: cache_extern.h:289
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
cvmfs::MsgBreadcrumbReply * msg_breadcrumb_reply()
Definition: cache_extern.h:247
void MergeFrom(const Frame &other)
uint64_t publish_timestamp() const
Definition: manifest.h:128
void Transaction()
std::string repository_name() const
Definition: manifest.h:122
ObjectType type
Definition: cache.h:93
const char kSuffixCatalog
Definition: hash.h:53
virtual void * DoSaveState()
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
virtual int Dup(int fd)
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
uint32_t att_size() const
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
void LockBackChannels()
Definition: quota.h:96
CacheTransport::Frame * frame_send()
Definition: cache_extern.h:255
static const shash::Any kInvalidHandle
Definition: cache_extern.h:113
cvmfs::MsgObjectInfoReply * msg_object_info_reply()
Definition: cache_extern.h:209
uint64_t req_id() const
Definition: cache_extern.h:257
shash::Any catalog_hash() const
Definition: manifest.h:124
string StringifyInt(const int64_t value)
Definition: string.cc:78
int GetInfo(QuotaInfo *quota_info)
google::protobuf::MessageLite * GetMsgTyped()
uint64_t part_nr() const
Definition: cache_extern.h:258
virtual void CtrlTxn(const ObjectInfo &object_info, const int flags, void *txn)
std::map< shash::Md5, int > back_channels_
Definition: quota.h:94
ExternalQuotaManager(ExternalCacheManager *cache_mgr)
Definition: cache_extern.h:359
static bool SpawnPlugin(const std::vector< std::string > &cmd_line)
cvmfs::MsgInfoReply * msg_info_reply()
Definition: cache_extern.h:229
int ConnectSocket(const std::string &path)
Definition: posix.cc:460
CacheTransport::Frame * frame_recv()
Definition: cache_extern.h:256
void Wait()
shash::Any GetHandle(int fd)
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
virtual int Open(const BlessedObject &object)
virtual int64_t GetSize(int fd)
static ExternalCacheManager * Create(int fd_connection, unsigned max_open_fds, const std::string &ident)
QuotaManager * quota_mgr_
Definition: cache.h:237
Definition: mutex.h:42
virtual void Spawn()
pthread_mutex_t lock_send_fd_
Definition: cache_extern.h:301
static ExternalQuotaManager * Create(ExternalCacheManager *cache_mgr)
virtual bool DoFreeState(void *data)
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:1918
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
virtual bool HasCapability(Capabilities capability)
void set_attachment(void *attachment, uint32_t att_size)
Suffix suffix
Definition: hash.h:125
void CallRemotely(RpcJob *rpc_job)
Definition: cache_extern.cc:93
virtual std::vector< std::string > ListVolatile()
virtual int OpenFromTxn(void *txn)
static int ConnectLocator(const std::string &locator, bool print_error)
static PluginHandle * CreatePlugin(const std::string &locator, const std::vector< std::string > &cmd_line)
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
virtual uint64_t GetCapacity()
void Reset(uint32_t original_att_size)
static void size_t size
Definition: smalloc.h:47
void FillObjectType(CacheManager::ObjectType object_type, cvmfs::EnumObjectType *wire_type)
virtual int CommitTxn(void *txn)
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:584
virtual int DoRestoreState(void *data)
void UnlockBackChannels()
Definition: quota.h:100
int fd_connection() const
static const char * kEnvReadyNotifyFd
virtual uint64_t GetSize()
int DoOpen(const shash::Any &id)
static const uint64_t kSizeUnknown
Definition: cache.h:72