CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
channel.cc
Go to the documentation of this file.
1 
4 #include "cvmfs_config.h"
5 #include "channel.h"
6 
7 #include <errno.h>
8 #include <poll.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <sys/un.h>
12 #include <unistd.h>
13 
14 #include <cassert>
15 #include <cstring>
16 #include <vector>
17 
18 #include "util/concurrency.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/pointer.h"
23 #include "util/posix.h"
24 #include "util/smalloc.h"
25 #include "util/string.h"
26 
27 using namespace std; // NOLINT
28 
29 
31 
33  delete instance_;
34  instance_ = NULL;
35 }
36 
37 
39  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
40  smalloc(sizeof(pthread_mutex_t)));
41  int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
42  assert(retval == 0);
43 }
44 
45 
47  pthread_mutex_destroy(lock_tls_blocks_);
48  free(lock_tls_blocks_);
49 
50  for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
51  delete tls_blocks_[i];
52  }
53 
54  int retval = pthread_key_delete(thread_local_storage_);
55  assert(retval == 0);
56 }
57 
58 
60  if (instance_ == NULL) {
61  instance_ = new SessionCtx();
62  int retval =
63  pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
64  assert(retval == 0);
65  }
66 
67  return instance_;
68 }
69 
70 
71 void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
72  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
73  pthread_getspecific(thread_local_storage_));
74  if ((tls == NULL) || !tls->is_set) {
75  *id = 0;
76  *reponame = NULL;
77  *client_instance = NULL;
78  } else {
79  *id = tls->id;
80  *reponame = tls->reponame;
81  *client_instance = tls->client_instance;
82  }
83 }
84 
85 
87  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
88  pthread_getspecific(thread_local_storage_));
89  if (tls == NULL)
90  return false;
91 
92  return tls->is_set;
93 }
94 
95 
96 void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
97  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
98  pthread_getspecific(thread_local_storage_));
99 
100  if (tls == NULL) {
101  tls = new ThreadLocalStorage(id, reponame, client_instance);
102  int retval = pthread_setspecific(thread_local_storage_, tls);
103  assert(retval == 0);
104  MutexLockGuard lock_guard(lock_tls_blocks_);
105  tls_blocks_.push_back(tls);
106  } else {
107  tls->id = id;
108  tls->reponame = reponame;
109  tls->client_instance = client_instance;
110  tls->is_set = true;
111  }
112 }
113 
114 
115 void SessionCtx::TlsDestructor(void *data) {
116  ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
117  delete tls;
118 
119  assert(instance_);
120  MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
121  for (vector<ThreadLocalStorage *>::iterator i =
122  instance_->tls_blocks_.begin(), iEnd = instance_->tls_blocks_.end();
123  i != iEnd; ++i)
124  {
125  if ((*i) == tls) {
126  instance_->tls_blocks_.erase(i);
127  break;
128  }
129  }
130 }
131 
132 
134  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
135  pthread_getspecific(thread_local_storage_));
136  if (tls != NULL) {
137  tls->is_set = false;
138  tls->id = 0;
139  tls->reponame = NULL;
140  tls->client_instance = NULL;
141  }
142 }
143 
144 
145 //------------------------------------------------------------------------------
146 
147 
148 CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
149  : id(id)
150  , name(name)
151 {
152  vector<string> tokens = SplitString(name, ':');
153  reponame = strdup(tokens[0].c_str());
154  if (tokens.size() > 1)
155  client_instance = strdup(tokens[1].c_str());
156  else
157  client_instance = NULL;
158 }
159 
160 const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
161 
162 
164  char detach = kSignalDetach;
165  WritePipe(pipe_ctrl_[1], &detach, 1);
166 }
167 
168 
170  : is_local_(false)
171  , capabilities_(capabilities)
172  , fd_socket_(-1)
173  , fd_socket_lock_(-1)
174  , running_(0)
175  , num_workers_(0)
178 {
179  atomic_init64(&next_session_id_);
180  atomic_init64(&next_txn_id_);
181  atomic_init64(&next_lst_id_);
182  // Don't use listing id zero
183  atomic_inc64(&next_lst_id_);
185  memset(&thread_io_, 0, sizeof(thread_io_));
187 }
188 
189 
191  Terminate();
193  if (fd_socket_ >= 0)
194  close(fd_socket_);
195  if (fd_socket_lock_ >= 0)
197 }
198 
199 
201  cvmfs::MsgBreadcrumbStoreReq *msg_req,
202  CacheTransport *transport)
203 {
204  SessionCtxGuard session_guard(msg_req->session_id(), this);
205  cvmfs::MsgBreadcrumbReply msg_reply;
206  CacheTransport::Frame frame_send(&msg_reply);
207 
208  msg_reply.set_req_id(msg_req->req_id());
209  manifest::Breadcrumb breadcrumb;
210  bool retval = transport->ParseMsgHash(msg_req->breadcrumb().hash(),
211  &breadcrumb.catalog_hash);
212  if (!retval) {
213  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
214  "malformed hash received from client");
215  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
216  } else {
217  breadcrumb.timestamp = msg_req->breadcrumb().timestamp();
218  if (msg_req->breadcrumb().has_revision()) {
219  breadcrumb.revision = msg_req->breadcrumb().revision();
220  } else {
221  breadcrumb.revision = 0;
222  }
223  cvmfs::EnumStatus status =
224  StoreBreadcrumb(msg_req->breadcrumb().fqrn(), breadcrumb);
225  msg_reply.set_status(status);
226  }
227  transport->SendFrame(&frame_send);
228 }
229 
230 
232  cvmfs::MsgBreadcrumbLoadReq *msg_req,
233  CacheTransport *transport)
234 {
235  SessionCtxGuard session_guard(msg_req->session_id(), this);
236  cvmfs::MsgBreadcrumbReply msg_reply;
237  CacheTransport::Frame frame_send(&msg_reply);
238 
239  msg_reply.set_req_id(msg_req->req_id());
240  manifest::Breadcrumb breadcrumb;
241  cvmfs::EnumStatus status =
242  LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
243  msg_reply.set_status(status);
244  if (status == cvmfs::STATUS_OK) {
245  assert(breadcrumb.IsValid());
246  cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
247  transport->FillMsgHash(breadcrumb.catalog_hash, msg_hash);
248  cvmfs::MsgBreadcrumb *msg_breadcrumb = new cvmfs::MsgBreadcrumb();
249  msg_breadcrumb->set_fqrn(msg_req->fqrn());
250  msg_breadcrumb->set_allocated_hash(msg_hash);
251  msg_breadcrumb->set_timestamp(breadcrumb.timestamp);
252  msg_breadcrumb->set_revision(breadcrumb.revision);
253  msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
254  }
255  transport->SendFrame(&frame_send);
256 }
257 
258 
260  cvmfs::MsgHandshake *msg_req,
261  CacheTransport *transport)
262 {
263  uint64_t session_id = NextSessionId();
264  if (msg_req->has_name()) {
265  sessions_[session_id] = SessionInfo(session_id, msg_req->name());
266  } else {
267  sessions_[session_id] = SessionInfo(session_id,
268  "anonymous client (" + StringifyInt(session_id) + ")");
269  }
270  cvmfs::MsgHandshakeAck msg_ack;
271  CacheTransport::Frame frame_send(&msg_ack);
272 
273  msg_ack.set_status(cvmfs::STATUS_OK);
274  msg_ack.set_name(name_);
275  msg_ack.set_protocol_version(kPbProtocolVersion);
276  msg_ack.set_max_object_size(max_object_size_);
277  msg_ack.set_session_id(session_id);
278  msg_ack.set_capabilities(capabilities_);
279  if (is_local_)
280  msg_ack.set_pid(getpid());
281  transport->SendFrame(&frame_send);
282 }
283 
284 
286  cvmfs::MsgInfoReq *msg_req,
287  CacheTransport *transport)
288 {
289  SessionCtxGuard session_guard(msg_req->session_id(), this);
290  cvmfs::MsgInfoReply msg_reply;
291  CacheTransport::Frame frame_send(&msg_reply);
292 
293  msg_reply.set_req_id(msg_req->req_id());
294  Info info;
295  cvmfs::EnumStatus status = GetInfo(&info);
296  if (status != cvmfs::STATUS_OK) {
297  LogSessionError(msg_req->session_id(), status,
298  "failed to query cache status");
299  }
300  msg_reply.set_size_bytes(info.size_bytes);
301  msg_reply.set_used_bytes(info.used_bytes);
302  msg_reply.set_pinned_bytes(info.pinned_bytes);
303  msg_reply.set_no_shrink(info.no_shrink);
304  msg_reply.set_status(status);
305  transport->SendFrame(&frame_send);
306 }
307 
308 
309 void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
310  if (!msg_req->has_conncnt_change_by())
311  return;
312  int32_t conncnt_change_by = msg_req->conncnt_change_by();
313  if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
314  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
315  "invalid request to drop connection counter below zero");
316  return;
317  }
318  if (conncnt_change_by > 0) {
319  LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
320  } else {
321  LogSessionInfo(msg_req->session_id(), "release session lock");
322  }
323  num_inlimbo_clients_ += conncnt_change_by;
324 }
325 
326 
328  cvmfs::MsgListReq *msg_req,
329  CacheTransport *transport)
330 {
331  SessionCtxGuard session_guard(msg_req->session_id(), this);
332  cvmfs::MsgListReply msg_reply;
333  CacheTransport::Frame frame_send(&msg_reply);
334 
335  msg_reply.set_req_id(msg_req->req_id());
336  int64_t listing_id = msg_req->listing_id();
337  msg_reply.set_listing_id(listing_id);
338  msg_reply.set_is_last_part(true);
339 
340  cvmfs::EnumStatus status;
341  if (msg_req->listing_id() == 0) {
342  listing_id = NextLstId();
343  status = ListingBegin(listing_id, msg_req->object_type());
344  if (status != cvmfs::STATUS_OK) {
345  LogSessionError(msg_req->session_id(), status,
346  "failed to start enumeration of objects");
347  msg_reply.set_status(status);
348  transport->SendFrame(&frame_send);
349  return;
350  }
351  msg_reply.set_listing_id(listing_id);
352  }
353  assert(listing_id != 0);
354 
355  ObjectInfo item;
356  unsigned total_size = 0;
357  while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
358  cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
359  cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
360  transport->FillMsgHash(item.id, msg_hash);
361  msg_list_record->set_allocated_hash(msg_hash);
362  msg_list_record->set_pinned(item.pinned);
363  msg_list_record->set_description(item.description);
364  // Approximation of the message size
365  total_size += sizeof(item) + item.description.length();
366  if (total_size > kListingSize)
367  break;
368  }
369  if (status == cvmfs::STATUS_OUTOFBOUNDS) {
370  ListingEnd(listing_id);
371  status = cvmfs::STATUS_OK;
372  } else {
373  msg_reply.set_is_last_part(false);
374  }
375  if (status != cvmfs::STATUS_OK) {
376  LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
377  }
378  msg_reply.set_status(status);
379  transport->SendFrame(&frame_send);
380 }
381 
382 
384  cvmfs::MsgObjectInfoReq *msg_req,
385  CacheTransport *transport)
386 {
387  SessionCtxGuard session_guard(msg_req->session_id(), this);
388  cvmfs::MsgObjectInfoReply msg_reply;
389  CacheTransport::Frame frame_send(&msg_reply);
390 
391  msg_reply.set_req_id(msg_req->req_id());
392  shash::Any object_id;
393  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
394  if (!retval) {
395  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
396  "malformed hash received from client");
397  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
398  } else {
399  ObjectInfo info;
400  cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
401  msg_reply.set_status(status);
402  if (status == cvmfs::STATUS_OK) {
403  msg_reply.set_object_type(info.object_type);
404  msg_reply.set_size(info.size);
405  } else if (status != cvmfs::STATUS_NOENTRY) {
406  LogSessionError(msg_req->session_id(), status,
407  "failed retrieving object details");
408  }
409  }
410  transport->SendFrame(&frame_send);
411 }
412 
413 
415  cvmfs::MsgReadReq *msg_req,
416  CacheTransport *transport)
417 {
418  SessionCtxGuard session_guard(msg_req->session_id(), this);
419  cvmfs::MsgReadReply msg_reply;
420  CacheTransport::Frame frame_send(&msg_reply);
421 
422  msg_reply.set_req_id(msg_req->req_id());
423  shash::Any object_id;
424  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
425  if (!retval || (msg_req->size() > max_object_size_)) {
426  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
427  "malformed hash received from client");
428  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
429  transport->SendFrame(&frame_send);
430  return;
431  }
432  unsigned size = msg_req->size();
433 #ifdef __APPLE__
434  unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
435 #else
436  unsigned char buffer[size];
437 #endif
438  cvmfs::EnumStatus status = Pread(object_id, msg_req->offset(), &size, buffer);
439  msg_reply.set_status(status);
440  if (status == cvmfs::STATUS_OK) {
441  frame_send.set_attachment(buffer, size);
442  } else {
443  LogSessionError(msg_req->session_id(), status,
444  "failed to read from object");
445  }
446  transport->SendFrame(&frame_send);
447 #ifdef __APPLE__
448  free(buffer);
449 #endif
450 }
451 
452 
454  cvmfs::MsgRefcountReq *msg_req,
455  CacheTransport *transport)
456 {
457  SessionCtxGuard session_guard(msg_req->session_id(), this);
458  cvmfs::MsgRefcountReply msg_reply;
459  CacheTransport::Frame frame_send(&msg_reply);
460 
461  msg_reply.set_req_id(msg_req->req_id());
462  shash::Any object_id;
463  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
464  if (!retval) {
465  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
466  "malformed hash received from client");
467  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
468  } else {
469  cvmfs::EnumStatus status = ChangeRefcount(object_id, msg_req->change_by());
470  msg_reply.set_status(status);
471  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
472  LogSessionError(msg_req->session_id(), status,
473  "failed to open/close object " + object_id.ToString());
474  }
475  }
476  transport->SendFrame(&frame_send);
477 }
478 
479 
// Receives one frame from the connection fd_con and dispatches it to the
// handler that matches the protobuf type name of the message.
//
// Returns true if the connection should be served further.  Returns false if
// receiving the frame failed, if the client sent MsgQuit, or if the message
// type is not one of the types handled below.
//
// NOTE(review): the declaration of `transport` (listing line 481) and the
// lines that open the two logging calls (listing lines 487 and 551) are not
// visible in this listing.
480 bool CachePlugin::HandleRequest(int fd_con) {
  // Receive buffer for the frame attachment, sized to the largest object
  // part; it is a stack array whose length is only known at run time
482  char buffer[max_object_size_];
483  CacheTransport::Frame frame_recv;
484  frame_recv.set_attachment(buffer, max_object_size_);
485  bool retval = transport.RecvFrame(&frame_recv);
486  if (!retval) {
488  "failed to receive request from connection (%d)", errno);
489  return false;
490  }
491 
492  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
493 
  // Dispatch on the type name; each branch casts the generic message to the
  // concrete type that the name identifies
