CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
channel.cc
Go to the documentation of this file.
1 
4 #include "cvmfs_config.h"
5 #include "channel.h"
6 
7 #include <errno.h>
8 #include <poll.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <sys/un.h>
12 #include <unistd.h>
13 
14 #include <cassert>
15 #include <cstring>
16 #include <vector>
17 
18 #include "util/concurrency.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/pointer.h"
23 #include "util/posix.h"
24 #include "util/smalloc.h"
25 #include "util/string.h"
26 
27 using namespace std; // NOLINT
28 
29 
31 
33  delete instance_;
34  instance_ = NULL;
35 }
36 
37 
39  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
40  smalloc(sizeof(pthread_mutex_t)));
41  int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
42  assert(retval == 0);
43 }
44 
45 
47  pthread_mutex_destroy(lock_tls_blocks_);
48  free(lock_tls_blocks_);
49 
50  for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
51  delete tls_blocks_[i];
52  }
53 
54  int retval = pthread_key_delete(thread_local_storage_);
55  assert(retval == 0);
56 }
57 
58 
60  if (instance_ == NULL) {
61  instance_ = new SessionCtx();
62  int retval =
63  pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
64  assert(retval == 0);
65  }
66 
67  return instance_;
68 }
69 
70 
71 void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
72  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
73  pthread_getspecific(thread_local_storage_));
74  if ((tls == NULL) || !tls->is_set) {
75  *id = 0;
76  *reponame = NULL;
77  *client_instance = NULL;
78  } else {
79  *id = tls->id;
80  *reponame = tls->reponame;
81  *client_instance = tls->client_instance;
82  }
83 }
84 
85 
87  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
88  pthread_getspecific(thread_local_storage_));
89  if (tls == NULL)
90  return false;
91 
92  return tls->is_set;
93 }
94 
95 
96 void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
97  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
98  pthread_getspecific(thread_local_storage_));
99 
100  if (tls == NULL) {
101  tls = new ThreadLocalStorage(id, reponame, client_instance);
102  int retval = pthread_setspecific(thread_local_storage_, tls);
103  assert(retval == 0);
104  MutexLockGuard lock_guard(lock_tls_blocks_);
105  tls_blocks_.push_back(tls);
106  } else {
107  tls->id = id;
108  tls->reponame = reponame;
109  tls->client_instance = client_instance;
110  tls->is_set = true;
111  }
112 }
113 
114 
115 void SessionCtx::TlsDestructor(void *data) {
116  ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
117  delete tls;
118 
119  assert(instance_);
120  MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
121  for (vector<ThreadLocalStorage *>::iterator i =
122  instance_->tls_blocks_.begin(), iEnd = instance_->tls_blocks_.end();
123  i != iEnd; ++i)
124  {
125  if ((*i) == tls) {
126  instance_->tls_blocks_.erase(i);
127  break;
128  }
129  }
130 }
131 
132 
134  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
135  pthread_getspecific(thread_local_storage_));
136  if (tls != NULL) {
137  tls->is_set = false;
138  tls->id = 0;
139  tls->reponame = NULL;
140  tls->client_instance = NULL;
141  }
142 }
143 
144 
145 //------------------------------------------------------------------------------
146 
147 
148 CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
149  : id(id)
150  , name(name)
151 {
152  vector<string> tokens = SplitString(name, ':');
153  reponame = strdup(tokens[0].c_str());
154  if (tokens.size() > 1)
155  client_instance = strdup(tokens[1].c_str());
156  else
157  client_instance = NULL;
158 }
159 
160 const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
161 
162 
164  char detach = kSignalDetach;
165  WritePipe(pipe_ctrl_[1], &detach, 1);
166 }
167 
168 
170  : is_local_(false)
171  , capabilities_(capabilities)
172  , fd_socket_(-1)
173  , fd_socket_lock_(-1)
174  , running_(0)
175  , num_workers_(0)
178 {
179  atomic_init64(&next_session_id_);
180  atomic_init64(&next_txn_id_);
181  atomic_init64(&next_lst_id_);
182  // Don't use listing id zero
183  atomic_inc64(&next_lst_id_);
185  memset(&thread_io_, 0, sizeof(thread_io_));
187 }
188 
189 
191  Terminate();
193  if (fd_socket_ >= 0)
194  close(fd_socket_);
195  if (fd_socket_lock_ >= 0)
197 }
198 
199 
201  cvmfs::MsgBreadcrumbStoreReq *msg_req,
202  CacheTransport *transport)
203 {
204  SessionCtxGuard session_guard(msg_req->session_id(), this);
205  cvmfs::MsgBreadcrumbReply msg_reply;
206  CacheTransport::Frame frame_send(&msg_reply);
207 
208  msg_reply.set_req_id(msg_req->req_id());
209  manifest::Breadcrumb breadcrumb;
210  bool retval = transport->ParseMsgHash(msg_req->breadcrumb().hash(),
211  &breadcrumb.catalog_hash);
212  if (!retval) {
213  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
214  "malformed hash received from client");
215  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
216  } else {
217  breadcrumb.timestamp = msg_req->breadcrumb().timestamp();
218  cvmfs::EnumStatus status =
219  StoreBreadcrumb(msg_req->breadcrumb().fqrn(), breadcrumb);
220  msg_reply.set_status(status);
221  }
222  transport->SendFrame(&frame_send);
223 }
224 
225 
227  cvmfs::MsgBreadcrumbLoadReq *msg_req,
228  CacheTransport *transport)
229 {
230  SessionCtxGuard session_guard(msg_req->session_id(), this);
231  cvmfs::MsgBreadcrumbReply msg_reply;
232  CacheTransport::Frame frame_send(&msg_reply);
233 
234  msg_reply.set_req_id(msg_req->req_id());
235  manifest::Breadcrumb breadcrumb;
236  cvmfs::EnumStatus status =
237  LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
238  msg_reply.set_status(status);
239  if (status == cvmfs::STATUS_OK) {
240  assert(breadcrumb.IsValid());
241  cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
242  transport->FillMsgHash(breadcrumb.catalog_hash, msg_hash);
243  cvmfs::MsgBreadcrumb *msg_breadcrumb = new cvmfs::MsgBreadcrumb();
244  msg_breadcrumb->set_fqrn(msg_req->fqrn());
245  msg_breadcrumb->set_allocated_hash(msg_hash);
246  msg_breadcrumb->set_timestamp(breadcrumb.timestamp);
247  msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
248  }
249  transport->SendFrame(&frame_send);
250 }
251 
252 
254  cvmfs::MsgHandshake *msg_req,
255  CacheTransport *transport)
256 {
257  uint64_t session_id = NextSessionId();
258  if (msg_req->has_name()) {
259  sessions_[session_id] = SessionInfo(session_id, msg_req->name());
260  } else {
261  sessions_[session_id] = SessionInfo(session_id,
262  "anonymous client (" + StringifyInt(session_id) + ")");
263  }
264  cvmfs::MsgHandshakeAck msg_ack;
265  CacheTransport::Frame frame_send(&msg_ack);
266 
267  msg_ack.set_status(cvmfs::STATUS_OK);
268  msg_ack.set_name(name_);
269  msg_ack.set_protocol_version(kPbProtocolVersion);
270  msg_ack.set_max_object_size(max_object_size_);
271  msg_ack.set_session_id(session_id);
272  msg_ack.set_capabilities(capabilities_);
273  if (is_local_)
274  msg_ack.set_pid(getpid());
275  transport->SendFrame(&frame_send);
276 }
277 
278 
280  cvmfs::MsgInfoReq *msg_req,
281  CacheTransport *transport)
282 {
283  SessionCtxGuard session_guard(msg_req->session_id(), this);
284  cvmfs::MsgInfoReply msg_reply;
285  CacheTransport::Frame frame_send(&msg_reply);
286 
287  msg_reply.set_req_id(msg_req->req_id());
288  Info info;
289  cvmfs::EnumStatus status = GetInfo(&info);
290  if (status != cvmfs::STATUS_OK) {
291  LogSessionError(msg_req->session_id(), status,
292  "failed to query cache status");
293  }
294  msg_reply.set_size_bytes(info.size_bytes);
295  msg_reply.set_used_bytes(info.used_bytes);
296  msg_reply.set_pinned_bytes(info.pinned_bytes);
297  msg_reply.set_no_shrink(info.no_shrink);
298  msg_reply.set_status(status);
299  transport->SendFrame(&frame_send);
300 }
301 
302 
303 void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
304  if (!msg_req->has_conncnt_change_by())
305  return;
306  int32_t conncnt_change_by = msg_req->conncnt_change_by();
307  if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
308  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
309  "invalid request to drop connection counter below zero");
310  return;
311  }
312  if (conncnt_change_by > 0) {
313  LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
314  } else {
315  LogSessionInfo(msg_req->session_id(), "release session lock");
316  }
317  num_inlimbo_clients_ += conncnt_change_by;
318 }
319 
320 
322  cvmfs::MsgListReq *msg_req,
323  CacheTransport *transport)
324 {
325  SessionCtxGuard session_guard(msg_req->session_id(), this);
326  cvmfs::MsgListReply msg_reply;
327  CacheTransport::Frame frame_send(&msg_reply);
328 
329  msg_reply.set_req_id(msg_req->req_id());
330  int64_t listing_id = msg_req->listing_id();
331  msg_reply.set_listing_id(listing_id);
332  msg_reply.set_is_last_part(true);
333 
334  cvmfs::EnumStatus status;
335  if (msg_req->listing_id() == 0) {
336  listing_id = NextLstId();
337  status = ListingBegin(listing_id, msg_req->object_type());
338  if (status != cvmfs::STATUS_OK) {
339  LogSessionError(msg_req->session_id(), status,
340  "failed to start enumeration of objects");
341  msg_reply.set_status(status);
342  transport->SendFrame(&frame_send);
343  return;
344  }
345  msg_reply.set_listing_id(listing_id);
346  }
347  assert(listing_id != 0);
348 
349  ObjectInfo item;
350  unsigned total_size = 0;
351  while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
352  cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
353  cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
354  transport->FillMsgHash(item.id, msg_hash);
355  msg_list_record->set_allocated_hash(msg_hash);
356  msg_list_record->set_pinned(item.pinned);
357  msg_list_record->set_description(item.description);
358  // Approximation of the message size
359  total_size += sizeof(item) + item.description.length();
360  if (total_size > kListingSize)
361  break;
362  }
363  if (status == cvmfs::STATUS_OUTOFBOUNDS) {
364  ListingEnd(listing_id);
365  status = cvmfs::STATUS_OK;
366  } else {
367  msg_reply.set_is_last_part(false);
368  }
369  if (status != cvmfs::STATUS_OK) {
370  LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
371  }
372  msg_reply.set_status(status);
373  transport->SendFrame(&frame_send);
374 }
375 
376 
378  cvmfs::MsgObjectInfoReq *msg_req,
379  CacheTransport *transport)
380 {
381  SessionCtxGuard session_guard(msg_req->session_id(), this);
382  cvmfs::MsgObjectInfoReply msg_reply;
383  CacheTransport::Frame frame_send(&msg_reply);
384 
385  msg_reply.set_req_id(msg_req->req_id());
386  shash::Any object_id;
387  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
388  if (!retval) {
389  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
390  "malformed hash received from client");
391  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
392  } else {
393  ObjectInfo info;
394  cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
395  msg_reply.set_status(status);
396  if (status == cvmfs::STATUS_OK) {
397  msg_reply.set_object_type(info.object_type);
398  msg_reply.set_size(info.size);
399  } else if (status != cvmfs::STATUS_NOENTRY) {
400  LogSessionError(msg_req->session_id(), status,
401  "failed retrieving object details");
402  }
403  }
404  transport->SendFrame(&frame_send);
405 }
406 
407 
409  cvmfs::MsgReadReq *msg_req,
410  CacheTransport *transport)
411 {
412  SessionCtxGuard session_guard(msg_req->session_id(), this);
413  cvmfs::MsgReadReply msg_reply;
414  CacheTransport::Frame frame_send(&msg_reply);
415 
416  msg_reply.set_req_id(msg_req->req_id());
417  shash::Any object_id;
418  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
419  if (!retval || (msg_req->size() > max_object_size_)) {
420  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
421  "malformed hash received from client");
422  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
423  transport->SendFrame(&frame_send);
424  return;
425  }
426  unsigned size = msg_req->size();
427 #ifdef __APPLE__
428  unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
429 #else
430  unsigned char buffer[size];
431 #endif
432  cvmfs::EnumStatus status = Pread(object_id, msg_req->offset(), &size, buffer);
433  msg_reply.set_status(status);
434  if (status == cvmfs::STATUS_OK) {
435  frame_send.set_attachment(buffer, size);
436  } else {
437  LogSessionError(msg_req->session_id(), status,
438  "failed to read from object");
439  }
440  transport->SendFrame(&frame_send);
441 #ifdef __APPLE__
442  free(buffer);
443 #endif
444 }
445 
446 
448  cvmfs::MsgRefcountReq *msg_req,
449  CacheTransport *transport)
450 {
451  SessionCtxGuard session_guard(msg_req->session_id(), this);
452  cvmfs::MsgRefcountReply msg_reply;
453  CacheTransport::Frame frame_send(&msg_reply);
454 
455  msg_reply.set_req_id(msg_req->req_id());
456  shash::Any object_id;
457  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
458  if (!retval) {
459  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
460  "malformed hash received from client");
461  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
462  } else {
463  cvmfs::EnumStatus status = ChangeRefcount(object_id, msg_req->change_by());
464  msg_reply.set_status(status);
465  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
466  LogSessionError(msg_req->session_id(), status,
467  "failed to open/close object " + object_id.ToString());
468  }
469  }
470  transport->SendFrame(&frame_send);
471 }
472 
473 
474 bool CachePlugin::HandleRequest(int fd_con) {
476  char buffer[max_object_size_];
477  CacheTransport::Frame frame_recv;
478  frame_recv.set_attachment(buffer, max_object_size_);
479  bool retval = transport.RecvFrame(&frame_recv);
480  if (!retval) {
482  "failed to receive request from connection (%d)", errno);
483  return false;
484  }
485 
486  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
487 
488  if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
489  cvmfs::MsgHandshake *msg_req =
490  reinterpret_cast<cvmfs::MsgHandshake *>(msg_typed);
491  HandleHandshake(msg_req, &transport);
492  } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
493  cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
494  map<uint64_t, SessionInfo>::const_iterator iter =
495  sessions_.find(msg_req->session_id());
496  if (iter != sessions_.end()) {
497  free(iter->second.reponame);
498  free(iter->second.client_instance);
499  }
500  sessions_.erase(msg_req->session_id());
501  return false;
502  } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
503  HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
504  } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
505  cvmfs::MsgRefcountReq *msg_req =
506  reinterpret_cast<cvmfs::MsgRefcountReq *>(msg_typed);
507  HandleRefcount(msg_req, &transport);
508  } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
509  cvmfs::MsgObjectInfoReq *msg_req =
510  reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
511  HandleObjectInfo(msg_req, &transport);
512  } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
513  cvmfs::MsgReadReq *msg_req =
514  reinterpret_cast<cvmfs::MsgReadReq *>(msg_typed);
515  HandleRead(msg_req, &transport);
516  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
517  cvmfs::MsgStoreReq *msg_req =
518  reinterpret_cast<cvmfs::MsgStoreReq *>(msg_typed);
519  HandleStore(msg_req, &frame_recv, &transport);
520  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
521  cvmfs::MsgStoreAbortReq *msg_req =
522  reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
523  HandleStoreAbort(msg_req, &transport);
524  } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
525  cvmfs::MsgInfoReq *msg_req =
526  reinterpret_cast<cvmfs::MsgInfoReq *>(msg_typed);
527  HandleInfo(msg_req, &transport);
528  } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
529  cvmfs::MsgShrinkReq *msg_req =
530  reinterpret_cast<cvmfs::MsgShrinkReq *>(msg_typed);
531  HandleShrink(msg_req, &transport);
532  } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
533  cvmfs::MsgListReq *msg_req =
534  reinterpret_cast<cvmfs::MsgListReq *>(msg_typed);
535  HandleList(msg_req, &transport);
536  } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbStoreReq") {
537  cvmfs::MsgBreadcrumbStoreReq *msg_req =
538  reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed);
539  HandleBreadcrumbStore(msg_req, &transport);
540  } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbLoadReq") {
541  cvmfs::MsgBreadcrumbLoadReq *msg_req =
542  reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed);
543  HandleBreadcrumbLoad(msg_req, &transport);
544  } else {
546  "unexpected message from client: %s",
547  msg_typed->GetTypeName().c_str());
548  return false;
549  }
550 
551  return true;
552 }
553 
554 
556  cvmfs::MsgShrinkReq *msg_req,
557  CacheTransport *transport)
558 {
559  SessionCtxGuard session_guard(msg_req->session_id(), this);
560  cvmfs::MsgShrinkReply msg_reply;
561  CacheTransport::Frame frame_send(&msg_reply);
562 
563  msg_reply.set_req_id(msg_req->req_id());
564  uint64_t used_bytes = 0;
565  cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
566  msg_reply.set_used_bytes(used_bytes);
567  msg_reply.set_status(status);
568  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
569  LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
570  }
571  transport->SendFrame(&frame_send);
572 }
573 
574 
576  cvmfs::MsgStoreAbortReq *msg_req,
577  CacheTransport *transport)
578 {
579  SessionCtxGuard session_guard(msg_req->session_id(), this);
580  cvmfs::MsgStoreReply msg_reply;
581  CacheTransport::Frame frame_send(&msg_reply);
582  msg_reply.set_req_id(msg_req->req_id());
583  msg_reply.set_part_nr(0);
584  uint64_t txn_id;
585  UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
586  bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
587  if (!retval) {
588  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
589  "malformed transaction id received from client");
590  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
591  } else {
592  cvmfs::EnumStatus status = AbortTxn(txn_id);
593  msg_reply.set_status(status);
594  if (status != cvmfs::STATUS_OK) {
595  LogSessionError(msg_req->session_id(), status,
596  "failed to abort transaction");
597  }
598  txn_ids_.Erase(uniq_req);
599  }
600  transport->SendFrame(&frame_send);
601 }
602 
603 
605  cvmfs::MsgStoreReq *msg_req,
606  CacheTransport::Frame *frame,
607  CacheTransport *transport)
608 {
609  SessionCtxGuard session_guard(msg_req->session_id(), this);
610  cvmfs::MsgStoreReply msg_reply;
611  CacheTransport::Frame frame_send(&msg_reply);
612  msg_reply.set_req_id(msg_req->req_id());
613  msg_reply.set_part_nr(msg_req->part_nr());
614  shash::Any object_id;
615  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
616  if ( !retval ||
617  (frame->att_size() > max_object_size_) ||
618  ((frame->att_size() < max_object_size_) && !msg_req->last_part()) )
619  {
620  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
621  "malformed hash or bad object size received from client");
622  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
623  transport->SendFrame(&frame_send);
624  return;
625  }
626 
627  UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
628  uint64_t txn_id;
629  cvmfs::EnumStatus status = cvmfs::STATUS_OK;
630  if (msg_req->part_nr() == 1) {
631  if (txn_ids_.Contains(uniq_req)) {
632  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
633  "invalid attempt to restart running transaction");
634  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
635  transport->SendFrame(&frame_send);
636  return;
637  }
638  txn_id = NextTxnId();
639  ObjectInfo info;
640  info.id = object_id;
641  if (msg_req->has_expected_size()) {info.size = msg_req->expected_size();}
642  if (msg_req->has_object_type()) {info.object_type = msg_req->object_type();}
643  if (msg_req->has_description()) {info.description = msg_req->description();}
644  status = StartTxn(object_id, txn_id, info);
645  if (status != cvmfs::STATUS_OK) {
646  LogSessionError(msg_req->session_id(), status,
647  "failed to start transaction");
648  msg_reply.set_status(status);
649  transport->SendFrame(&frame_send);
650  return;
651  }
652  txn_ids_.Insert(uniq_req, txn_id);
653  } else {
654  retval = txn_ids_.Lookup(uniq_req, &txn_id);
655  if (!retval) {
656  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
657  "invalid transaction received from client");
658  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
659  transport->SendFrame(&frame_send);
660  return;
661  }
662  }
663 
664  // TODO(jblomer): check part number and send objects up in order
665  if (frame->att_size() > 0) {
666  status = WriteTxn(txn_id,
667  reinterpret_cast<unsigned char *>(frame->attachment()),
668  frame->att_size());
669  if (status != cvmfs::STATUS_OK) {
670  LogSessionError(msg_req->session_id(), status, "failure writing object");
671  msg_reply.set_status(status);
672  transport->SendFrame(&frame_send);
673  return;
674  }
675  }
676 
677  if (msg_req->last_part()) {
678  status = CommitTxn(txn_id);
679  if (status != cvmfs::STATUS_OK) {
680  LogSessionError(msg_req->session_id(), status,
681  "failure committing object");
682  }
683  txn_ids_.Erase(uniq_req);
684  }
685  msg_reply.set_status(status);
686  transport->SendFrame(&frame_send);
687 }
688 
689 
691  return atomic_read32(&running_) != 0;
692 }
693 
694 
695 bool CachePlugin::Listen(const string &locator) {
696  vector<string> tokens = SplitString(locator, '=');
697  if (tokens[0] == "unix") {
698  string lock_path = tokens[1] + ".lock";
699  fd_socket_lock_ = TryLockFile(lock_path);
700  if (fd_socket_lock_ == -1) {
702  "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
704  return false;
705  } else if (fd_socket_lock_ == -2) {
706  // Another plugin process probably started in the meantime
708  if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
710  "failed to lock on %s, file is busy", lock_path.c_str());
711  }
712  return false;
713  }
714  assert(fd_socket_lock_ >= 0);
715  fd_socket_ = MakeSocket(tokens[1], 0600);
716  is_local_ = true;
717  } else if (tokens[0] == "tcp") {
718  vector<string> tcp_address = SplitString(tokens[1], ':');
719  if (tcp_address.size() != 2) {
721  "invalid locator: %s", locator.c_str());
723  return false;
724  }
725  fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
726  } else {
728  "unknown endpoint in locator: %s", locator.c_str());
730  return false;
731  }
732 
733  if (fd_socket_ < 0) {
734  if (errno == EADDRINUSE) {
735  // Another plugin process probably started in the meantime
737  } else {
739  "failed to create endpoint %s (%d)", locator.c_str(), errno);
741  }
742  is_local_ = false;
743  return false;
744  }
745  int retval = listen(fd_socket_, 32);
746  assert(retval == 0);
747 
748  return true;
749 }
750 
751 
752 void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
753  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
754  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
755  if (iter != sessions_.end()) {
756  session_str = iter->second.name;
757  }
759  "session '%s': %s", session_str.c_str(), msg.c_str());
760 }
761 
762 
764  uint64_t session_id,
765  cvmfs::EnumStatus status,
766  const std::string &msg)
767 {
768  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
769  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
770  if (iter != sessions_.end()) {
771  session_str = iter->second.name;
772  }
774  "session '%s': %s (%d - %s)",
775  session_str.c_str(), msg.c_str(), status,
776  CacheTransportCode2Ascii(status));
777 }
778 
779 
781  CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
782 
783  platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
784 
785  vector<struct pollfd> watch_fds;
786  // Elements 0, 1: control pipe, socket fd
787  struct pollfd watch_ctrl;
788  watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
789  watch_ctrl.events = POLLIN | POLLPRI;
790  watch_fds.push_back(watch_ctrl);
791  struct pollfd watch_socket;
792  watch_socket.fd = cache_plugin->fd_socket_;
793  watch_socket.events = POLLIN | POLLPRI;
794  watch_fds.push_back(watch_socket);
795 
796  bool terminated = false;
797  while (!terminated) {
798  for (unsigned i = 0; i < watch_fds.size(); ++i)
799  watch_fds[i].revents = 0;
800  int retval = poll(&watch_fds[0], watch_fds.size(), -1);
801  if (retval < 0) {
802  if (errno == EINTR)
803  continue;
804  PANIC(kLogSyslogErr | kLogDebug, "cache plugin connection failure (%d)",
805  errno);
806  }
807 
808  // Termination or detach
809  if (watch_fds[0].revents) {
810  char signal;
811  ReadPipe(watch_fds[0].fd, &signal, 1);
812  if (signal == kSignalDetach) {
813  cache_plugin->SendDetachRequests();
814  continue;
815  }
816 
817  // termination
818  if (watch_fds.size() > 2) {
820  "terminating external cache manager with pending connections");
821  }
822  break;
823  }
824 
825  // New connection
826  if (watch_fds[1].revents) {
827  struct sockaddr_un remote;
828  socklen_t socket_size = sizeof(remote);
829  int fd_con =
830  accept(watch_fds[1].fd, (struct sockaddr *)&remote, &socket_size);
831  if (fd_con < 0) {
833  "failed to establish connection (%d)", errno);
834  continue;
835  }
836  struct pollfd watch_con;
837  watch_con.fd = fd_con;
838  watch_con.events = POLLIN | POLLPRI;
839  watch_fds.push_back(watch_con);
840  cache_plugin->connections_.insert(fd_con);
841  }
842 
843  // New request
844  for (unsigned i = 2; i < watch_fds.size(); ) {
845  if (watch_fds[i].revents) {
846  bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
847  if (!proceed) {
848  close(watch_fds[i].fd);
849  cache_plugin->connections_.erase(watch_fds[i].fd);
850  watch_fds.erase(watch_fds.begin() + i);
851  if ( (getenv(CacheTransport::kEnvReadyNotifyFd) != NULL) &&
852  (cache_plugin->connections_.empty()) &&
853  (cache_plugin->num_inlimbo_clients_ == 0) )
854  {
856  "stopping cache plugin, no more active clients");
857  terminated = true;
858  break;
859  }
860  } else {
861  i++;
862  }
863  } else {
864  i++;
865  }
866  }
867  }
868 
869  // 0, 1 being closed by destructor
870  for (unsigned i = 2; i < watch_fds.size(); ++i)
871  close(watch_fds[i].fd);
872  cache_plugin->txn_ids_.Clear();
873 
874  signal(SIGPIPE, save_sigpipe);
875  return NULL;
876 }
877 
878 
882 void CachePlugin::NotifySupervisor(char signal) {
883  char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
884  if (pipe_ready == NULL)
885  return;
886  int fd_pipe_ready = String2Int64(pipe_ready);
887  WritePipe(fd_pipe_ready, &signal, 1);
888 }
889 
890 
891 void CachePlugin::ProcessRequests(unsigned num_workers) {
892  num_workers_ = num_workers;
893  int retval = pthread_create(&thread_io_, NULL, MainProcessRequests, this);
894  assert(retval == 0);
896  atomic_cas32(&running_, 0, 1);
897 }
898 
899 
901  set<int>::const_iterator iter = connections_.begin();
902  set<int>::const_iterator iter_end = connections_.end();
903  for (; iter != iter_end; ++iter) {
904  CacheTransport transport(*iter,
907  cvmfs::MsgDetach msg_detach;
908  CacheTransport::Frame frame_send(&msg_detach);
909  transport.SendFrame(&frame_send);
910  }
911 }
912 
913 
915  if (IsRunning()) {
916  char terminate = kSignalTerminate;
917  WritePipe(pipe_ctrl_[1], &terminate, 1);
918  pthread_join(thread_io_, NULL);
919  atomic_cas32(&running_, 1, 0);
920  }
921 }
922 
923 
925  if (!IsRunning())
926  return;
927  pthread_join(thread_io_, NULL);
928 }
uint64_t pinned_bytes
Definition: channel.h:90
void Get(uint64_t *id, char **reponame, char **client_instance)
Definition: channel.cc:71
#define LogCvmfs(source, mask,...)
Definition: logging.h:25
int MakeSocket(const std::string &path, const int mode)
Definition: posix.cc:331
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
Definition: channel.cc:200
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
atomic_int64 next_session_id_
Definition: channel.h:261
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:377
static void CleanupInstance()
Definition: channel.cc:32
uint64_t timestamp
Definition: manifest.h:33
void Set(uint64_t id, char *reponame, char *client_instance)
Definition: channel.cc:96
static const uint64_t kSizeUnknown
Definition: channel.h:71
int MakeTcpEndpoint(const std::string &ipv4_address, int portno)
Definition: posix.cc:389
shash::Any catalog_hash
Definition: manifest.h:32
#define PANIC(...)
Definition: exception.h:29
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
bool is_set
either not yet set or deliberately unset
Definition: channel.h:44
bool IsRunning()
Definition: channel.cc:690
uint64_t NextLstId()
Definition: channel.h:210
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
Definition: channel.cc:303
std::set< int > connections_
Definition: channel.h:265
virtual ~CachePlugin()
Definition: channel.cc:190
uint64_t capabilities_
Definition: channel.h:249
std::map< uint64_t, SessionInfo > sessions_
Definition: channel.h:266
uint64_t capabilities() const
Definition: channel.h:103
bool IsSet()
Definition: channel.cc:86
void ProcessRequests(unsigned num_workers)
Definition: channel.cc:891
cvmfs::EnumObjectType object_type
Definition: channel.h:81
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
void Unset()
Definition: channel.cc:133
static SessionCtx * instance_
Definition: channel.h:57
static const unsigned kDefaultMaxObjectSize
Definition: channel.h:140
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static const char kReadyNotification
static uint32_t HashUniqueRequest(const UniqueRequest &req)
Definition: channel.h:213
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
Definition: channel.cc:555
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
bool is_local_
Definition: channel.h:248
static const uint32_t kFlagSendIgnoreFailure
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
Definition: channel.cc:169
uint64_t num_inlimbo_clients_
Definition: channel.h:259
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
Definition: channel.cc:447
bool Listen(const std::string &locator)
Definition: channel.cc:695
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
int64_t String2Int64(const string &value)
Definition: string.cc:222
void Terminate()
Definition: channel.cc:914
uint64_t NextTxnId()
Definition: channel.h:207
void AskToDetach()
Definition: channel.cc:163
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
static const unsigned kListingSize
Definition: channel.h:141
#define platform_sighandler_t
static void TlsDestructor(void *data)
Definition: channel.cc:115
void NotifySupervisor(char signal)
Definition: channel.cc:882
void * attachment() const
unsigned max_object_size_
Definition: channel.h:254
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int TryLockFile(const std::string &path)
Definition: posix.cc:913
uint32_t att_size() const
int64_t no_shrink
Definition: channel.h:91
static const char kSignalDetach
Definition: channel.h:143
std::string description
Definition: channel.h:83
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
pthread_t thread_io_
Definition: channel.h:267
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
Definition: channel.cc:763
int fd_socket_lock_
Definition: channel.h:251
string StringifyInt(const int64_t value)
Definition: string.cc:78
static const unsigned kPbProtocolVersion
Definition: channel.h:70
google::protobuf::MessageLite * GetMsgTyped()
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
Definition: channel.cc:575
SessionCtx()
Definition: channel.cc:38
int fd_socket_
Definition: channel.h:250
static const char kFailureNotification
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
Definition: channel.cc:321
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:408
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
Definition: channel.cc:253
void SendDetachRequests()
Definition: channel.cc:900
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
Definition: channel.h:264
atomic_int64 next_lst_id_
Definition: channel.h:263
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
uint64_t used_bytes
Definition: channel.h:89
atomic_int32 running_
Definition: channel.h:252
Definition: mutex.h:42
unsigned num_workers_
Definition: channel.h:253
~SessionCtx()
Definition: channel.cc:46
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
Definition: channel.cc:604
bool IsValid() const
Definition: manifest.h:30
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
Definition: channel.cc:780
void set_attachment(void *attachment, uint32_t att_size)
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
void WaitFor()
Definition: channel.cc:924
static const char kSignalTerminate
Definition: channel.h:142
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
const char * CacheTransportCode2Ascii(const cvmfs::EnumStatus code)
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:551
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
std::string name_
Definition: channel.h:260
atomic_int64 next_txn_id_
Definition: channel.h:262
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:279
static const char * kEnvReadyNotifyFd
int pipe_ctrl_[2]
Definition: channel.h:268
void LogSessionInfo(uint64_t session_id, const std::string &msg)
Definition: channel.cc:752
uint64_t NextSessionId()
Definition: channel.h:204
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:226
static const uint32_t kFlagSendNonBlocking
void UnlockFile(const int filedes)
Definition: posix.cc:995
bool HandleRequest(int fd_con)
Definition: channel.cc:474
uint64_t size_bytes
Definition: channel.h:88
static SessionCtx * GetInstance()
Definition: channel.cc:59