CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_extern.cc
Go to the documentation of this file.
1 
5 #include "cache_extern.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <stdint.h>
11 #include <sys/socket.h>
12 #include <unistd.h>
13 
14 #include <algorithm>
15 #include <cassert>
16 #ifdef __APPLE__
17 #include <cstdlib>
18 #endif
19 #include <cstring>
20 #include <map>
21 #include <new>
22 #include <set>
23 #include <string>
24 
25 #include "cache.pb.h"
26 #include "crypto/hash.h"
27 #include "util/atomic.h"
28 #include "util/concurrency.h"
29 #include "util/exception.h"
30 #include "util/logging.h"
31 #include "util/pointer.h"
32 #include "util/posix.h"
33 #ifdef __APPLE__
34 #include "util/smalloc.h"
35 #endif
36 #include "util/string.h"
37 
38 using namespace std; // NOLINT
39 
40 namespace {
41 
42 int Ack2Errno(cvmfs::EnumStatus status_code) {
43  switch (status_code) {
44  case cvmfs::STATUS_OK:
45  return 0;
46  case cvmfs::STATUS_NOSUPPORT:
47  return -EOPNOTSUPP;
48  case cvmfs::STATUS_FORBIDDEN:
49  return -EPERM;
50  case cvmfs::STATUS_NOSPACE:
51  return -ENOSPC;
52  case cvmfs::STATUS_NOENTRY:
53  return -ENOENT;
54  case cvmfs::STATUS_MALFORMED:
55  return -EINVAL;
56  case cvmfs::STATUS_IOERR:
57  return -EIO;
58  case cvmfs::STATUS_CORRUPTED:
59  return -EIO;
60  case cvmfs::STATUS_TIMEOUT:
61  return -EIO;
62  case cvmfs::STATUS_BADCOUNT:
63  return -EINVAL;
64  case cvmfs::STATUS_OUTOFBOUNDS:
65  return -EINVAL;
66  default:
67  return -EIO;
68  }
69 }
70 
71 } // anonymous namespace
72 
74 
75 
77  int result = Reset(txn);
78 #ifdef __APPLE__
79  free(reinterpret_cast<Transaction *>(txn)->buffer);
80 #endif
81  return result;
82 }
83 
84 
86  assert(quota_mgr != NULL);
87  quota_mgr_ = quota_mgr;
88  LogCvmfs(kLogCache, kLogDebug, "set quota manager");
89  return true;
90 }
91 
92 
94  if (!spawned_) {
95  transport_.SendFrame(rpc_job->frame_send());
96  uint32_t save_att_size = rpc_job->frame_recv()->att_size();
97  bool again;
98  do {
99  again = false;
100  bool retval = transport_.RecvFrame(rpc_job->frame_recv());
101  assert(retval);
102  if (rpc_job->frame_recv()->IsMsgOutOfBand()) {
103  google::protobuf::MessageLite *msg_typed =
104  rpc_job->frame_recv()->GetMsgTyped();
105  assert(msg_typed->GetTypeName() == "cvmfs.MsgDetach");
106  quota_mgr_->BroadcastBackchannels("R"); // release pinned catalogs
107  rpc_job->frame_recv()->Reset(save_att_size);
108  again = true;
109  }
110  } while (again);
111  } else {
112  Signal signal;
113  {
114  MutexLockGuard guard(lock_inflight_rpcs_);
115  inflight_rpcs_.push_back(RpcInFlight(rpc_job, &signal));
116  }
117  {
118  MutexLockGuard guard(lock_send_fd_);
119  transport_.SendFrame(rpc_job->frame_send());
120  }
121  signal.Wait();
122  }
123 }
124 
125 
126 int ExternalCacheManager::ChangeRefcount(const shash::Any &id, int change_by) {
127  cvmfs::MsgHash object_id;
128  transport_.FillMsgHash(id, &object_id);
129  cvmfs::MsgRefcountReq msg_refcount;
130  msg_refcount.set_session_id(session_id_);
131  msg_refcount.set_req_id(NextRequestId());
132  msg_refcount.set_allocated_object_id(&object_id);
133  msg_refcount.set_change_by(change_by);
134  RpcJob rpc_job(&msg_refcount);
135  CallRemotely(&rpc_job);
136  msg_refcount.release_object_id();
137 
138  cvmfs::MsgRefcountReply *msg_reply = rpc_job.msg_refcount_reply();
139  return Ack2Errno(msg_reply->status());
140 }
141 
142 
144  ReadOnlyHandle handle;
145  {
146  WriteLockGuard guard(rwlock_fd_table_);
147  handle = fd_table_.GetHandle(fd);
148  if (handle.id == kInvalidHandle)
149  return -EBADF;
150  int retval = fd_table_.CloseFd(fd);
151  assert(retval == 0);
152  }
153 
154  return ChangeRefcount(handle.id, -1);
155 }
156 
157 
159  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
160  LogCvmfs(kLogCache, kLogDebug, "committing %s",
161  transaction->id.ToString().c_str());
162  int retval = Flush(true, transaction);
163  if (retval != 0)
164  return retval;
165 
166  int refcount = transaction->open_fds - 1;
167  if (refcount != 0)
168  return ChangeRefcount(transaction->id, refcount);
169 #ifdef __APPLE__
170  free(transaction->buffer);
171 #endif
172  return 0;
173 }
174 
175 
177  const std::string &locator, bool print_error)
178 {
179  vector<string> tokens = SplitString(locator, '=');
180  int result = -1;
181  if (tokens[0] == "unix") {
182  result = ConnectSocket(tokens[1]);
183  } else if (tokens[0] == "tcp") {
184  vector<string> tcp_address = SplitString(tokens[1], ':');
185  if (tcp_address.size() != 2)
186  return -EINVAL;
187  result = ConnectTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
188  } else {
189  return -EINVAL;
190  }
191  if (result < 0) {
192  if (print_error) {
193  if (errno) {
195  "Failed to connect to socket: %s", strerror(errno));
196  } else {
198  "Failed to connect to socket (unknown error)");
199  }
200  }
201  return -EIO;
202  }
204  "connected to cache plugin at %s", locator.c_str());
205  return result;
206 }
207 
208 
210  int fd_connection,
211  unsigned max_open_fds,
212  const string &ident)
213 {
215  new ExternalCacheManager(fd_connection, max_open_fds));
216  assert(cache_mgr.IsValid());
217 
218  cvmfs::MsgHandshake msg_handshake;
219  msg_handshake.set_protocol_version(kPbProtocolVersion);
220  msg_handshake.set_name(ident);
221  CacheTransport::Frame frame_send(&msg_handshake);
222  cache_mgr->transport_.SendFrame(&frame_send);
223 
224  CacheTransport::Frame frame_recv;
225  bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
226  if (!retval)
227  return NULL;
228  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
229  if (msg_typed->GetTypeName() != "cvmfs.MsgHandshakeAck")
230  return NULL;
231  cvmfs::MsgHandshakeAck *msg_ack =
232  reinterpret_cast<cvmfs::MsgHandshakeAck *>(msg_typed);
233  cache_mgr->session_id_ = msg_ack->session_id();
234  cache_mgr->capabilities_ = msg_ack->capabilities();
235  cache_mgr->max_object_size_ = msg_ack->max_object_size();
236  assert(cache_mgr->max_object_size_ > 0);
237  if (cache_mgr->max_object_size_ > kMaxSupportedObjectSize) {
239  "external cache manager object size too large (%u)",
240  cache_mgr->max_object_size_);
241  return NULL;
242  }
243  if (cache_mgr->max_object_size_ < kMinSupportedObjectSize) {
245  "external cache manager object size too small (%u)",
246  cache_mgr->max_object_size_);
247  return NULL;
248  }
249  if (msg_ack->has_pid())
250  cache_mgr->pid_plugin_ = msg_ack->pid();
251  return cache_mgr.Release();
252 }
253 
254 
262  const std::string &locator,
263  const std::vector<std::string> &cmd_line)
264 {
265  UniquePtr<PluginHandle> plugin_handle(new PluginHandle());
266  unsigned num_attempts = 0;
267  bool try_again = false;
268  do {
269  num_attempts++;
270  if (num_attempts > 2) {
271  // Sleep between retries from the third attempt on to avoid a busy loop
272  SafeSleepMs(1000);
273  }
274  plugin_handle->fd_connection_ = ConnectLocator(locator, num_attempts > 1);
275  if (plugin_handle->IsValid()) {
276  break;
277  } else if (plugin_handle->fd_connection_ == -EINVAL) {
279  "Invalid locator: %s", locator.c_str());
280  plugin_handle->error_msg_ = "Invalid locator: " + locator;
281  break;
282  } else {
283  if (num_attempts > 1) {
285  "Failed to connect to external cache manager: %d",
286  plugin_handle->fd_connection_);
287  }
288  plugin_handle->error_msg_ = "Failed to connect to external cache manager";
289  }
290 
291  try_again = SpawnPlugin(cmd_line);
292  } while (try_again);
293 
294  return plugin_handle.Release();
295 }
296 
297 
299  const Label &label,
300  const int flags,
301  void *txn)
302 {
303  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
304  transaction->label = label;
305  transaction->label_modified = true;
306 }
307 
308 
310  return "External cache manager\n";
311 }
312 
313 
315  FdTable<ReadOnlyHandle> *fd_table =
316  reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
317  delete fd_table;
318  return true;
319 }
320 
321 
323  int fd = -1;
324  {
325  WriteLockGuard guard(rwlock_fd_table_);
326  fd = fd_table_.OpenFd(ReadOnlyHandle(id));
327  if (fd < 0) {
328  LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
329  strerror(-fd));
330  return fd;
331  }
332  }
333 
334  int status_refcnt = ChangeRefcount(id, 1);
335  if (status_refcnt == 0)
336  return fd;
337 
338  WriteLockGuard guard(rwlock_fd_table_);
339  int retval = fd_table_.CloseFd(fd);
340  assert(retval == 0);
341  return status_refcnt;
342 }
343 
344 
346  // When DoRestoreState is called, we have fd 0 assigned to the root file
347  // catalog unless this is a lower layer cache in a tiered setup
348  for (unsigned i = 1; i < fd_table_.GetMaxFds(); ++i) {
349  assert(fd_table_.GetHandle(i) == ReadOnlyHandle());
350  }
351  ReadOnlyHandle handle_root = fd_table_.GetHandle(0);
352 
353  FdTable<ReadOnlyHandle> *other =
354  reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
355  fd_table_.AssignFrom(*other);
356  cvmfs::MsgIoctl msg_ioctl;
357  msg_ioctl.set_session_id(session_id_);
358  msg_ioctl.set_conncnt_change_by(-1);
359  CacheTransport::Frame frame(&msg_ioctl);
360  transport_.SendFrame(&frame);
361 
362  int new_root_fd = -1;
363  if (handle_root != ReadOnlyHandle()) {
364  new_root_fd = fd_table_.OpenFd(handle_root);
365  // There must be a free file descriptor because the root file catalog gets
366  // closed before a reload
367  assert(new_root_fd >= 0);
368  }
369  return new_root_fd;
370 }
371 
372 
374  cvmfs::MsgIoctl msg_ioctl;
375  msg_ioctl.set_session_id(session_id_);
376  msg_ioctl.set_conncnt_change_by(1);
377  CacheTransport::Frame frame(&msg_ioctl);
378  transport_.SendFrame(&frame);
379  return fd_table_.Clone();
380 }
381 
382 
384  shash::Any id = GetHandle(fd);
385  if (id == kInvalidHandle)
386  return -EBADF;
387  return DoOpen(id);
388 }
389 
390 
392  int fd_connection,
393  unsigned max_open_fds)
394  : pid_plugin_(0)
395  , fd_table_(max_open_fds, ReadOnlyHandle())
396  , transport_(fd_connection)
397  , session_id_(-1)
398  , max_object_size_(0)
399  , spawned_(false)
400  , terminated_(false)
401  , capabilities_(cvmfs::CAP_NONE)
402 {
403  int retval = pthread_rwlock_init(&rwlock_fd_table_, NULL);
404  assert(retval == 0);
405  retval = pthread_mutex_init(&lock_send_fd_, NULL);
406  assert(retval == 0);
407  retval = pthread_mutex_init(&lock_inflight_rpcs_, NULL);
408  assert(retval == 0);
409  memset(&thread_read_, 0, sizeof(thread_read_));
410  atomic_init64(&next_request_id_);
411 }
412 
413 
415  terminated_ = true;
416  MemoryFence();
417  if (session_id_ >= 0) {
418  cvmfs::MsgQuit msg_quit;
419  msg_quit.set_session_id(session_id_);
420  CacheTransport::Frame frame(&msg_quit);
421  transport_.SendFrame(&frame);
422  }
423  shutdown(transport_.fd_connection(), SHUT_RDWR);
424  if (spawned_)
425  pthread_join(thread_read_, NULL);
426  close(transport_.fd_connection());
427  pthread_rwlock_destroy(&rwlock_fd_table_);
428  pthread_mutex_destroy(&lock_send_fd_);
429  pthread_mutex_destroy(&lock_inflight_rpcs_);
430 }
431 
432 
433 int ExternalCacheManager::Flush(bool do_commit, Transaction *transaction) {
434  if (transaction->committed)
435  return 0;
436  LogCvmfs(kLogCache, kLogDebug, "flushing %u bytes for %s",
437  transaction->buf_pos, transaction->id.ToString().c_str());
438  cvmfs::MsgHash object_id;
439  transport_.FillMsgHash(transaction->id, &object_id);
440  cvmfs::MsgStoreReq msg_store;
441  msg_store.set_session_id(session_id_);
442  msg_store.set_req_id(transaction->transaction_id);
443  msg_store.set_allocated_object_id(&object_id);
444  msg_store.set_part_nr((transaction->size / max_object_size_) + 1);
445  msg_store.set_expected_size(transaction->expected_size);
446  msg_store.set_last_part(do_commit);
447 
448  if (transaction->label_modified) {
449  cvmfs::EnumObjectType object_type;
450  transport_.FillObjectType(transaction->label.flags, &object_type);
451  msg_store.set_object_type(object_type);
452  msg_store.set_description(transaction->label.GetDescription());
453  }
454 
455  RpcJob rpc_job(&msg_store);
456  rpc_job.set_attachment_send(transaction->buffer, transaction->buf_pos);
457  // TODO(jblomer): allow for out of order chunk upload
458  CallRemotely(&rpc_job);
459  msg_store.release_object_id();
460 
461  cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
462  if (msg_reply->status() == cvmfs::STATUS_OK) {
463  transaction->flushed = true;
464  if (do_commit)
465  transaction->committed = true;
466  }
467  return Ack2Errno(msg_reply->status());
468 }
469 
470 
473  ReadOnlyHandle handle = fd_table_.GetHandle(fd);
474  return handle.id;
475 }
476 
477 
479  shash::Any id = GetHandle(fd);
480  if (id == kInvalidHandle)
481  return -EBADF;
482 
483  cvmfs::MsgHash object_id;
484  transport_.FillMsgHash(id, &object_id);
485  cvmfs::MsgObjectInfoReq msg_info;
486  msg_info.set_session_id(session_id_);
487  msg_info.set_req_id(NextRequestId());
488  msg_info.set_allocated_object_id(&object_id);
489  RpcJob rpc_job(&msg_info);
490  CallRemotely(&rpc_job);
491  msg_info.release_object_id();
492 
493  cvmfs::MsgObjectInfoReply *msg_reply = rpc_job.msg_object_info_reply();
494  if (msg_reply->status() == cvmfs::STATUS_OK) {
495  assert(msg_reply->has_size());
496  return msg_reply->size();
497  }
498  return Ack2Errno(msg_reply->status());
499 }
500 
501 
503  ExternalCacheManager *cache_mgr =
504  reinterpret_cast<ExternalCacheManager *>(data);
505  LogCvmfs(kLogCache, kLogDebug, "starting external cache reader thread");
506 
507  unsigned char buffer[cache_mgr->max_object_size_];
508  while (true) {
509  CacheTransport::Frame frame_recv;
510  frame_recv.set_attachment(buffer, cache_mgr->max_object_size_);
511  bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
512  if (!retval)
513  break;
514 
515  uint64_t req_id;
516  uint64_t part_nr = 0;
517  google::protobuf::MessageLite *msg = frame_recv.GetMsgTyped();
518  if (msg->GetTypeName() == "cvmfs.MsgRefcountReply") {
519  req_id = reinterpret_cast<cvmfs::MsgRefcountReply *>(msg)->req_id();
520  } else if (msg->GetTypeName() == "cvmfs.MsgObjectInfoReply") {
521  req_id = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg)->req_id();
522  } else if (msg->GetTypeName() == "cvmfs.MsgReadReply") {
523  req_id = reinterpret_cast<cvmfs::MsgReadReply *>(msg)->req_id();
524  } else if (msg->GetTypeName() == "cvmfs.MsgStoreReply") {
525  req_id = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->req_id();
526  part_nr = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->part_nr();
527  } else if (msg->GetTypeName() == "cvmfs.MsgInfoReply") {
528  req_id = reinterpret_cast<cvmfs::MsgInfoReply *>(msg)->req_id();
529  } else if (msg->GetTypeName() == "cvmfs.MsgShrinkReply") {
530  req_id = reinterpret_cast<cvmfs::MsgShrinkReply *>(msg)->req_id();
531  } else if (msg->GetTypeName() == "cvmfs.MsgListReply") {
532  req_id = reinterpret_cast<cvmfs::MsgListReply *>(msg)->req_id();
533  } else if (msg->GetTypeName() == "cvmfs.MsgBreadcrumbReply") {
534  req_id = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(msg)->req_id();
535  } else if (msg->GetTypeName() == "cvmfs.MsgDetach") {
536  // Release pinned catalogs
537  cache_mgr->quota_mgr_->BroadcastBackchannels("R");
538  continue;
539  } else {
540  PANIC(kLogSyslogErr | kLogDebug, "unexpected message %s",
541  msg->GetTypeName().c_str());
542  }
543 
544  RpcInFlight rpc_inflight;
545  {
546  MutexLockGuard guard(cache_mgr->lock_inflight_rpcs_);
547  for (unsigned i = 0; i < cache_mgr->inflight_rpcs_.size(); ++i) {
548  RpcJob *rpc_job = cache_mgr->inflight_rpcs_[i].rpc_job;
549  if ((rpc_job->req_id() == req_id) && (rpc_job->part_nr() == part_nr)) {
550  rpc_inflight = cache_mgr->inflight_rpcs_[i];
551  cache_mgr->inflight_rpcs_.erase(
552  cache_mgr->inflight_rpcs_.begin() + i);
553  break;
554  }
555  }
556  }
557  if (rpc_inflight.rpc_job == NULL) {
559  "got unmatched rpc reply");
560  continue;
561  }
562  rpc_inflight.rpc_job->frame_recv()->MergeFrom(frame_recv);
563  rpc_inflight.signal->Wakeup();
564  }
565 
566  if (!cache_mgr->terminated_) {
568  "connection to external cache manager broken (%d)", errno);
569  }
570  LogCvmfs(kLogCache, kLogDebug, "stopping external cache reader thread");
571  return NULL;
572 }
573 
574 
576  return DoOpen(object.id);
577 }
578 
579 
581  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
582  LogCvmfs(kLogCache, kLogDebug, "open fd for transaction %s",
583  transaction->id.ToString().c_str());
584  int retval = Flush(true, transaction);
585  if (retval != 0)
586  return retval;
587 
588  int fd = -1;
589  {
591  fd = fd_table_.OpenFd(ReadOnlyHandle(transaction->id));
592  if (fd < 0) {
593  LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
594  strerror(-fd));
595  return fd;
596  }
597  }
598  transaction->open_fds++;
599  return fd;
600 }
601 
602 
604  int fd,
605  void *buf,
606  uint64_t size,
607  uint64_t offset)
608 {
609  shash::Any id = GetHandle(fd);
610  if (id == kInvalidHandle)
611  return -EBADF;
612 
613  cvmfs::MsgHash object_id;
614  transport_.FillMsgHash(id, &object_id);
615  uint64_t nbytes = 0;
616  while (nbytes < size) {
617  uint64_t batch_size =
618  std::min(size - nbytes, static_cast<uint64_t>(max_object_size_));
619  cvmfs::MsgReadReq msg_read;
620  msg_read.set_session_id(session_id_);
621  msg_read.set_req_id(NextRequestId());
622  msg_read.set_allocated_object_id(&object_id);
623  msg_read.set_offset(offset + nbytes);
624  msg_read.set_size(batch_size);
625  RpcJob rpc_job(&msg_read);
626  rpc_job.set_attachment_recv(reinterpret_cast<char *>(buf) + nbytes,
627  batch_size);
628  CallRemotely(&rpc_job);
629  msg_read.release_object_id();
630 
631  cvmfs::MsgReadReply *msg_reply = rpc_job.msg_read_reply();
632  if (msg_reply->status() == cvmfs::STATUS_OK) {
633  nbytes += rpc_job.frame_recv()->att_size();
634  // Fuse sends in rounded up buffers, so short reads are expected
635  if (rpc_job.frame_recv()->att_size() < batch_size)
636  return nbytes;
637  } else {
638  return Ack2Errno(msg_reply->status());
639  }
640  }
641  return size;
642 }
643 
644 
646  shash::Any id = GetHandle(fd);
647  if (id == kInvalidHandle)
648  return -EBADF;
649  // No-op
650  return 0;
651 }
652 
653 
655  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
656  transaction->buf_pos = 0;
657  transaction->size = 0;
658  transaction->open_fds = 0;
659  transaction->committed = false;
660  transaction->label_modified = true;
661 
662  if (!transaction->flushed)
663  return 0;
664 
665  cvmfs::MsgHash object_id;
666  transport_.FillMsgHash(transaction->id, &object_id);
667  cvmfs::MsgStoreAbortReq msg_abort;
668  msg_abort.set_session_id(session_id_);
669  msg_abort.set_req_id(transaction->transaction_id);
670  msg_abort.set_allocated_object_id(&object_id);
671  RpcJob rpc_job(&msg_abort);
672  CallRemotely(&rpc_job);
673  msg_abort.release_object_id();
674  cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
675  transaction->transaction_id = NextRequestId();
676  transaction->flushed = false;
677  return Ack2Errno(msg_reply->status());
678 }
679 
680 
682  const std::string &fqrn)
683 {
684  if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
685  return manifest::Breadcrumb();
686 
687  cvmfs::MsgBreadcrumbLoadReq msg_breadcrumb_load;
688  msg_breadcrumb_load.set_session_id(session_id_);
689  msg_breadcrumb_load.set_req_id(NextRequestId());
690  msg_breadcrumb_load.set_fqrn(fqrn);
691  RpcJob rpc_job(&msg_breadcrumb_load);
692  CallRemotely(&rpc_job);
693 
694  manifest::Breadcrumb breadcrumb;
695  cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
696  if (msg_reply->status() == cvmfs::STATUS_OK) {
697  assert(msg_reply->has_breadcrumb());
698  assert(msg_reply->breadcrumb().fqrn() == fqrn);
699  bool rv = transport_.ParseMsgHash(msg_reply->breadcrumb().hash(),
700  &breadcrumb.catalog_hash);
701  assert(rv);
703  breadcrumb.timestamp = msg_reply->breadcrumb().timestamp();
704  if (msg_reply->breadcrumb().has_revision()) {
705  breadcrumb.revision = msg_reply->breadcrumb().revision();
706  } else {
707  breadcrumb.revision = 0;
708  }
709  }
710  return breadcrumb;
711 }
712 
713 
715  if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
716  return false;
717 
718  cvmfs::MsgHash hash;
719  transport_.FillMsgHash(manifest.catalog_hash(), &hash);
720  cvmfs::MsgBreadcrumb breadcrumb;
721  breadcrumb.set_fqrn(manifest.repository_name());
722  breadcrumb.set_allocated_hash(&hash);
723  breadcrumb.set_timestamp(manifest.publish_timestamp());
724  breadcrumb.set_revision(manifest.revision());
725  cvmfs::MsgBreadcrumbStoreReq msg_breadcrumb_store;
726  msg_breadcrumb_store.set_session_id(session_id_);
727  msg_breadcrumb_store.set_req_id(NextRequestId());
728  msg_breadcrumb_store.set_allocated_breadcrumb(&breadcrumb);
729  RpcJob rpc_job(&msg_breadcrumb_store);
730  CallRemotely(&rpc_job);
731  msg_breadcrumb_store.release_breadcrumb();
732  breadcrumb.release_hash();
733 
734  cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
735  return msg_reply->status() == cvmfs::STATUS_OK;
736 }
737 
738 
740  int retval = pthread_create(&thread_read_, NULL, MainRead, this);
741  assert(retval == 0);
742  spawned_ = true;
743 }
744 
745 
/**
 * Starts the cache plugin given by cmd_line through ManagedExec() and then
 * waits on a pipe for a single byte from it.  The write end of that pipe is
 * announced to the plugin through the environment variable named by
 * CacheTransport::kEnvReadyNotifyFd.
 *
 * @param cmd_line  command line of the plugin; nothing is started if empty
 * @return false if cmd_line is empty, if ManagedExec() fails, or if no byte
 *         can be read from the pipe.  Otherwise the result depends on the
 *         received byte -- NOTE(review): presumably it is compared against
 *         CacheTransport::kReadyNotification; confirm.
 */