494  if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
495  cvmfs::MsgHandshake *msg_req =
496  reinterpret_cast<cvmfs::MsgHandshake *>(msg_typed);
497  HandleHandshake(msg_req, &transport);
498  } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
499  cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
500  map<uint64_t, SessionInfo>::const_iterator iter =
501  sessions_.find(msg_req->session_id());
  // Release the strings owned by the session record before the record itself
  // is removed
502  if (iter != sessions_.end()) {
503  free(iter->second.reponame);
504  free(iter->second.client_instance);
505  }
506  sessions_.erase(msg_req->session_id());
507  return false;
508  } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
  // HandleIoctl() is the only handler that is not given the transport
509  HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
510  } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
511  cvmfs::MsgRefcountReq *msg_req =
512  reinterpret_cast<cvmfs::MsgRefcountReq *>(msg_typed);
513  HandleRefcount(msg_req, &transport);
514  } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
515  cvmfs::MsgObjectInfoReq *msg_req =
516  reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
517  HandleObjectInfo(msg_req, &transport);
518  } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
519  cvmfs::MsgReadReq *msg_req =
520  reinterpret_cast<cvmfs::MsgReadReq *>(msg_typed);
521  HandleRead(msg_req, &transport);
522  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
523  cvmfs::MsgStoreReq *msg_req =
524  reinterpret_cast<cvmfs::MsgStoreReq *>(msg_typed);
  // The store handler also gets the received frame so that it can read the
  // attachment
