CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cache_extern.cc
Go to the documentation of this file.
1 
5 #include "cache_extern.h"
6 
7 #include <errno.h>
8 #include <fcntl.h>
9 #include <inttypes.h>
10 #include <stdint.h>
11 #include <sys/socket.h>
12 #include <unistd.h>
13 
14 #include <algorithm>
15 #include <cassert>
16 #ifdef __APPLE__
17 #include <cstdlib>
18 #endif
19 #include <cstring>
20 #include <map>
21 #include <new>
22 #include <set>
23 #include <string>
24 
25 #include "cache.pb.h"
26 #include "crypto/hash.h"
27 #include "util/atomic.h"
28 #include "util/concurrency.h"
29 #include "util/exception.h"
30 #include "util/logging.h"
31 #include "util/pointer.h"
32 #include "util/posix.h"
33 #ifdef __APPLE__
34 #include "util/smalloc.h"
35 #endif
36 #include "util/string.h"
37 
38 using namespace std; // NOLINT
39 
40 namespace {
41 
42 int Ack2Errno(cvmfs::EnumStatus status_code) {
43  switch (status_code) {
44  case cvmfs::STATUS_OK:
45  return 0;
46  case cvmfs::STATUS_NOSUPPORT:
47  return -EOPNOTSUPP;
48  case cvmfs::STATUS_FORBIDDEN:
49  return -EPERM;
50  case cvmfs::STATUS_NOSPACE:
51  return -ENOSPC;
52  case cvmfs::STATUS_NOENTRY:
53  return -ENOENT;
54  case cvmfs::STATUS_MALFORMED:
55  return -EINVAL;
56  case cvmfs::STATUS_IOERR:
57  return -EIO;
58  case cvmfs::STATUS_CORRUPTED:
59  return -EIO;
60  case cvmfs::STATUS_TIMEOUT:
61  return -EIO;
62  case cvmfs::STATUS_BADCOUNT:
63  return -EINVAL;
64  case cvmfs::STATUS_OUTOFBOUNDS:
65  return -EINVAL;
66  default:
67  return -EIO;
68  }
69 }
70 
71 } // anonymous namespace
72 
74 
75 
77  int result = Reset(txn);
78 #ifdef __APPLE__
79  free(reinterpret_cast<Transaction *>(txn)->buffer);
80 #endif
81  return result;
82 }
83 
84 
86  assert(quota_mgr != NULL);
87  quota_mgr_ = quota_mgr;
88  LogCvmfs(kLogCache, kLogDebug, "set quota manager");
89  return true;
90 }
91 
92 
94  if (!spawned_) {
95  transport_.SendFrame(rpc_job->frame_send());
96  uint32_t save_att_size = rpc_job->frame_recv()->att_size();
97  bool again;
98  do {
99  again = false;
100  bool retval = transport_.RecvFrame(rpc_job->frame_recv());
101  assert(retval);
102  if (rpc_job->frame_recv()->IsMsgOutOfBand()) {
103  google::protobuf::MessageLite *msg_typed = rpc_job->frame_recv()
104  ->GetMsgTyped();
105  assert(msg_typed->GetTypeName() == "cvmfs.MsgDetach");
106  quota_mgr_->BroadcastBackchannels("R"); // release pinned catalogs
107  rpc_job->frame_recv()->Reset(save_att_size);
108  again = true;
109  }
110  } while (again);
111  } else {
112  Signal signal;
113  {
114  MutexLockGuard guard(lock_inflight_rpcs_);
115  inflight_rpcs_.push_back(RpcInFlight(rpc_job, &signal));
116  }
117  {
118  MutexLockGuard guard(lock_send_fd_);
119  transport_.SendFrame(rpc_job->frame_send());
120  }
121  signal.Wait();
122  }
123 }
124 
125 
126 int ExternalCacheManager::ChangeRefcount(const shash::Any &id, int change_by) {
127  cvmfs::MsgHash object_id;
128  transport_.FillMsgHash(id, &object_id);
129  cvmfs::MsgRefcountReq msg_refcount;
130  msg_refcount.set_session_id(session_id_);
131  msg_refcount.set_req_id(NextRequestId());
132  msg_refcount.set_allocated_object_id(&object_id);
133  msg_refcount.set_change_by(change_by);
134  RpcJob rpc_job(&msg_refcount);
135  CallRemotely(&rpc_job);
136  msg_refcount.release_object_id();
137 
138  cvmfs::MsgRefcountReply *msg_reply = rpc_job.msg_refcount_reply();
139  return Ack2Errno(msg_reply->status());
140 }
141 
142 
144  ReadOnlyHandle handle;
145  {
146  WriteLockGuard guard(rwlock_fd_table_);
147  handle = fd_table_.GetHandle(fd);
148  if (handle.id == kInvalidHandle)
149  return -EBADF;
150  int retval = fd_table_.CloseFd(fd);
151  assert(retval == 0);
152  }
153 
154  return ChangeRefcount(handle.id, -1);
155 }
156 
157 
159  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
160  LogCvmfs(kLogCache, kLogDebug, "committing %s",
161  std::string(transaction->id.ToString()).c_str());
162  int retval = Flush(true, transaction);
163  if (retval != 0)
164  return retval;
165 
166  int refcount = transaction->open_fds - 1;
167  if (refcount != 0)
168  return ChangeRefcount(transaction->id, refcount);
169 #ifdef __APPLE__
170  free(transaction->buffer);
171 #endif
172  return 0;
173 }
174 
175 
176 int ExternalCacheManager::ConnectLocator(const std::string &locator,
177  bool print_error) {
178  vector<string> tokens = SplitString(locator, '=');
179  int result = -1;
180  if (tokens[0] == "unix") {
181  result = ConnectSocket(tokens[1]);
182  } else if (tokens[0] == "tcp") {
183  vector<string> tcp_address = SplitString(tokens[1], ':');
184  if (tcp_address.size() != 2)
185  return -EINVAL;
186  result = ConnectTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
187  } else {
188  return -EINVAL;
189  }
190  if (result < 0) {
191  if (print_error) {
192  if (errno) {
194  "Failed to connect to socket: %s", strerror(errno));
195  } else {
197  "Failed to connect to socket (unknown error)");
198  }
199  }
200  return -EIO;
201  }
202  LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "connected to cache plugin at %s",
203  locator.c_str());
204  return result;
205 }
206 
207 
209  unsigned max_open_fds,
210  const string &ident) {
212  new ExternalCacheManager(fd_connection, max_open_fds));
213  assert(cache_mgr.IsValid());
214 
215  cvmfs::MsgHandshake msg_handshake;
216  msg_handshake.set_protocol_version(kPbProtocolVersion);
217  msg_handshake.set_name(ident);
218  CacheTransport::Frame frame_send(&msg_handshake);
219  cache_mgr->transport_.SendFrame(&frame_send);
220 
221  CacheTransport::Frame frame_recv;
222  bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
223  if (!retval)
224  return NULL;
225  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
226  if (msg_typed->GetTypeName() != "cvmfs.MsgHandshakeAck")
227  return NULL;
228  cvmfs::MsgHandshakeAck *msg_ack = reinterpret_cast<cvmfs::MsgHandshakeAck *>(
229  msg_typed);
230  cache_mgr->session_id_ = msg_ack->session_id();
231  cache_mgr->capabilities_ = msg_ack->capabilities();
232  cache_mgr->max_object_size_ = msg_ack->max_object_size();
233  assert(cache_mgr->max_object_size_ > 0);
234  if (cache_mgr->max_object_size_ > kMaxSupportedObjectSize) {
236  "external cache manager object size too large (%u)",
237  cache_mgr->max_object_size_);
238  return NULL;
239  }
240  if (cache_mgr->max_object_size_ < kMinSupportedObjectSize) {
242  "external cache manager object size too small (%u)",
243  cache_mgr->max_object_size_);
244  return NULL;
245  }
246  if (msg_ack->has_pid())
247  cache_mgr->pid_plugin_ = msg_ack->pid();
248  return cache_mgr.Release();
249 }
250 
251 
259  const std::string &locator, const std::vector<std::string> &cmd_line) {
260  UniquePtr<PluginHandle> plugin_handle(new PluginHandle());
261  unsigned num_attempts = 0;
262  bool try_again = false;
263  do {
264  num_attempts++;
265  if (num_attempts > 2) {
266  // Prevent violate busy loops
267  SafeSleepMs(1000);
268  }
269  plugin_handle->fd_connection_ = ConnectLocator(locator, num_attempts > 1);
270  if (plugin_handle->IsValid()) {
271  break;
272  } else if (plugin_handle->fd_connection_ == -EINVAL) {
273  LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "Invalid locator: %s",
274  locator.c_str());
275  plugin_handle->error_msg_ = "Invalid locator: " + locator;
276  break;
277  } else {
278  if (num_attempts > 1) {
280  "Failed to connect to external cache manager: %d",
281  plugin_handle->fd_connection_);
282  }
283  plugin_handle->error_msg_ = "Failed to connect to external cache manager";
284  }
285 
286  try_again = SpawnPlugin(cmd_line);
287  } while (try_again);
288 
289  return plugin_handle.Release();
290 }
291 
292 
294  const int flags,
295  void *txn) {
296  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
297  transaction->label = label;
298  transaction->label_modified = true;
299 }
300 
301 
302 string ExternalCacheManager::Describe() { return "External cache manager\n"; }
303 
304 
307  *fd_table = reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
308  delete fd_table;
309  return true;
310 }
311 
312 
314  int fd = -1;
315  {
316  WriteLockGuard guard(rwlock_fd_table_);
317  fd = fd_table_.OpenFd(ReadOnlyHandle(id));
318  if (fd < 0) {
319  LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
320  strerror(-fd));
321  return fd;
322  }
323  }
324 
325  int status_refcnt = ChangeRefcount(id, 1);
326  if (status_refcnt == 0)
327  return fd;
328 
329  WriteLockGuard guard(rwlock_fd_table_);
330  int retval = fd_table_.CloseFd(fd);
331  assert(retval == 0);
332  return status_refcnt;
333 }
334 
335 
337  // When DoRestoreState is called, we have fd 0 assigned to the root file
338  // catalog unless this is a lower layer cache in a tiered setup
339  for (unsigned i = 1; i < fd_table_.GetMaxFds(); ++i) {
340  assert(fd_table_.GetHandle(i) == ReadOnlyHandle());
341  }
342  ReadOnlyHandle handle_root = fd_table_.GetHandle(0);
343 
344  FdTable<ReadOnlyHandle> *other = reinterpret_cast<FdTable<ReadOnlyHandle> *>(
345  data);
346  fd_table_.AssignFrom(*other);
347  cvmfs::MsgIoctl msg_ioctl;
348  msg_ioctl.set_session_id(session_id_);
349  msg_ioctl.set_conncnt_change_by(-1);
350  CacheTransport::Frame frame(&msg_ioctl);
351  transport_.SendFrame(&frame);
352 
353  int new_root_fd = -1;
354  if (handle_root != ReadOnlyHandle()) {
355  new_root_fd = fd_table_.OpenFd(handle_root);
356  // There must be a free file descriptor because the root file catalog gets
357  // closed before a reload
358  assert(new_root_fd >= 0);
359  }
360  return new_root_fd;
361 }
362 
363 
365  cvmfs::MsgIoctl msg_ioctl;
366  msg_ioctl.set_session_id(session_id_);
367  msg_ioctl.set_conncnt_change_by(1);
368  CacheTransport::Frame frame(&msg_ioctl);
369  transport_.SendFrame(&frame);
370  return fd_table_.Clone();
371 }
372 
373 
375  shash::Any id = GetHandle(fd);
376  if (id == kInvalidHandle)
377  return -EBADF;
378  return DoOpen(id);
379 }
380 
381 
383  unsigned max_open_fds)
384  : pid_plugin_(0)
385  , fd_table_(max_open_fds, ReadOnlyHandle())
386  , transport_(fd_connection)
387  , session_id_(-1)
388  , max_object_size_(0)
389  , spawned_(false)
390  , terminated_(false)
391  , capabilities_(cvmfs::CAP_NONE) {
392  int retval = pthread_rwlock_init(&rwlock_fd_table_, NULL);
393  assert(retval == 0);
394  retval = pthread_mutex_init(&lock_send_fd_, NULL);
395  assert(retval == 0);
396  retval = pthread_mutex_init(&lock_inflight_rpcs_, NULL);
397  assert(retval == 0);
398  memset(&thread_read_, 0, sizeof(thread_read_));
399  atomic_init64(&next_request_id_);
400 }
401 
402 
404  terminated_ = true;
405  MemoryFence();
406  if (session_id_ >= 0) {
407  cvmfs::MsgQuit msg_quit;
408  msg_quit.set_session_id(session_id_);
409  CacheTransport::Frame frame(&msg_quit);
410  transport_.SendFrame(&frame);
411  }
412  shutdown(transport_.fd_connection(), SHUT_RDWR);
413  if (spawned_)
414  pthread_join(thread_read_, NULL);
415  close(transport_.fd_connection());
416  pthread_rwlock_destroy(&rwlock_fd_table_);
417  pthread_mutex_destroy(&lock_send_fd_);
418  pthread_mutex_destroy(&lock_inflight_rpcs_);
419 }
420 
421 
422 int ExternalCacheManager::Flush(bool do_commit, Transaction *transaction) {
423  if (transaction->committed)
424  return 0;
425  LogCvmfs(kLogCache, kLogDebug, "flushing %u bytes for %s",
426  transaction->buf_pos,
427  std::string(transaction->id.ToString()).c_str());
428  cvmfs::MsgHash object_id;
429  transport_.FillMsgHash(transaction->id, &object_id);
430  cvmfs::MsgStoreReq msg_store;
431  msg_store.set_session_id(session_id_);
432  msg_store.set_req_id(transaction->transaction_id);
433  msg_store.set_allocated_object_id(&object_id);
434  msg_store.set_part_nr((transaction->size / max_object_size_) + 1);
435  msg_store.set_expected_size(transaction->expected_size);
436  msg_store.set_last_part(do_commit);
437 
438  if (transaction->label_modified) {
439  cvmfs::EnumObjectType object_type;
440  transport_.FillObjectType(transaction->label.flags, &object_type);
441  msg_store.set_object_type(object_type);
442  msg_store.set_description(transaction->label.GetDescription());
443  }
444 
445  RpcJob rpc_job(&msg_store);
446  rpc_job.set_attachment_send(transaction->buffer, transaction->buf_pos);
447  // TODO(jblomer): allow for out of order chunk upload
448  CallRemotely(&rpc_job);
449  msg_store.release_object_id();
450 
451  cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
452  if (msg_reply->status() == cvmfs::STATUS_OK) {
453  transaction->flushed = true;
454  if (do_commit)
455  transaction->committed = true;
456  }
457  return Ack2Errno(msg_reply->status());
458 }
459 
460 
463  ReadOnlyHandle handle = fd_table_.GetHandle(fd);
464  return handle.id;
465 }
466 
467 
469  shash::Any id = GetHandle(fd);
470  if (id == kInvalidHandle)
471  return -EBADF;
472 
473  cvmfs::MsgHash object_id;
474  transport_.FillMsgHash(id, &object_id);
475  cvmfs::MsgObjectInfoReq msg_info;
476  msg_info.set_session_id(session_id_);
477  msg_info.set_req_id(NextRequestId());
478  msg_info.set_allocated_object_id(&object_id);
479  RpcJob rpc_job(&msg_info);
480  CallRemotely(&rpc_job);
481  msg_info.release_object_id();
482 
483  cvmfs::MsgObjectInfoReply *msg_reply = rpc_job.msg_object_info_reply();
484  if (msg_reply->status() == cvmfs::STATUS_OK) {
485  assert(msg_reply->has_size());
486  return msg_reply->size();
487  }
488  return Ack2Errno(msg_reply->status());
489 }
490 
491 
493  ExternalCacheManager *cache_mgr = reinterpret_cast<ExternalCacheManager *>(
494  data);
495  LogCvmfs(kLogCache, kLogDebug, "starting external cache reader thread");
496 
497  unsigned char buffer[cache_mgr->max_object_size_];
498  while (true) {
499  CacheTransport::Frame frame_recv;
500  frame_recv.set_attachment(buffer, cache_mgr->max_object_size_);
501  bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
502  if (!retval)
503  break;
504 
505  uint64_t req_id;
506  uint64_t part_nr = 0;
507  google::protobuf::MessageLite *msg = frame_recv.GetMsgTyped();
508  if (msg->GetTypeName() == "cvmfs.MsgRefcountReply") {
509  req_id = reinterpret_cast<cvmfs::MsgRefcountReply *>(msg)->req_id();
510  } else if (msg->GetTypeName() == "cvmfs.MsgObjectInfoReply") {
511  req_id = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg)->req_id();
512  } else if (msg->GetTypeName() == "cvmfs.MsgReadReply") {
513  req_id = reinterpret_cast<cvmfs::MsgReadReply *>(msg)->req_id();
514  } else if (msg->GetTypeName() == "cvmfs.MsgStoreReply") {
515  req_id = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->req_id();
516  part_nr = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->part_nr();
517  } else if (msg->GetTypeName() == "cvmfs.MsgInfoReply") {
518  req_id = reinterpret_cast<cvmfs::MsgInfoReply *>(msg)->req_id();
519  } else if (msg->GetTypeName() == "cvmfs.MsgShrinkReply") {
520  req_id = reinterpret_cast<cvmfs::MsgShrinkReply *>(msg)->req_id();
521  } else if (msg->GetTypeName() == "cvmfs.MsgListReply") {
522  req_id = reinterpret_cast<cvmfs::MsgListReply *>(msg)->req_id();
523  } else if (msg->GetTypeName() == "cvmfs.MsgBreadcrumbReply") {
524  req_id = reinterpret_cast<cvmfs::MsgBreadcrumbReply *>(msg)->req_id();
525  } else if (msg->GetTypeName() == "cvmfs.MsgDetach") {
526  // Release pinned catalogs
527  cache_mgr->quota_mgr_->BroadcastBackchannels("R");
528  continue;
529  } else {
530  PANIC(kLogSyslogErr | kLogDebug, "unexpected message %s",
531  std::string(msg->GetTypeName()).c_str());
532  }
533 
534  RpcInFlight rpc_inflight;
535  {
536  MutexLockGuard guard(cache_mgr->lock_inflight_rpcs_);
537  for (unsigned i = 0; i < cache_mgr->inflight_rpcs_.size(); ++i) {
538  RpcJob *rpc_job = cache_mgr->inflight_rpcs_[i].rpc_job;
539  if ((rpc_job->req_id() == req_id) && (rpc_job->part_nr() == part_nr)) {
540  rpc_inflight = cache_mgr->inflight_rpcs_[i];
541  cache_mgr->inflight_rpcs_.erase(cache_mgr->inflight_rpcs_.begin()
542  + i);
543  break;
544  }
545  }
546  }
547  if (rpc_inflight.rpc_job == NULL) {
549  "got unmatched rpc reply");
550  continue;
551  }
552  rpc_inflight.rpc_job->frame_recv()->MergeFrom(frame_recv);
553  rpc_inflight.signal->Wakeup();
554  }
555 
556  if (!cache_mgr->terminated_) {
558  "connection to external cache manager broken (%d)", errno);
559  }
560  LogCvmfs(kLogCache, kLogDebug, "stopping external cache reader thread");
561  return NULL;
562 }
563 
564 
566  return DoOpen(object.id);
567 }
568 
569 
571  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
572  LogCvmfs(kLogCache, kLogDebug, "open fd for transaction %s",
573  std::string(transaction->id.ToString()).c_str());
574  int retval = Flush(true, transaction);
575  if (retval != 0)
576  return retval;
577 
578  int fd = -1;
579  {
581  fd = fd_table_.OpenFd(ReadOnlyHandle(transaction->id));
582  if (fd < 0) {
583  LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
584  strerror(-fd));
585  return fd;
586  }
587  }
588  transaction->open_fds++;
589  return fd;
590 }
591 
592 
594  void *buf,
595  uint64_t size,
596  uint64_t offset) {
597  shash::Any id = GetHandle(fd);
598  if (id == kInvalidHandle)
599  return -EBADF;
600 
601  cvmfs::MsgHash object_id;
602  transport_.FillMsgHash(id, &object_id);
603  uint64_t nbytes = 0;
604  while (nbytes < size) {
605  uint64_t batch_size = std::min(size - nbytes,
606  static_cast<uint64_t>(max_object_size_));
607  cvmfs::MsgReadReq msg_read;
608  msg_read.set_session_id(session_id_);
609  msg_read.set_req_id(NextRequestId());
610  msg_read.set_allocated_object_id(&object_id);
611  msg_read.set_offset(offset + nbytes);
612  msg_read.set_size(batch_size);
613  RpcJob rpc_job(&msg_read);
614  rpc_job.set_attachment_recv(reinterpret_cast<char *>(buf) + nbytes,
615  batch_size);
616  CallRemotely(&rpc_job);
617  msg_read.release_object_id();
618 
619  cvmfs::MsgReadReply *msg_reply = rpc_job.msg_read_reply();
620  if (msg_reply->status() == cvmfs::STATUS_OK) {
621  nbytes += rpc_job.frame_recv()->att_size();
622  // Fuse sends in rounded up buffers, so short reads are expected
623  if (rpc_job.frame_recv()->att_size() < batch_size)
624  return nbytes;
625  } else {
626  return Ack2Errno(msg_reply->status());
627  }
628  }
629  return size;
630 }
631 
632 
634  shash::Any id = GetHandle(fd);
635  if (id == kInvalidHandle)
636  return -EBADF;
637  // No-op
638  return 0;
639 }
640 
641 
643  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
644  transaction->buf_pos = 0;
645  transaction->size = 0;
646  transaction->open_fds = 0;
647  transaction->committed = false;
648  transaction->label_modified = true;
649 
650  if (!transaction->flushed)
651  return 0;
652 
653  cvmfs::MsgHash object_id;
654  transport_.FillMsgHash(transaction->id, &object_id);
655  cvmfs::MsgStoreAbortReq msg_abort;
656  msg_abort.set_session_id(session_id_);
657  msg_abort.set_req_id(transaction->transaction_id);
658  msg_abort.set_allocated_object_id(&object_id);
659  RpcJob rpc_job(&msg_abort);
660  CallRemotely(&rpc_job);
661  msg_abort.release_object_id();
662  cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
663  transaction->transaction_id = NextRequestId();
664  transaction->flushed = false;
665  return Ack2Errno(msg_reply->status());
666 }
667 
668 
670  const std::string &fqrn) {
671  if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
672  return manifest::Breadcrumb();
673 
674  cvmfs::MsgBreadcrumbLoadReq msg_breadcrumb_load;
675  msg_breadcrumb_load.set_session_id(session_id_);
676  msg_breadcrumb_load.set_req_id(NextRequestId());
677  msg_breadcrumb_load.set_fqrn(fqrn);
678  RpcJob rpc_job(&msg_breadcrumb_load);
679  CallRemotely(&rpc_job);
680 
681  manifest::Breadcrumb breadcrumb;
682  cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
683  if (msg_reply->status() == cvmfs::STATUS_OK) {
684  assert(msg_reply->has_breadcrumb());
685  assert(msg_reply->breadcrumb().fqrn() == fqrn);
686  bool rv = transport_.ParseMsgHash(msg_reply->breadcrumb().hash(),
687  &breadcrumb.catalog_hash);
688  assert(rv);
690  breadcrumb.timestamp = msg_reply->breadcrumb().timestamp();
691  if (msg_reply->breadcrumb().has_revision()) {
692  breadcrumb.revision = msg_reply->breadcrumb().revision();
693  } else {
694  breadcrumb.revision = 0;
695  }
696  }
697  return breadcrumb;
698 }
699 
700 
702  if (!(capabilities_ & cvmfs::CAP_BREADCRUMB))
703  return false;
704 
705  cvmfs::MsgHash hash;
706  transport_.FillMsgHash(manifest.catalog_hash(), &hash);
707  cvmfs::MsgBreadcrumb breadcrumb;
708  breadcrumb.set_fqrn(manifest.repository_name());
709  breadcrumb.set_allocated_hash(&hash);
710  breadcrumb.set_timestamp(manifest.publish_timestamp());
711  breadcrumb.set_revision(manifest.revision());
712  cvmfs::MsgBreadcrumbStoreReq msg_breadcrumb_store;
713  msg_breadcrumb_store.set_session_id(session_id_);
714  msg_breadcrumb_store.set_req_id(NextRequestId());
715  msg_breadcrumb_store.set_allocated_breadcrumb(&breadcrumb);
716  RpcJob rpc_job(&msg_breadcrumb_store);
717  CallRemotely(&rpc_job);
718  msg_breadcrumb_store.release_breadcrumb();
719  breadcrumb.release_hash();
720 
721  cvmfs::MsgBreadcrumbReply *msg_reply = rpc_job.msg_breadcrumb_reply();
722  return msg_reply->status() == cvmfs::STATUS_OK;
723 }
724 
725 
727  int retval = pthread_create(&thread_read_, NULL, MainRead, this);
728  assert(retval == 0);
729  spawned_ = true;
730 }
731 
732 
737 bool ExternalCacheManager::SpawnPlugin(const vector<string> &cmd_line) {
738  if (cmd_line.empty())
739  return false;
740 
741  int pipe_ready[2];
742  MakePipe(pipe_ready);
743  set<int> preserve_filedes;
744  preserve_filedes.insert(pipe_ready[1]);
745 
746  int fd_null_read = open("/dev/null", O_RDONLY);
747  int fd_null_write = open("/dev/null", O_WRONLY);
748  assert((fd_null_read >= 0) && (fd_null_write >= 0));
749  map<int, int> map_fildes;
750  map_fildes[fd_null_read] = 0;
751  map_fildes[fd_null_write] = 1;
752  map_fildes[fd_null_write] = 2;
753 
754  pid_t child_pid;
755  int retval = setenv(CacheTransport::kEnvReadyNotifyFd,
756  StringifyInt(pipe_ready[1]).c_str(), 1);
757  assert(retval == 0);
758  retval = ManagedExec(cmd_line,
759  preserve_filedes,
760  map_fildes,
761  false, // drop_credentials
762  false, // clear_env
763  true, // double fork
764  &child_pid);
766  close(fd_null_read);
767  close(fd_null_write);
768  if (!retval) {
770  "failed to start cache plugin '%s'",
771  JoinStrings(cmd_line, " ").c_str());
772  ClosePipe(pipe_ready);
773  return false;
774  }
775 
777  "started cache plugin '%s' (pid %d), waiting for it to become ready",
778  JoinStrings(cmd_line, " ").c_str(), child_pid);
779  close(pipe_ready[1]);
780  char buf;
781  if (read(pipe_ready[0], &buf, 1) != 1) {
782  close(pipe_ready[0]);
784  "cache plugin did not start properly");
785  return false;
786  }
787  close(pipe_ready[0]);
788 
790  return true;
792  "cache plugin failed to create an endpoint");
793  return false;
794 }
795 
796 
798  uint64_t size,
799  void *txn) {
800  if (!(capabilities_ & cvmfs::CAP_WRITE))
801  return -EROFS;
802 
803  Transaction *transaction = new (txn) Transaction(id);
804  transaction->expected_size = size;
805  transaction->transaction_id = NextRequestId();
806 #ifdef __APPLE__
807  transaction->buffer = reinterpret_cast<unsigned char *>(
808  smalloc(max_object_size_));
809 #endif
810  return 0;
811 }
812 
813 
814 int64_t ExternalCacheManager::Write(const void *buf, uint64_t size, void *txn) {
815  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
816  assert(!transaction->committed);
817  LogCvmfs(kLogCache, kLogDebug, "writing %" PRIu64 " bytes for %s", size,
818  transaction->id.ToString().c_str());
819 
820  if (transaction->expected_size != kSizeUnknown) {
821  if (transaction->size + size > transaction->expected_size) {
823  "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
824  transaction->size + size, transaction->expected_size);
825  return -EFBIG;
826  }
827  }
828 
829  uint64_t written = 0;
830  const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
831  while (written < size) {
832  if (transaction->buf_pos == max_object_size_) {
833  bool do_commit = false;
834  if (transaction->expected_size != kSizeUnknown)
835  do_commit = (transaction->size + written) == transaction->expected_size;
836  int retval = Flush(do_commit, transaction);
837  if (retval != 0) {
838  transaction->size += written;
839  return retval;
840  }
841  transaction->size += transaction->buf_pos;
842  transaction->buf_pos = 0;
843  }
844  uint64_t remaining = size - written;
845  uint64_t space_in_buffer = max_object_size_ - transaction->buf_pos;
846  uint64_t batch_size = std::min(remaining, space_in_buffer);
847  memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
848  transaction->buf_pos += batch_size;
849  written += batch_size;
850  read_pos += batch_size;
851  }
852  return written;
853 }
854 
855 
856 //------------------------------------------------------------------------------
857 
858 
859 bool ExternalQuotaManager::DoListing(cvmfs::EnumObjectType type,
860  vector<cvmfs::MsgListRecord> *result) {
861  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_LIST))
862  return false;
863 
864  uint64_t listing_id = 0;
865  bool more_data = false;
866  do {
867  cvmfs::MsgListReq msg_list;
868  msg_list.set_session_id(cache_mgr_->session_id_);
869  msg_list.set_req_id(cache_mgr_->NextRequestId());
870  msg_list.set_listing_id(listing_id);
871  msg_list.set_object_type(type);
872  ExternalCacheManager::RpcJob rpc_job(&msg_list);
873  cache_mgr_->CallRemotely(&rpc_job);
874 
875  cvmfs::MsgListReply *msg_reply = rpc_job.msg_list_reply();
876  if (msg_reply->status() != cvmfs::STATUS_OK)
877  return false;
878  more_data = !msg_reply->is_last_part();
879  listing_id = msg_reply->listing_id();
880  for (int i = 0; i < msg_reply->list_record_size(); ++i) {
881  result->push_back(msg_reply->list_record(i));
882  }
883  } while (more_data);
884 
885  return true;
886 }
887 
888 
889 bool ExternalQuotaManager::Cleanup(const uint64_t leave_size) {
890  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK))
891  return false;
892 
893  cvmfs::MsgShrinkReq msg_shrink;
894  msg_shrink.set_session_id(cache_mgr_->session_id_);
895  msg_shrink.set_req_id(cache_mgr_->NextRequestId());
896  msg_shrink.set_shrink_to(leave_size);
897  ExternalCacheManager::RpcJob rpc_job(&msg_shrink);
898  cache_mgr_->CallRemotely(&rpc_job);
899 
900  cvmfs::MsgShrinkReply *msg_reply = rpc_job.msg_shrink_reply();
901  return msg_reply->status() == cvmfs::STATUS_OK;
902 }
903 
904 
906  ExternalCacheManager *cache_mgr) {
908  new ExternalQuotaManager(cache_mgr));
909  assert(quota_mgr.IsValid());
910 
911  return quota_mgr.Release();
912 }
913 
914 
916  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_INFO))
917  return Ack2Errno(cvmfs::STATUS_NOSUPPORT);
918 
919  cvmfs::MsgInfoReq msg_info;
920  msg_info.set_session_id(cache_mgr_->session_id_);
921  msg_info.set_req_id(cache_mgr_->NextRequestId());
922  ExternalCacheManager::RpcJob rpc_job(&msg_info);
923  cache_mgr_->CallRemotely(&rpc_job);
924 
925  cvmfs::MsgInfoReply *msg_reply = rpc_job.msg_info_reply();
926  if (msg_reply->status() == cvmfs::STATUS_OK) {
927  quota_info->size = msg_reply->size_bytes();
928  quota_info->used = msg_reply->used_bytes();
929  quota_info->pinned = msg_reply->pinned_bytes();
930  if (msg_reply->no_shrink() >= 0)
931  quota_info->no_shrink = msg_reply->no_shrink();
932  }
933  return Ack2Errno(msg_reply->status());
934 }
935 
936 
938  QuotaInfo info;
939  int retval = GetInfo(&info);
940  if (retval != 0)
941  return uint64_t(-1);
942  return info.size;
943 }
944 
945 
946 uint64_t ExternalQuotaManager::GetCleanupRate(uint64_t period_s) {
947  QuotaInfo info;
948  int retval = GetInfo(&info);
949  if (retval != 0)
950  return 0;
951  return info.no_shrink;
952 }
953 
954 
956  QuotaInfo info;
957  int retval = GetInfo(&info);
958  if (retval != 0)
959  return 0;
960  return info.used;
961 }
962 
963 
965  QuotaInfo info;
966  int retval = GetInfo(&info);
967  if (retval != 0)
968  return 0;
969  return info.pinned;
970 }
971 
972 
974  switch (capability) {
975  case kCapIntrospectSize:
976  return cache_mgr_->capabilities_ & cvmfs::CAP_INFO;
978  return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK_RATE;
979  case kCapList:
980  return cache_mgr_->capabilities_ & cvmfs::CAP_LIST;
981  case kCapShrink:
982  return cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK;
983  case kCapListeners:
984  return true;
985  default:
986  return false;
987  }
988 }
989 
990 
991 vector<string> ExternalQuotaManager::List() {
992  vector<string> result;
993  vector<cvmfs::MsgListRecord> raw_list;
994  bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_list);
995  if (!retval)
996  return result;
997  for (unsigned i = 0; i < raw_list.size(); ++i)
998  result.push_back(raw_list[i].description());
999  return result;
1000 }
1001 
1002 
1004  vector<string> result;
1005  vector<cvmfs::MsgListRecord> raw_list;
1006  bool retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_list);
1007  if (!retval)
1008  return result;
1009  for (unsigned i = 0; i < raw_list.size(); ++i)
1010  result.push_back(raw_list[i].description());
1011  return result;
1012 }
1013 
1014 
1016  vector<string> result;
1017  vector<cvmfs::MsgListRecord> raw_lists[3];
1018  bool retval = DoListing(cvmfs::OBJECT_REGULAR, &raw_lists[0]);
1019  if (!retval)
1020  return result;
1021  retval = DoListing(cvmfs::OBJECT_CATALOG, &raw_lists[1]);
1022  if (!retval)
1023  return result;
1024  retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_lists[2]);
1025  if (!retval)
1026  return result;
1027  for (unsigned i = 0; i < sizeof(raw_lists) / sizeof(raw_lists[0]); ++i) {
1028  for (unsigned j = 0; j < raw_lists[i].size(); ++j) {
1029  if (raw_lists[i][j].pinned())
1030  result.push_back(raw_lists[i][j].description());
1031  }
1032  }
1033  return result;
1034 }
1035 
1036 
1038  vector<string> result;
1039  vector<cvmfs::MsgListRecord> raw_list;
1040  bool retval = DoListing(cvmfs::OBJECT_VOLATILE, &raw_list);
1041  if (!retval)
1042  return result;
1043  for (unsigned i = 0; i < raw_list.size(); ++i)
1044  result.push_back(raw_list[i].description());
1045  return result;
1046 }
1047 
1048 
1050  const string &channel_id) {
1051  shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1052  MakePipe(back_channel);
1053  LockBackChannels();
1054  assert(back_channels_.find(hash_id) == back_channels_.end());
1055  back_channels_[hash_id] = back_channel[1];
1057 }
1058 
1059 
1061  const string &channel_id) {
1062  shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1063  LockBackChannels();
1064  back_channels_.erase(hash_id);
1066  ClosePipe(back_channel);
1067 }
CacheTransport transport_
Definition: cache_extern.h:315
int Flush(bool do_commit, Transaction *transaction)
pthread_mutex_t lock_inflight_rpcs_
Definition: cache_extern.h:328
void AssignFrom(const FdTable< HandleT > &other)
Definition: fd_table.h:45
virtual ~ExternalCacheManager()
virtual uint64_t GetSizePinned()
int ChangeRefcount(const shash::Any &id, int change_by)
void set_attachment_recv(void *data, unsigned size)
Definition: cache_extern.h:222
const manifest::Manifest * manifest() const
Definition: repository.h:125
virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset)
std::string GetDescription() const
Definition: cache.h:110
virtual uint64_t GetCleanupRate(uint64_t period_s)
bool DoListing(cvmfs::EnumObjectType type, std::vector< cvmfs::MsgListRecord > *result)
virtual int64_t Write(const void *buf, uint64_t size, void *txn)
int Ack2Errno(cvmfs::EnumStatus status_code)
Definition: cache_extern.cc:42
uint64_t timestamp
Definition: manifest.h:39
virtual int Readahead(int fd)
virtual bool Cleanup(const uint64_t leave_size)
void BroadcastBackchannels(const std::string &message)
Definition: quota.cc:21
shash::Any catalog_hash
Definition: manifest.h:38
pthread_rwlock_t rwlock_fd_table_
Definition: cache_extern.h:320
cvmfs::MsgShrinkReply * msg_shrink_reply()
Definition: cache_extern.h:260
cvmfs::MsgRefcountReply * msg_refcount_reply()
Definition: cache_extern.h:228
#define PANIC(...)
Definition: exception.h:29
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:356
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
int ConnectTcpEndpoint(const std::string &ipv4_address, int portno)
Definition: posix.cc:456
Capabilities
Definition: quota.h:47
cvmfs::MsgReadReply * msg_read_reply()
Definition: cache_extern.h:241
bool ManagedExec(const std::vector< std::string > &command_line, const std::set< int > &preserve_fildes, const std::map< int, int > &map_fildes, const bool drop_credentials, const bool clear_env, const bool double_fork, pid_t *child_pid)
Definition: posix.cc:1895
virtual int AbortTxn(void *txn)
Definition: cache_extern.cc:76
virtual std::string Describe()
void Wakeup()
Definition: concurrency.cc:60
virtual bool AcquireQuotaManager(QuotaManager *quota_mgr)
Definition: cache_extern.cc:85
virtual std::vector< std::string > ListCatalogs()
std::vector< RpcInFlight > inflight_rpcs_
Definition: cache_extern.h:327
static void * MainRead(void *data)
atomic_int64 next_request_id_
Definition: cache_extern.h:321
virtual int Reset(void *txn)
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)
void FillObjectType(int object_flags, cvmfs::EnumObjectType *wire_type)
virtual std::vector< std::string > ListPinned()
virtual std::vector< std::string > List()
static const char kReadyNotification
void set_attachment_send(void *data, unsigned size)
Definition: cache_extern.h:218
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
cvmfs::MsgListReply * msg_list_reply()
Definition: cache_extern.h:266
ExternalCacheManager(int fd_connection, unsigned max_open_fds)
virtual int Close(int fd)
uint64_t revision() const
Definition: manifest.h:122
ExternalCacheManager * cache_mgr_
Definition: cache_extern.h:390
cvmfs::MsgStoreReply * msg_store_reply()
Definition: cache_extern.h:247
FdTable< ReadOnlyHandle > fd_table_
Definition: cache_extern.h:314
uint64_t revision
Definition: manifest.h:40
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
cvmfs::MsgBreadcrumbReply * msg_breadcrumb_reply()
Definition: cache_extern.h:272
void MergeFrom(const Frame &other)
uint64_t publish_timestamp() const
Definition: manifest.h:129
void Transaction()
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
std::string repository_name() const
Definition: manifest.h:123
const char kSuffixCatalog
Definition: hash.h:54
virtual void * DoSaveState()
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)
virtual int Dup(int fd)
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
uint32_t att_size() const
virtual bool StoreBreadcrumb(const manifest::Manifest &manifest)
void LockBackChannels()
Definition: quota.h:97
CacheTransport::Frame * frame_send()
Definition: cache_extern.h:280
static const shash::Any kInvalidHandle
Definition: cache_extern.h:111
cvmfs::MsgObjectInfoReply * msg_object_info_reply()
Definition: cache_extern.h:234
uint64_t req_id() const
Definition: cache_extern.h:282
shash::Any catalog_hash() const
Definition: manifest.h:125
string StringifyInt(const int64_t value)
Definition: string.cc:77
int GetInfo(QuotaInfo *quota_info)
google::protobuf::MessageLite * GetMsgTyped()
uint64_t part_nr() const
Definition: cache_extern.h:283
virtual void CtrlTxn(const Label &label, const int flags, void *txn)
std::map< shash::Md5, int > back_channels_
Definition: quota.h:95
ExternalQuotaManager(ExternalCacheManager *cache_mgr)
Definition: cache_extern.h:384
static bool SpawnPlugin(const std::vector< std::string > &cmd_line)
cvmfs::MsgInfoReply * msg_info_reply()
Definition: cache_extern.h:254
int ConnectSocket(const std::string &path)
Definition: posix.cc:422
CacheTransport::Frame * frame_recv()
Definition: cache_extern.h:281
void Wait()
Definition: concurrency.cc:50
shash::Any GetHandle(int fd)
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
virtual int64_t GetSize(int fd)
static ExternalCacheManager * Create(int fd_connection, unsigned max_open_fds, const std::string &ident)
QuotaManager * quota_mgr_
Definition: cache.h:233
Definition: mutex.h:42
virtual void Spawn()
pthread_mutex_t lock_send_fd_
Definition: cache_extern.h:326
static ExternalQuotaManager * Create(ExternalCacheManager *cache_mgr)
virtual bool DoFreeState(void *data)
void SafeSleepMs(const unsigned ms)
Definition: posix.cc:2024
virtual manifest::Breadcrumb LoadBreadcrumb(const std::string &fqrn)
virtual bool HasCapability(Capabilities capability)
void set_attachment(void *attachment, uint32_t att_size)
Suffix suffix
Definition: hash.h:123
void CallRemotely(RpcJob *rpc_job)
Definition: cache_extern.cc:93
virtual std::vector< std::string > ListVolatile()
virtual int OpenFromTxn(void *txn)
static int ConnectLocator(const std::string &locator, bool print_error)
static PluginHandle * CreatePlugin(const std::string &locator, const std::vector< std::string > &cmd_line)
virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn)
virtual uint64_t GetCapacity()
void Reset(uint32_t original_att_size)
static void size_t size
Definition: smalloc.h:54
virtual int CommitTxn(void *txn)
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
virtual int DoRestoreState(void *data)
void UnlockBackChannels()
Definition: quota.h:101
int fd_connection() const
static const char * kEnvReadyNotifyFd
virtual uint64_t GetSize()
int DoOpen(const shash::Any &id)
static const uint64_t kSizeUnknown
Definition: cache.h:74
virtual int Open(const LabeledObject &object)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545