750 bool ExternalCacheManager::SpawnPlugin(const vector<string> &cmd_line) {
751  if (cmd_line.empty())
752  return false;
753 
  // The write end of this pipe must stay open in the plugin process
754  int pipe_ready[2];
755  MakePipe(pipe_ready);
756  set<int> preserve_filedes;
757  preserve_filedes.insert(pipe_ready[1]);
758 
759  int fd_null_read = open("/dev/null", O_RDONLY);
760  int fd_null_write = open("/dev/null", O_WRONLY);
761  assert((fd_null_read >= 0) && (fd_null_write >= 0));
762  map<int, int> map_fildes;
763  map_fildes[fd_null_read] = 0;
  // NOTE(review): the next two assignments use the same key (fd_null_write),
  // so the value 1 is replaced by 2 and the map ends up with two elements
  // only.  If the intent is to point both fd 1 and fd 2 of the plugin to
  // /dev/null, the entry for fd 1 is lost -- confirm against ManagedExec().
764  map_fildes[fd_null_write] = 1;
765  map_fildes[fd_null_write] = 2;
766 
767  pid_t child_pid;
  // Tell the plugin which file descriptor to use for the notification
768  int retval = setenv(CacheTransport::kEnvReadyNotifyFd,
769  StringifyInt(pipe_ready[1]).c_str(), 1);
770  assert(retval == 0);
771  retval = ManagedExec(cmd_line,
772  preserve_filedes,
773  map_fildes,
774  false, // drop_credentials
775  false, // clear_env
776  true, // double fork
777  &child_pid);
  // The /dev/null descriptors are not needed in this process after the exec
779  close(fd_null_read);
780  close(fd_null_write);
781  if (!retval) {
783  "failed to start cache plugin '%s'",
784  JoinStrings(cmd_line, " ").c_str());
785  ClosePipe(pipe_ready);
786  return false;
787  }
788 
790  "started cache plugin '%s' (pid %d), waiting for it to become ready",
791  JoinStrings(cmd_line, " ").c_str(), child_pid);
  // Close our copy of the write end, then block until one byte arrives or
  // the read fails