525  HandleStore(msg_req, &frame_recv, &transport);
526  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
527  cvmfs::MsgStoreAbortReq *msg_req =
528  reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
529  HandleStoreAbort(msg_req, &transport);
530  } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
531  cvmfs::MsgInfoReq *msg_req =
532  reinterpret_cast<cvmfs::MsgInfoReq *>(msg_typed);
533  HandleInfo(msg_req, &transport);
534  } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
535  cvmfs::MsgShrinkReq *msg_req =
536  reinterpret_cast<cvmfs::MsgShrinkReq *>(msg_typed);
537  HandleShrink(msg_req, &transport);
538  } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
539  cvmfs::MsgListReq *msg_req =
540  reinterpret_cast<cvmfs::MsgListReq *>(msg_typed);
541  HandleList(msg_req, &transport);
542  } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbStoreReq") {
543  cvmfs::MsgBreadcrumbStoreReq *msg_req =
544  reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed);
545  HandleBreadcrumbStore(msg_req, &transport);
546  } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbLoadReq") {
547  cvmfs::MsgBreadcrumbLoadReq *msg_req =
548  reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed);
549  HandleBreadcrumbLoad(msg_req, &transport);
550  } else {
  // Unknown message type: report it and give up on this connection
552  "unexpected message from client: %s",
553  msg_typed->GetTypeName().c_str());
554  return false;
555  }
556 
557  return true;
558 }
559 
560 
562  cvmfs::MsgShrinkReq *msg_req,
563  CacheTransport *transport)
564 {
565  SessionCtxGuard session_guard(msg_req->session_id(), this);
566  cvmfs::MsgShrinkReply msg_reply;
567  CacheTransport::Frame frame_send(&msg_reply);
568 
569  msg_reply.set_req_id(msg_req->req_id());
570  uint64_t used_bytes = 0;
571  cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
572  msg_reply.set_used_bytes(used_bytes);
573  msg_reply.set_status(status);
574  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
575  LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
576  }
577  transport->SendFrame(&frame_send);
578 }
579 
580 
582  cvmfs::MsgStoreAbortReq *msg_req,
583  CacheTransport *transport)
584 {
585  SessionCtxGuard session_guard(msg_req->session_id(), this);
586  cvmfs::MsgStoreReply msg_reply;
587  CacheTransport::Frame frame_send(&msg_reply);
588  msg_reply.set_req_id(msg_req->req_id());
589  msg_reply.set_part_nr(0);
590  uint64_t txn_id;
591  UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
592  bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
593  if (!retval) {
594  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
595  "malformed transaction id received from client");
596  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
597  } else {
598  cvmfs::EnumStatus status = AbortTxn(txn_id);
599  msg_reply.set_status(status);
600  if (status != cvmfs::STATUS_OK) {
601  LogSessionError(msg_req->session_id(), status,
602  "failed to abort transaction");
603  }
604  txn_ids_.Erase(uniq_req);
605  }
606  transport->SendFrame(&frame_send);
607 }
608 
609 
611  cvmfs::MsgStoreReq *msg_req,
612  CacheTransport::Frame *frame,
613  CacheTransport *transport)
614 {
615  SessionCtxGuard session_guard(msg_req->session_id(), this);
616  cvmfs::MsgStoreReply msg_reply;
617  CacheTransport::Frame frame_send(&msg_reply);
618  msg_reply.set_req_id(msg_req->req_id());
619  msg_reply.set_part_nr(msg_req->part_nr());
620  shash::Any object_id;
621  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
622  if ( !retval ||
623  (frame->att_size() > max_object_size_) ||
624  ((frame->att_size() < max_object_size_) && !msg_req->last_part()) )
625  {
626  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
627  "malformed hash or bad object size received from client");
628  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
629  transport->SendFrame(&frame_send);
630  return;
631  }
632 
633  UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
634  uint64_t txn_id;
635  cvmfs::EnumStatus status = cvmfs::STATUS_OK;
636  if (msg_req->part_nr() == 1) {
637  if (txn_ids_.Contains(uniq_req)) {
638  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
639  "invalid attempt to restart running transaction");
640  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
641  transport->SendFrame(&frame_send);
642  return;
643  }
644  txn_id = NextTxnId();
645  ObjectInfo info;
646  info.id = object_id;
647  if (msg_req->has_expected_size()) {info.size = msg_req->expected_size();}
648  if (msg_req->has_object_type()) {info.object_type = msg_req->object_type();}
649  if (msg_req->has_description()) {info.description = msg_req->description();}
650  status = StartTxn(object_id, txn_id, info);
651  if (status != cvmfs::STATUS_OK) {
652  LogSessionError(msg_req->session_id(), status,
653  "failed to start transaction");
654  msg_reply.set_status(status);
655  transport->SendFrame(&frame_send);
656  return;
657  }
658  txn_ids_.Insert(uniq_req, txn_id);
659  } else {
660  retval = txn_ids_.Lookup(uniq_req, &txn_id);
661  if (!retval) {
662  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
663  "invalid transaction received from client");
664  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
665  transport->SendFrame(&frame_send);
666  return;
667  }
668  }
669 
670  // TODO(jblomer): check part number and send objects up in order
671  if (frame->att_size() > 0) {
672  status = WriteTxn(txn_id,
673  reinterpret_cast<unsigned char *>(frame->attachment()),
674  frame->att_size());
675  if (status != cvmfs::STATUS_OK) {
676  LogSessionError(msg_req->session_id(), status, "failure writing object");
677  msg_reply.set_status(status);
678  transport->SendFrame(&frame_send);
679  return;
680  }
681  }
682 
683  if (msg_req->last_part()) {
684  status = CommitTxn(txn_id);
685  if (status != cvmfs::STATUS_OK) {
686  LogSessionError(msg_req->session_id(), status,
687  "failure committing object");
688  }
689  txn_ids_.Erase(uniq_req);
690  }
691  msg_reply.set_status(status);
692  transport->SendFrame(&frame_send);
693 }
694 
695 
697  return atomic_read32(&running_) != 0;
698 }
699 
700 
// Creates the listening endpoint described by locator and starts listening
// on it.
//
// The locator is split at '='.  A first token of "unix" takes the lock file
// "<second token>.lock" and creates a socket at the path given by the second
// token; is_local_ is set to true in this case.  A first token of "tcp"
// expects the second token to be of the form "<address>:<port>".  Any other
// first token is rejected.
//
// Returns true once listen() was called on the new socket, false if the
// locator is not understood, the lock file cannot be taken or the endpoint
// cannot be created.
//
// NOTE(review): tokens[1] is read without checking tokens.size(); whether a
// locator without '=' can reach this point should be confirmed.
// NOTE(review): the lines that open the logging calls are not visible in
// this listing.
701 bool CachePlugin::Listen(const string &locator) {
702  vector<string> tokens = SplitString(locator, '=');
703  if (tokens[0] == "unix") {
704  string lock_path = tokens[1] + ".lock";
705  fd_socket_lock_ = TryLockFile(lock_path);
  // TryLockFile() results handled here: -1 is reported together with errno,
  // -2 is reported as the file being busy
706  if (fd_socket_lock_ == -1) {
708  "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
710  return false;
711  } else if (fd_socket_lock_ == -2) {
712  // Another plugin process probably started in the meantime
714  if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
716  "failed to lock on %s, file is busy", lock_path.c_str());
717  }
718  return false;
719  }
720  assert(fd_socket_lock_ >= 0);
721  fd_socket_ = MakeSocket(tokens[1], 0600);
722  is_local_ = true;
723  } else if (tokens[0] == "tcp") {
724  vector<string> tcp_address = SplitString(tokens[1], ':');
725  if (tcp_address.size() != 2) {
727  "invalid locator: %s", locator.c_str());
729  return false;
730  }
  // NOTE(review): the port string is converted with String2Uint64() and
  // passed on without a range check here; confirm how MakeTcpEndpoint()
  // treats out-of-range port numbers