792  close(pipe_ready[1]);
793  char buf;
794  if (read(pipe_ready[0], &buf, 1) != 1) {
795  close(pipe_ready[0]);
797  "cache plugin did not start properly");
798  return false;
799  }
800  close(pipe_ready[0]);
801 
803  return true;
805  "cache plugin failed to create an endpoint");
806  return false;
807 }
808 
809 
811  const shash::Any &id,
812  uint64_t size,
813  void *txn)
814 {
815  if (!(capabilities_ & cvmfs::CAP_WRITE))
816  return -EROFS;
817 
818  Transaction *transaction = new (txn) Transaction(id);
819  transaction->expected_size = size;
820  transaction->transaction_id = NextRequestId();
821 #ifdef __APPLE__
822  transaction->buffer =
823  reinterpret_cast<unsigned char *>(smalloc(max_object_size_));
824 #endif
825  return 0;
826 }
827 
828 
/**
 * Appends data to the buffer of a transaction.  Whenever the buffer holds
 * max_object_size_ bytes, it is sent to the plugin with Flush() before more
 * data is copied in.  The transaction must not be committed yet.
 *
 * @param buf   data to append
 * @param size  number of bytes in buf
 * @param txn   transaction context, reinterpreted as Transaction
 * @return the number of bytes taken over (equal to size on success), -EFBIG
 *         if transaction->size + size exceeds a known expected size, or the
 *         non-zero result of a failed Flush()
 */
829 int64_t ExternalCacheManager::Write(const void *buf, uint64_t size, void *txn) {
830  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
831  assert(!transaction->committed);
832  LogCvmfs(kLogCache, kLogDebug, "writing %" PRIu64 " bytes for %s",
833  size, transaction->id.ToString().c_str());
834 
  // Refuse writes that would go beyond the expected size, if there is one
835  if (transaction->expected_size != kSizeUnknown) {
836  if (transaction->size + size > transaction->expected_size) {
838  "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
839  transaction->size + size, transaction->expected_size);
840  return -EFBIG;
841  }
842  }
843 
844  uint64_t written = 0;
845  const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
846  while (written < size) {
  // Buffer is full: send it to the plugin before copying more data
847  if (transaction->buf_pos == max_object_size_) {
  // Ask for a commit along with this flush only if the expected size is
  // known and equals transaction->size + written
848  bool do_commit = false;
849  if (transaction->expected_size != kSizeUnknown)
850  do_commit = (transaction->size + written) == transaction->expected_size;
851  int retval = Flush(do_commit, transaction);
852  if (retval != 0) {
  // Flush failed: add the bytes taken so far to the size and pass on the
  // error
853  transaction->size += written;
854  return retval;
855  }
  // The buffer content now counts towards the size; start with an empty
  // buffer
856  transaction->size += transaction->buf_pos;
857  transaction->buf_pos = 0;
858  }
  // Copy as much of the remaining input as fits into the free buffer space
859  uint64_t remaining = size - written;
860  uint64_t space_in_buffer = max_object_size_ - transaction->buf_pos;
861  uint64_t batch_size = std::min(remaining, space_in_buffer);
862  memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
863  transaction->buf_pos += batch_size;
864  written += batch_size;
865  read_pos += batch_size;
866  }
867  return written;
868 }
869 
870 
871 //------------------------------------------------------------------------------
872 
873 
875  cvmfs::EnumObjectType type,
876  vector<cvmfs::MsgListRecord> *result)
877 {
878  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_LIST))
879  return false;
880 
881  uint64_t listing_id = 0;
882  bool more_data = false;
883  do {
884  cvmfs::MsgListReq msg_list;
885  msg_list.set_session_id(cache_mgr_->session_id_);
886  msg_list.set_req_id(cache_mgr_->NextRequestId());
887  msg_list.set_listing_id(listing_id);
888  msg_list.set_object_type(type);
889  ExternalCacheManager::RpcJob rpc_job(&msg_list);
890  cache_mgr_->CallRemotely(&rpc_job);
891 
892  cvmfs::MsgListReply *msg_reply = rpc_job.msg_list_reply();
893  if (msg_reply->status() != cvmfs::STATUS_OK)
894  return false;
895  more_data = !msg_reply->is_last_part();
896  listing_id = msg_reply->listing_id();
897  for (int i = 0; i < msg_reply->list_record_size(); ++i) {
898  result->push_back(msg_reply->list_record(i));
899  }
900  } while (more_data);
901 
902  return true;
903 }
904 
905 
906 bool ExternalQuotaManager::Cleanup(const uint64_t leave_size) {
907  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK))
908  return false;
909 
910  cvmfs::MsgShrinkReq msg_shrink;
911  msg_shrink.set_session_id(cache_mgr_->session_id_);
912  msg_shrink.set_req_id(cache_mgr_->NextRequestId());
913  msg_shrink.set_shrink_to(leave_size);
914  ExternalCacheManager::RpcJob rpc_job(&msg_shrink);
915  cache_mgr_->CallRemotely(&rpc_job);
916 
917  cvmfs::MsgShrinkReply *msg_reply = rpc_job.msg_shrink_reply();
918  return msg_reply->status() == cvmfs::STATUS_OK;
919 }
920 
921 
923  ExternalCacheManager *cache_mgr)
924 {
926  new ExternalQuotaManager(cache_mgr));
927  assert(quota_mgr.IsValid());
928 
929  return quota_mgr.Release();
930 }
931 
932 
934  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_INFO))
935  return Ack2Errno(cvmfs::STATUS_NOSUPPORT);
936 
937  cvmfs::MsgInfoReq msg_info;
938  msg_info.set_session_id(cache_mgr_->session_id_);
939  msg_info.set_req_id(cache_mgr_->NextRequestId());
940  ExternalCacheManager::RpcJob rpc_job(&msg_info);
941  cache_mgr_->CallRemotely(&rpc_job);
942 
943  cvmfs::MsgInfoReply *msg_reply = rpc_job.msg_info_reply();
944  if (msg_reply->status() == cvmfs::STATUS_OK) {
945  quota_info->size = msg_reply->size_bytes();
946  quota_info->used = msg_reply->used_bytes();
947  quota_info->pinned = msg_reply->pinned_bytes();
948  if (msg_reply->no_shrink() >= 0)
949  quota_info->no_shrink = msg_reply->no_shrink();
950  }
951  return Ack2Errno(msg_reply->status());
952 }
953 
954 
956  QuotaInfo info;
957  int retval = GetInfo(&info);
958  if (retval != 0)
959  return uint64_t(-1);
960  return info.size;
961 }
962 
963 
964 uint64_t ExternalQuotaManager::GetCleanupRate(uint64_t period_s) {
965  QuotaInfo info;
966  int retval = GetInfo(&info);
967  if (retval != 0)
968  return 0;
969  return info.no_shrink;
970 }
971 
972 
974  QuotaInfo info;
975  int retval = GetInfo(&info);
976  if (retval != 0)
977  return 0;
978  return info.used;
979 }
980 
981 
983  QuotaInfo info;
984  int retval = GetInfo(&info);
985  if (retval != 0)
986  return 0;
987  return info.pinned;
988 }
989 
990 
992  switch (capability) {
993  case kCapIntrospectSize:
994  return cache_mgr_->capabilities_ & cvmfs::CAP_INFO;
996  return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK_RATE;
997  case kCapList:
998  return cache_mgr_->capabilities_ & cvmfs::CAP_LIST;
999  case kCapShrink:
1000  return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK;
1001  case kCapListeners:
1002  return true;
1003  default:
1004  return false;
1005  }
1006 }
1007 
1008 
1009 vector<string> ExternalQuotaManager::List() {
1010  vector<string> result;
1011  vector<cvmfs::MsgListRecord> raw_list;
1012  bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_list);
1013  if (!retval)
1014  return result;
1015  for (unsigned i = 0; i < raw_list.size(); ++i)
1016  result.push_back(raw_list[i].description());
1017  return result;
1018 }
1019 
1020 
1022  vector<string> result;
1023  vector<cvmfs::MsgListRecord> raw_list;
1024  bool retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_list);
1025  if (!retval)
1026  return result;
1027  for (unsigned i = 0; i < raw_list.size(); ++i)
1028  result.push_back(raw_list[i].description());
1029  return result;
1030 }
1031 
1032 
1034  vector<string> result;
1035  vector<cvmfs::MsgListRecord> raw_lists[3];
1036  bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_lists[0]);
1037  if (!retval)
1038  return result;
1039  retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_lists[1]);
1040  if (!retval)
1041  return result;
1042  retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_lists[2]);
1043  if (!retval)
1044  return result;
1045  for (unsigned i = 0; i < sizeof(raw_lists) / sizeof(raw_lists[0]); ++i) {
1046  for (unsigned j = 0; j < raw_lists[i].size(); ++j) {
1047  if (raw_lists[i][j].pinned())
1048  result.push_back(raw_lists[i][j].description());
1049  }
1050  }
1051  return result;
1052 }
1053 
1054 
1056  vector<string> result;
1057  vector<cvmfs::MsgListRecord> raw_list;
1058  bool retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_list);
1059  if (!retval)
1060  return result;
1061  for (unsigned i = 0; i < raw_list.size(); ++i)
1062  result.push_back(raw_list[i].description());
1063  return result;
1064 }
1065 
1066 
1068  int back_channel[2],
1069  const string &channel_id)
1070 {
1071  shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1072  MakePipe(back_channel);
1073  LockBackChannels();
1074  assert(back_channels_.find(hash_id) == back_channels_.end());
1075  back_channels_[hash_id] = back_channel[1];
1077 }
1078 
1079 
1081  int back_channel[2],
1082  const string &channel_id)
1083 {
1084  shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1085  LockBackChannels();
1086  back_channels_.erase(hash_id);
1088  ClosePipe(back_channel);
1089 }
CacheTransport transport_
Definition: cache_extern.h:290
int Flush(bool do_commit, Transaction *transaction)
pthread_mutex_t lock_inflight_rpcs_
Definition: cache_extern.h:303
void AssignFrom(const FdTable< HandleT > &other)
Definition: fd_table.h:48
virtual ~ExternalCacheManager()
virtual uint64_t GetSizePinned()
int ChangeRefcount(const shash::Any &id, int change_by)
void set_attachment_recv(void *data, unsigned size)
Definition: cache_extern.h:197
const manifest::Manifest * manifest() const
Definition: repository.h:125
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
std::string GetDescription() const
Definition: cache.h:110
virtual uint64_t GetCleanupRate(uint64_t period_s)
bool DoListing(cvmfs::EnumObjectType type, std::vector< cvmfs::MsgListRecord > *result)
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
int Ack2Errno(cvmfs::EnumStatus status_code)
Definition: cache_extern.cc:42
uint64_t timestamp
Definition: manifest.h:40
virtual int Readahead(int fd)
virtual bool Cleanup(const uint64_t leave_size)
void BroadcastBackchannels(const std::string &message)
Definition: quota.cc:21
shash::Any catalog_hash
Definition: manifest.h:39
pthread_rwlock_t rwlock_fd_table_
Definition: cache_extern.h:295
cvmfs::MsgShrinkReply * msg_shrink_reply()
Definition: cache_extern.h:235
cvmfs::MsgRefcountReply * msg_refcount_reply()
Definition: cache_extern.h:203
#define PANIC(...)
Definition: exception.h:29
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:343
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
int ConnectTcpEndpoint(const std::string &ipv4_address, int portno)
Definition: posix.cc:461
Capabilities
Definition: quota.h:47
cvmfs::MsgReadReply * msg_read_reply()
Definition: cache_extern.h:216
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
Definition: posix.cc:1920
virtual int AbortTxn(void *txn)
Definition: cache_extern.cc:76
virtual std::string Describe()
void Wakeup()
Definition: concurrency.cc:59
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
Definition: cache_extern.cc:85
virtual std::vector< std::string > ListCatalogs()
std::vector< RpcInFlight > inflight_rpcs_
Definition: cache_extern.h:302
static void * MainRead(void *data)
atomic_int64 next_request_id_
Definition: cache_extern.h:296
virtual int Reset(void *txn)
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void FillObjectType(int object_flags, cvmfs::EnumObjectType *wire_type)
virtual std::vector< std::string > ListPinned()
virtual std::vector< std::string > List()
static const char kReadyNotification
void set_attachment_send(void *data, unsigned size)
Definition: cache_extern.h:193
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
cvmfs::MsgListReply * msg_list_reply()
Definition: cache_extern.h:241
ExternalCacheManager(int fd_connection, unsigned max_open_fds)
virtual int Close(int fd)
uint64_t revision() const
Definition: manifest.h:129
ExternalCacheManager * cache_mgr_
Definition: cache_extern.h:366
cvmfs::MsgStoreReply * msg_store_reply()
Definition: cache_extern.h:222
FdTable< ReadOnlyHandle > fd_table_
Definition: cache_extern.h:289
uint64_t revision
Definition: manifest.h:41
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
cvmfs::MsgBreadcrumbReply * msg_breadcrumb_reply()
Definition: cache_extern.h:247
void MergeFrom(const Frame &other)
uint64_t publish_timestamp() const
Definition: manifest.h:136
void Transaction()
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:308
std::string repository_name() const
Definition: manifest.h:130
const char kSuffixCatalog
Definition: hash.h:54
virtual void * DoSaveState()
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
virtual int Dup(int fd)
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
uint32_t att_size() const
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
void LockBackChannels()
Definition: quota.h:97
CacheTransport::Frame * frame_send()
Definition: cache_extern.h:255
static const shash::Any kInvalidHandle
Definition: cache_extern.h:113
cvmfs::MsgObjectInfoReply * msg_object_info_reply()
Definition: cache_extern.h:209
uint64_t req_id() const
Definition: cache_extern.h:257
shash::Any catalog_hash() const
Definition: manifest.h:132
string StringifyInt(const int64_t value)
Definition: string.cc:78
int GetInfo(QuotaInfo *quota_info)
google::protobuf::MessageLite * GetMsgTyped()
uint64_t part_nr() const
Definition: cache_extern.h:258
virtual void CtrlTxn(const Label &label, const int flags, void *txn)
std::map< shash::Md5, int > back_channels_
Definition: quota.h:95
ExternalQuotaManager(ExternalCacheManager *cache_mgr)
Definition: cache_extern.h:360
static bool SpawnPlugin(const std::vector< std::string > &cmd_line)
cvmfs::MsgInfoReply * msg_info_reply()
Definition: cache_extern.h:229
int ConnectSocket(const std::string &path)
Definition: posix.cc:427
CacheTransport::Frame * frame_recv()
Definition: cache_extern.h:256
void Wait()
Definition: concurrency.cc:49
shash::Any GetHandle(int fd)
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
Definition: string.cc:246
virtual int64_t GetSize(int fd)
static ExternalCacheManager * Create(int fd_connection, unsigned max_open_fds, const std::string &ident)
QuotaManager * quota_mgr_
Definition: cache.h:235
Definition: mutex.h:42
virtual void Spawn()
pthread_mutex_t lock_send_fd_
Definition: cache_extern.h:301
static ExternalQuotaManager * Create(ExternalCacheManager *cache_mgr)
virtual bool DoFreeState(void *data)
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2049
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
virtual bool HasCapability(Capabilities capability)
void set_attachment(void *attachment, uint32_t att_size)
Suffix suffix
Definition: hash.h:126
void CallRemotely(RpcJob *rpc_job)
Definition: cache_extern.cc:93
virtual std::vector< std::string > ListVolatile()
virtual int OpenFromTxn(void *txn)
static int ConnectLocator(const std::string &locator, bool print_error)
static PluginHandle * CreatePlugin(const std::string &locator, const std::vector< std::string > &cmd_line)
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
virtual uint64_t GetCapacity()
void Reset(uint32_t original_att_size)
static void size_t size
Definition: smalloc.h:54
virtual int CommitTxn(void *txn)
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:562
virtual int DoRestoreState(void *data)
void UnlockBackChannels()
Definition: quota.h:101
int fd_connection() const
static const char * kEnvReadyNotifyFd
virtual uint64_t GetSize()
int DoOpen(const shash::Any &id)
static const uint64_t kSizeUnknown
Definition: cache.h:74
virtual int Open(const LabeledObject &object)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528