731  fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
732  } else {
734  "unknown endpoint in locator: %s", locator.c_str());
736  return false;
737  }
738 
  // Common failure path for both endpoint types; is_local_ is reset because
  // the unix branch sets it before the socket is checked
739  if (fd_socket_ < 0) {
740  if (errno == EADDRINUSE) {
741  // Another plugin process probably started in the meantime
743  } else {
745  "failed to create endpoint %s (%d)", locator.c_str(), errno);
747  }
748  is_local_ = false;
749  return false;
750  }
  // Backlog of 32 pending connections; a failing listen() trips the assertion
751  int retval = listen(fd_socket_, 32);
752  assert(retval == 0);
753 
754  return true;
755 }
756 
757 
// Logs the informational message msg together with the session it refers to.
//
// The session is shown by the name stored in sessions_ for session_id.  If
// the id is not registered, "unidentified client (<session_id>)" is used
// instead.
//
// NOTE(review): the line that opens the logging call (listing line 764) is
// not visible in this listing.
758 void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
759  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
760  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
761  if (iter != sessions_.end()) {
762  session_str = iter->second.name;
763  }
765  "session '%s': %s", session_str.c_str(), msg.c_str());
766 }
767 
768 
770  uint64_t session_id,
771  cvmfs::EnumStatus status,
772  const std::string &msg)
773 {
774  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
775  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
776  if (iter != sessions_.end()) {
777  session_str = iter->second.name;
778  }
780  "session '%s': %s (%d - %s)",
781  session_str.c_str(), msg.c_str(), status,
782  CacheTransportCode2Ascii(status));
783 }
784 
785 
787  CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
788 
789  platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
790 
791  vector<struct pollfd> watch_fds;
792  // Elements 0, 1: control pipe, socket fd
793  struct pollfd watch_ctrl;
794  watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
795  watch_ctrl.events = POLLIN | POLLPRI;
796  watch_fds.push_back(watch_ctrl);
797  struct pollfd watch_socket;
798  watch_socket.fd = cache_plugin->fd_socket_;
799  watch_socket.events = POLLIN | POLLPRI;
800  watch_fds.push_back(watch_socket);
801 
802  bool terminated = false;
803  while (!terminated) {
804  for (unsigned i = 0; i < watch_fds.size(); ++i)
805  watch_fds[i].revents = 0;
806  int retval = poll(&watch_fds[0], watch_fds.size(), -1);
807  if (retval < 0) {
808  if (errno == EINTR)
809  continue;
810  PANIC(kLogSyslogErr | kLogDebug, "cache plugin connection failure (%d)",
811  errno);
812  }
813 
814  // Termination or detach
815  if (watch_fds[0].revents) {
816  char signal;
817  ReadPipe(watch_fds[0].fd, &signal, 1);
818  if (signal == kSignalDetach) {
819  cache_plugin->SendDetachRequests();
820  continue;
821  }
822 
823  // termination
824  if (watch_fds.size() > 2) {
826  "terminating external cache manager with pending connections");
827  }
828  break;
829  }
830 
831  // New connection
832  if (watch_fds[1].revents) {
833  struct sockaddr_un remote;
834  socklen_t socket_size = sizeof(remote);
835  int fd_con =
836  accept(watch_fds[1].fd, (struct sockaddr *)&remote, &socket_size);
837  if (fd_con < 0) {
839  "failed to establish connection (%d)", errno);
840  continue;
841  }
842  struct pollfd watch_con;
843  watch_con.fd = fd_con;
844  watch_con.events = POLLIN | POLLPRI;
845  watch_fds.push_back(watch_con);
846  cache_plugin->connections_.insert(fd_con);
847  }
848 
849  // New request
850  for (unsigned i = 2; i < watch_fds.size(); ) {
851  if (watch_fds[i].revents) {
852  bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
853  if (!proceed) {
854  close(watch_fds[i].fd);
855  cache_plugin->connections_.erase(watch_fds[i].fd);
856  watch_fds.erase(watch_fds.begin() + i);
857  if ( (getenv(CacheTransport::kEnvReadyNotifyFd) != NULL) &&
858  (cache_plugin->connections_.empty()) &&
859  (cache_plugin->num_inlimbo_clients_ == 0) )
860  {
862  "stopping cache plugin, no more active clients");
863  terminated = true;
864  break;
865  }
866  } else {
867  i++;
868  }
869  } else {
870  i++;
871  }
872  }
873  }
874 
875  // 0, 1 being closed by destructor
876  for (unsigned i = 2; i < watch_fds.size(); ++i)
877  close(watch_fds[i].fd);
878  cache_plugin->txn_ids_.Clear();
879 
880  signal(SIGPIPE, save_sigpipe);
881  return NULL;
882 }
883 
884 
888 void CachePlugin::NotifySupervisor(char signal) {
889  char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
890  if (pipe_ready == NULL)
891  return;
892  int fd_pipe_ready = String2Int64(pipe_ready);
893  WritePipe(fd_pipe_ready, &signal, 1);
894 }
895 
896 
// Starts the I/O thread of the plugin.
//
// Stores num_workers in num_workers_, creates the thread thread_io_ that runs
// MainProcessRequests() with this object as its argument, and then switches
// running_ from 0 to 1.  Thread creation failure trips the assertion.
//
// NOTE(review): listing line 901 is not visible in this listing.
897 void CachePlugin::ProcessRequests(unsigned num_workers) {
898  num_workers_ = num_workers;
899  int retval = pthread_create(&thread_io_, NULL, MainProcessRequests, this);
900  assert(retval == 0);
  // Compare-and-swap: only changes running_ if it still holds 0
902  atomic_cas32(&running_, 0, 1);
903 }
904 
905 
907  set<int>::const_iterator iter = connections_.begin();
908  set<int>::const_iterator iter_end = connections_.end();
909  for (; iter != iter_end; ++iter) {
910  CacheTransport transport(*iter,
913  cvmfs::MsgDetach msg_detach;
914  CacheTransport::Frame frame_send(&msg_detach);
915  transport.SendFrame(&frame_send);
916  }
917 }
918 
919 
921  if (IsRunning()) {
922  char terminate = kSignalTerminate;
923  WritePipe(pipe_ctrl_[1], &terminate, 1);
924  pthread_join(thread_io_, NULL);
925  atomic_cas32(&running_, 1, 0);
926  }
927 }
928 
929 
931  if (!IsRunning())
932  return;
933  pthread_join(thread_io_, NULL);
934 }
uint64_t pinned_bytes
Definition: channel.h:90
void Get(uint64_t *id, char **reponame, char **client_instance)
Definition: channel.cc:71
int MakeSocket(const std::string &path, const int mode)
Definition: posix.cc:331
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
Definition: channel.cc:200
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
atomic_int64 next_session_id_
Definition: channel.h:261
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:383
static void CleanupInstance()
Definition: channel.cc:32
uint64_t timestamp
Definition: manifest.h:40
void Set(uint64_t id, char *reponame, char *client_instance)
Definition: channel.cc:96
static const uint64_t kSizeUnknown
Definition: channel.h:71
int MakeTcpEndpoint(const std::string &ipv4_address, int portno)
Definition: posix.cc:389
shash::Any catalog_hash
Definition: manifest.h:39
#define PANIC(...)
Definition: exception.h:29
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
bool is_set
either not yet set or deliberately unset
Definition: channel.h:44
bool IsRunning()
Definition: channel.cc:696
uint64_t NextLstId()
Definition: channel.h:210
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
Definition: channel.cc:309
std::set< int > connections_
Definition: channel.h:265
virtual ~CachePlugin()
Definition: channel.cc:190
uint64_t capabilities_
Definition: channel.h:249
std::map< uint64_t, SessionInfo > sessions_
Definition: channel.h:266
uint64_t capabilities() const
Definition: channel.h:103
bool IsSet()
Definition: channel.cc:86
void ProcessRequests(unsigned num_workers)
Definition: channel.cc:897
cvmfs::EnumObjectType object_type
Definition: channel.h:81
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
void Unset()
Definition: channel.cc:133
static SessionCtx * instance_
Definition: channel.h:57
static const unsigned kDefaultMaxObjectSize
Definition: channel.h:140
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static const char kReadyNotification
static uint32_t HashUniqueRequest(const UniqueRequest &req)
Definition: channel.h:213
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
Definition: channel.cc:561
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
bool is_local_
Definition: channel.h:248
static const uint32_t kFlagSendIgnoreFailure
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
Definition: channel.cc:169
uint64_t num_inlimbo_clients_
Definition: channel.h:259
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
Definition: channel.cc:453
uint64_t revision
Definition: manifest.h:41
bool Listen(const std::string &locator)
Definition: channel.cc:701
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
int64_t String2Int64(const string &value)
Definition: string.cc:222
void Terminate()
Definition: channel.cc:920
uint64_t NextTxnId()
Definition: channel.h:207
void AskToDetach()
Definition: channel.cc:163
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
static const unsigned kListingSize
Definition: channel.h:141
#define platform_sighandler_t
static void TlsDestructor(void *data)
Definition: channel.cc:115
void NotifySupervisor(char signal)
Definition: channel.cc:888
void * attachment() const
unsigned max_object_size_
Definition: channel.h:254
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int TryLockFile(const std::string &path)
Definition: posix.cc:913
uint32_t att_size() const
int64_t no_shrink
Definition: channel.h:91
static const char kSignalDetach
Definition: channel.h:143
std::string description
Definition: channel.h:83
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
pthread_t thread_io_
Definition: channel.h:267
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
Definition: channel.cc:769
int fd_socket_lock_
Definition: channel.h:251
string StringifyInt(const int64_t value)
Definition: string.cc:78
static const unsigned kPbProtocolVersion
Definition: channel.h:70
google::protobuf::MessageLite * GetMsgTyped()
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
Definition: channel.cc:581
SessionCtx()
Definition: channel.cc:38
int fd_socket_
Definition: channel.h:250
static const char kFailureNotification
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
Definition: channel.cc:327
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:414
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
Definition: channel.cc:259
void SendDetachRequests()
Definition: channel.cc:906
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
Definition: channel.h:264
atomic_int64 next_lst_id_
Definition: channel.h:263
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
Definition: string.cc:228
uint64_t used_bytes
Definition: channel.h:89
atomic_int32 running_
Definition: channel.h:252
Definition: mutex.h:42
unsigned num_workers_
Definition: channel.h:253
~SessionCtx()
Definition: channel.cc:46
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
Definition: channel.cc:610
bool IsValid() const
Definition: manifest.h:35
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
Definition: channel.cc:786
void set_attachment(void *attachment, uint32_t att_size)
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
void WaitFor()
Definition: channel.cc:930
static const char kSignalTerminate
Definition: channel.h:142
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
const char * CacheTransportCode2Ascii(const cvmfs::EnumStatus code)
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:551
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
std::string name_
Definition: channel.h:260
atomic_int64 next_txn_id_
Definition: channel.h:262
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:285
static const char * kEnvReadyNotifyFd
int pipe_ctrl_[2]
Definition: channel.h:268
void LogSessionInfo(uint64_t session_id, const std::string &msg)
Definition: channel.cc:758
uint64_t NextSessionId()
Definition: channel.h:204
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:231
static const uint32_t kFlagSendNonBlocking
void UnlockFile(const int filedes)
Definition: posix.cc:995
bool HandleRequest(int fd_con)
Definition: channel.cc:480
uint64_t size_bytes
Definition: channel.h:88
static SessionCtx * GetInstance()
Definition: channel.cc:59
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528