CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
channel.cc
Go to the documentation of this file.
1 
5 #include "channel.h"
6 
7 #include <errno.h>
8 #include <poll.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <sys/un.h>
12 #include <unistd.h>
13 
14 #include <cassert>
15 #include <cstring>
16 #include <vector>
17 
18 #include "util/concurrency.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/pointer.h"
23 #include "util/posix.h"
24 #include "util/smalloc.h"
25 #include "util/string.h"
26 
27 using namespace std; // NOLINT
28 
29 
31 
33  delete instance_;
34  instance_ = NULL;
35 }
36 
37 
39  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
40  smalloc(sizeof(pthread_mutex_t)));
41  int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
42  assert(retval == 0);
43 }
44 
45 
47  pthread_mutex_destroy(lock_tls_blocks_);
48  free(lock_tls_blocks_);
49 
50  for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
51  delete tls_blocks_[i];
52  }
53 
54  int retval = pthread_key_delete(thread_local_storage_);
55  assert(retval == 0);
56 }
57 
58 
60  if (instance_ == NULL) {
61  instance_ = new SessionCtx();
62  int retval = pthread_key_create(&instance_->thread_local_storage_,
63  TlsDestructor);
64  assert(retval == 0);
65  }
66 
67  return instance_;
68 }
69 
70 
71 void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
72  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
73  pthread_getspecific(thread_local_storage_));
74  if ((tls == NULL) || !tls->is_set) {
75  *id = 0;
76  *reponame = NULL;
77  *client_instance = NULL;
78  } else {
79  *id = tls->id;
80  *reponame = tls->reponame;
81  *client_instance = tls->client_instance;
82  }
83 }
84 
85 
87  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
88  pthread_getspecific(thread_local_storage_));
89  if (tls == NULL)
90  return false;
91 
92  return tls->is_set;
93 }
94 
95 
96 void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
97  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
98  pthread_getspecific(thread_local_storage_));
99 
100  if (tls == NULL) {
101  tls = new ThreadLocalStorage(id, reponame, client_instance);
102  int retval = pthread_setspecific(thread_local_storage_, tls);
103  assert(retval == 0);
104  MutexLockGuard lock_guard(lock_tls_blocks_);
105  tls_blocks_.push_back(tls);
106  } else {
107  tls->id = id;
108  tls->reponame = reponame;
109  tls->client_instance = client_instance;
110  tls->is_set = true;
111  }
112 }
113 
114 
115 void SessionCtx::TlsDestructor(void *data) {
116  ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
117  delete tls;
118 
119  assert(instance_);
120  MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
121  for (vector<ThreadLocalStorage *>::iterator
122  i = instance_->tls_blocks_.begin(),
123  iEnd = instance_->tls_blocks_.end();
124  i != iEnd; ++i) {
125  if ((*i) == tls) {
126  instance_->tls_blocks_.erase(i);
127  break;
128  }
129  }
130 }
131 
132 
134  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
135  pthread_getspecific(thread_local_storage_));
136  if (tls != NULL) {
137  tls->is_set = false;
138  tls->id = 0;
139  tls->reponame = NULL;
140  tls->client_instance = NULL;
141  }
142 }
143 
144 
145 //------------------------------------------------------------------------------
146 
147 
148 CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
149  : id(id), name(name) {
150  vector<string> tokens = SplitString(name, ':');
151  reponame = strdup(tokens[0].c_str());
152  if (tokens.size() > 1)
153  client_instance = strdup(tokens[1].c_str());
154  else
155  client_instance = NULL;
156 }
157 
158 const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
159 
160 
162  char detach = kSignalDetach;
163  WritePipe(pipe_ctrl_[1], &detach, 1);
164 }
165 
166 
168  : is_local_(false)
169  , capabilities_(capabilities)
170  , fd_socket_(-1)
171  , fd_socket_lock_(-1)
172  , running_(0)
173  , num_workers_(0)
175  , num_inlimbo_clients_(0) {
176  atomic_init64(&next_session_id_);
177  atomic_init64(&next_txn_id_);
178  atomic_init64(&next_lst_id_);
179  // Don't use listing id zero
180  atomic_inc64(&next_lst_id_);
182  memset(&thread_io_, 0, sizeof(thread_io_));
184 }
185 
186 
188  Terminate();
190  if (fd_socket_ >= 0)
191  close(fd_socket_);
192  if (fd_socket_lock_ >= 0)
194 }
195 
196 
197 void CachePlugin::HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req,
198  CacheTransport *transport) {
199  SessionCtxGuard session_guard(msg_req->session_id(), this);
200  cvmfs::MsgBreadcrumbReply msg_reply;
201  CacheTransport::Frame frame_send(&msg_reply);
202 
203  msg_reply.set_req_id(msg_req->req_id());
204  manifest::Breadcrumb breadcrumb;
205  bool retval = transport->ParseMsgHash(msg_req->breadcrumb().hash(),
206  &breadcrumb.catalog_hash);
207  if (!retval) {
208  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
209  "malformed hash received from client");
210  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
211  } else {
212  breadcrumb.timestamp = msg_req->breadcrumb().timestamp();
213  if (msg_req->breadcrumb().has_revision()) {
214  breadcrumb.revision = msg_req->breadcrumb().revision();
215  } else {
216  breadcrumb.revision = 0;
217  }
218  cvmfs::EnumStatus status = StoreBreadcrumb(msg_req->breadcrumb().fqrn(),
219  breadcrumb);
220  msg_reply.set_status(status);
221  }
222  transport->SendFrame(&frame_send);
223 }
224 
225 
226 void CachePlugin::HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req,
227  CacheTransport *transport) {
228  SessionCtxGuard session_guard(msg_req->session_id(), this);
229  cvmfs::MsgBreadcrumbReply msg_reply;
230  CacheTransport::Frame frame_send(&msg_reply);
231 
232  msg_reply.set_req_id(msg_req->req_id());
233  manifest::Breadcrumb breadcrumb;
234  cvmfs::EnumStatus status = LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
235  msg_reply.set_status(status);
236  if (status == cvmfs::STATUS_OK) {
237  assert(breadcrumb.IsValid());
238  cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
239  transport->FillMsgHash(breadcrumb.catalog_hash, msg_hash);
240  cvmfs::MsgBreadcrumb *msg_breadcrumb = new cvmfs::MsgBreadcrumb();
241  msg_breadcrumb->set_fqrn(msg_req->fqrn());
242  msg_breadcrumb->set_allocated_hash(msg_hash);
243  msg_breadcrumb->set_timestamp(breadcrumb.timestamp);
244  msg_breadcrumb->set_revision(breadcrumb.revision);
245  msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
246  }
247  transport->SendFrame(&frame_send);
248 }
249 
250 
251 void CachePlugin::HandleHandshake(cvmfs::MsgHandshake *msg_req,
252  CacheTransport *transport) {
253  uint64_t session_id = NextSessionId();
254  if (msg_req->has_name()) {
255  sessions_[session_id] = SessionInfo(session_id, msg_req->name());
256  } else {
257  sessions_[session_id] = SessionInfo(
258  session_id, "anonymous client (" + StringifyInt(session_id) + ")");
259  }
260  cvmfs::MsgHandshakeAck msg_ack;
261  CacheTransport::Frame frame_send(&msg_ack);
262 
263  msg_ack.set_status(cvmfs::STATUS_OK);
264  msg_ack.set_name(name_);
265  msg_ack.set_protocol_version(kPbProtocolVersion);
266  msg_ack.set_max_object_size(max_object_size_);
267  msg_ack.set_session_id(session_id);
268  msg_ack.set_capabilities(capabilities_);
269  if (is_local_)
270  msg_ack.set_pid(getpid());
271  transport->SendFrame(&frame_send);
272 }
273 
274 
275 void CachePlugin::HandleInfo(cvmfs::MsgInfoReq *msg_req,
276  CacheTransport *transport) {
277  SessionCtxGuard session_guard(msg_req->session_id(), this);
278  cvmfs::MsgInfoReply msg_reply;
279  CacheTransport::Frame frame_send(&msg_reply);
280 
281  msg_reply.set_req_id(msg_req->req_id());
282  Info info;
283  cvmfs::EnumStatus status = GetInfo(&info);
284  if (status != cvmfs::STATUS_OK) {
285  LogSessionError(msg_req->session_id(), status,
286  "failed to query cache status");
287  }
288  msg_reply.set_size_bytes(info.size_bytes);
289  msg_reply.set_used_bytes(info.used_bytes);
290  msg_reply.set_pinned_bytes(info.pinned_bytes);
291  msg_reply.set_no_shrink(info.no_shrink);
292  msg_reply.set_status(status);
293  transport->SendFrame(&frame_send);
294 }
295 
296 
297 void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
298  if (!msg_req->has_conncnt_change_by())
299  return;
300  int32_t conncnt_change_by = msg_req->conncnt_change_by();
301  if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
302  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
303  "invalid request to drop connection counter below zero");
304  return;
305  }
306  if (conncnt_change_by > 0) {
307  LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
308  } else {
309  LogSessionInfo(msg_req->session_id(), "release session lock");
310  }
311  num_inlimbo_clients_ += conncnt_change_by;
312 }
313 
314 
315 void CachePlugin::HandleList(cvmfs::MsgListReq *msg_req,
316  CacheTransport *transport) {
317  SessionCtxGuard session_guard(msg_req->session_id(), this);
318  cvmfs::MsgListReply msg_reply;
319  CacheTransport::Frame frame_send(&msg_reply);
320 
321  msg_reply.set_req_id(msg_req->req_id());
322  int64_t listing_id = msg_req->listing_id();
323  msg_reply.set_listing_id(listing_id);
324  msg_reply.set_is_last_part(true);
325 
326  cvmfs::EnumStatus status;
327  if (msg_req->listing_id() == 0) {
328  listing_id = NextLstId();
329  status = ListingBegin(listing_id, msg_req->object_type());
330  if (status != cvmfs::STATUS_OK) {
331  LogSessionError(msg_req->session_id(), status,
332  "failed to start enumeration of objects");
333  msg_reply.set_status(status);
334  transport->SendFrame(&frame_send);
335  return;
336  }
337  msg_reply.set_listing_id(listing_id);
338  }
339  assert(listing_id != 0);
340 
341  ObjectInfo item;
342  unsigned total_size = 0;
343  while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
344  cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
345  cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
346  transport->FillMsgHash(item.id, msg_hash);
347  msg_list_record->set_allocated_hash(msg_hash);
348  msg_list_record->set_pinned(item.pinned);
349  msg_list_record->set_description(item.description);
350  // Approximation of the message size
351  total_size += sizeof(item) + item.description.length();
352  if (total_size > kListingSize)
353  break;
354  }
355  if (status == cvmfs::STATUS_OUTOFBOUNDS) {
356  ListingEnd(listing_id);
357  status = cvmfs::STATUS_OK;
358  } else {
359  msg_reply.set_is_last_part(false);
360  }
361  if (status != cvmfs::STATUS_OK) {
362  LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
363  }
364  msg_reply.set_status(status);
365  transport->SendFrame(&frame_send);
366 }
367 
368 
369 void CachePlugin::HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
370  CacheTransport *transport) {
371  SessionCtxGuard session_guard(msg_req->session_id(), this);
372  cvmfs::MsgObjectInfoReply msg_reply;
373  CacheTransport::Frame frame_send(&msg_reply);
374 
375  msg_reply.set_req_id(msg_req->req_id());
376  shash::Any object_id;
377  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
378  if (!retval) {
379  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
380  "malformed hash received from client");
381  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
382  } else {
383  ObjectInfo info;
384  cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
385  msg_reply.set_status(status);
386  if (status == cvmfs::STATUS_OK) {
387  msg_reply.set_object_type(info.object_type);
388  msg_reply.set_size(info.size);
389  } else if (status != cvmfs::STATUS_NOENTRY) {
390  LogSessionError(msg_req->session_id(), status,
391  "failed retrieving object details");
392  }
393  }
394  transport->SendFrame(&frame_send);
395 }
396 
397 
398 void CachePlugin::HandleRead(cvmfs::MsgReadReq *msg_req,
399  CacheTransport *transport) {
400  SessionCtxGuard session_guard(msg_req->session_id(), this);
401  cvmfs::MsgReadReply msg_reply;
402  CacheTransport::Frame frame_send(&msg_reply);
403 
404  msg_reply.set_req_id(msg_req->req_id());
405  shash::Any object_id;
406  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
407  if (!retval || (msg_req->size() > max_object_size_)) {
408  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
409  "malformed hash received from client");
410  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
411  transport->SendFrame(&frame_send);
412  return;
413  }
414  unsigned size = msg_req->size();
415 #ifdef __APPLE__
416  unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
417 #else
418  unsigned char buffer[size];
419 #endif
420  cvmfs::EnumStatus status = Pread(object_id, msg_req->offset(), &size, buffer);
421  msg_reply.set_status(status);
422  if (status == cvmfs::STATUS_OK) {
423  frame_send.set_attachment(buffer, size);
424  } else {
425  LogSessionError(msg_req->session_id(), status,
426  "failed to read from object");
427  }
428  transport->SendFrame(&frame_send);
429 #ifdef __APPLE__
430  free(buffer);
431 #endif
432 }
433 
434 
435 void CachePlugin::HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
436  CacheTransport *transport) {
437  SessionCtxGuard session_guard(msg_req->session_id(), this);
438  cvmfs::MsgRefcountReply msg_reply;
439  CacheTransport::Frame frame_send(&msg_reply);
440 
441  msg_reply.set_req_id(msg_req->req_id());
442  shash::Any object_id;
443  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
444  if (!retval) {
445  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
446  "malformed hash received from client");
447  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
448  } else {
449  cvmfs::EnumStatus status = ChangeRefcount(object_id, msg_req->change_by());
450  msg_reply.set_status(status);
451  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
452  LogSessionError(msg_req->session_id(), status,
453  "failed to open/close object " + object_id.ToString());
454  }
455  }
456  transport->SendFrame(&frame_send);
457 }
458 
459 
460 bool CachePlugin::HandleRequest(int fd_con) {
462  char buffer[max_object_size_];
463  CacheTransport::Frame frame_recv;
464  frame_recv.set_attachment(buffer, max_object_size_);
465  bool retval = transport.RecvFrame(&frame_recv);
466  if (!retval) {
468  "failed to receive request from connection (%d)", errno);
469  return false;
470  }
471 
472  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
473 
474  if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
475  cvmfs::MsgHandshake *msg_req = reinterpret_cast<cvmfs::MsgHandshake *>(
476  msg_typed);
477  HandleHandshake(msg_req, &transport);
478  } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
479  cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
480  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(
481  msg_req->session_id());
482  if (iter != sessions_.end()) {
483  free(iter->second.reponame);
484  free(iter->second.client_instance);
485  }
486  sessions_.erase(msg_req->session_id());
487  return false;
488  } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
489  HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
490  } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
491  cvmfs::MsgRefcountReq *msg_req = reinterpret_cast<cvmfs::MsgRefcountReq *>(
492  msg_typed);
493  HandleRefcount(msg_req, &transport);
494  } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
495  cvmfs::MsgObjectInfoReq
496  *msg_req = reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
497  HandleObjectInfo(msg_req, &transport);
498  } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
499  cvmfs::MsgReadReq *msg_req = reinterpret_cast<cvmfs::MsgReadReq *>(
500  msg_typed);
501  HandleRead(msg_req, &transport);
502  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
503  cvmfs::MsgStoreReq *msg_req = reinterpret_cast<cvmfs::MsgStoreReq *>(
504  msg_typed);
505  HandleStore(msg_req, &frame_recv, &transport);
506  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
507  cvmfs::MsgStoreAbortReq
508  *msg_req = reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
509  HandleStoreAbort(msg_req, &transport);
510  } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
511  cvmfs::MsgInfoReq *msg_req = reinterpret_cast<cvmfs::MsgInfoReq *>(
512  msg_typed);
513  HandleInfo(msg_req, &transport);
514  } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
515  cvmfs::MsgShrinkReq *msg_req = reinterpret_cast<cvmfs::MsgShrinkReq *>(
516  msg_typed);
517  HandleShrink(msg_req, &transport);
518  } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
519  cvmfs::MsgListReq *msg_req = reinterpret_cast<cvmfs::MsgListReq *>(
520  msg_typed);
521  HandleList(msg_req, &transport);
522  } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbStoreReq") {
523  cvmfs::MsgBreadcrumbStoreReq
524  *msg_req = reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed);
525  HandleBreadcrumbStore(msg_req, &transport);
526  } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbLoadReq") {
527  cvmfs::MsgBreadcrumbLoadReq
528  *msg_req = reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed);
529  HandleBreadcrumbLoad(msg_req, &transport);
530  } else {
532  "unexpected message from client: %s",
533  std::string(msg_typed->GetTypeName()).c_str());
534  return false;
535  }
536 
537  return true;
538 }
539 
540 
541 void CachePlugin::HandleShrink(cvmfs::MsgShrinkReq *msg_req,
542  CacheTransport *transport) {
543  SessionCtxGuard session_guard(msg_req->session_id(), this);
544  cvmfs::MsgShrinkReply msg_reply;
545  CacheTransport::Frame frame_send(&msg_reply);
546 
547  msg_reply.set_req_id(msg_req->req_id());
548  uint64_t used_bytes = 0;
549  cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
550  msg_reply.set_used_bytes(used_bytes);
551  msg_reply.set_status(status);
552  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
553  LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
554  }
555  transport->SendFrame(&frame_send);
556 }
557 
558 
559 void CachePlugin::HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
560  CacheTransport *transport) {
561  SessionCtxGuard session_guard(msg_req->session_id(), this);
562  cvmfs::MsgStoreReply msg_reply;
563  CacheTransport::Frame frame_send(&msg_reply);
564  msg_reply.set_req_id(msg_req->req_id());
565  msg_reply.set_part_nr(0);
566  uint64_t txn_id;
567  UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
568  bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
569  if (!retval) {
570  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
571  "malformed transaction id received from client");
572  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
573  } else {
574  cvmfs::EnumStatus status = AbortTxn(txn_id);
575  msg_reply.set_status(status);
576  if (status != cvmfs::STATUS_OK) {
577  LogSessionError(msg_req->session_id(), status,
578  "failed to abort transaction");
579  }
580  txn_ids_.Erase(uniq_req);
581  }
582  transport->SendFrame(&frame_send);
583 }
584 
585 
586 void CachePlugin::HandleStore(cvmfs::MsgStoreReq *msg_req,
587  CacheTransport::Frame *frame,
588  CacheTransport *transport) {
589  SessionCtxGuard session_guard(msg_req->session_id(), this);
590  cvmfs::MsgStoreReply msg_reply;
591  CacheTransport::Frame frame_send(&msg_reply);
592  msg_reply.set_req_id(msg_req->req_id());
593  msg_reply.set_part_nr(msg_req->part_nr());
594  shash::Any object_id;
595  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
596  if (!retval || (frame->att_size() > max_object_size_)
597  || ((frame->att_size() < max_object_size_) && !msg_req->last_part())) {
598  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
599  "malformed hash or bad object size received from client");
600  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
601  transport->SendFrame(&frame_send);
602  return;
603  }
604 
605  UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
606  uint64_t txn_id;
607  cvmfs::EnumStatus status = cvmfs::STATUS_OK;
608  if (msg_req->part_nr() == 1) {
609  if (txn_ids_.Contains(uniq_req)) {
610  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
611  "invalid attempt to restart running transaction");
612  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
613  transport->SendFrame(&frame_send);
614  return;
615  }
616  txn_id = NextTxnId();
617  ObjectInfo info;
618  info.id = object_id;
619  if (msg_req->has_expected_size()) {
620  info.size = msg_req->expected_size();
621  }
622  if (msg_req->has_object_type()) {
623  info.object_type = msg_req->object_type();
624  }
625  if (msg_req->has_description()) {
626  info.description = msg_req->description();
627  }
628  status = StartTxn(object_id, txn_id, info);
629  if (status != cvmfs::STATUS_OK) {
630  LogSessionError(msg_req->session_id(), status,
631  "failed to start transaction");
632  msg_reply.set_status(status);
633  transport->SendFrame(&frame_send);
634  return;
635  }
636  txn_ids_.Insert(uniq_req, txn_id);
637  } else {
638  retval = txn_ids_.Lookup(uniq_req, &txn_id);
639  if (!retval) {
640  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
641  "invalid transaction received from client");
642  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
643  transport->SendFrame(&frame_send);
644  return;
645  }
646  }
647 
648  // TODO(jblomer): check part number and send objects up in order
649  if (frame->att_size() > 0) {
650  status = WriteTxn(txn_id,
651  reinterpret_cast<unsigned char *>(frame->attachment()),
652  frame->att_size());
653  if (status != cvmfs::STATUS_OK) {
654  LogSessionError(msg_req->session_id(), status, "failure writing object");
655  msg_reply.set_status(status);
656  transport->SendFrame(&frame_send);
657  return;
658  }
659  }
660 
661  if (msg_req->last_part()) {
662  status = CommitTxn(txn_id);
663  if (status != cvmfs::STATUS_OK) {
664  LogSessionError(msg_req->session_id(), status,
665  "failure committing object");
666  }
667  txn_ids_.Erase(uniq_req);
668  }
669  msg_reply.set_status(status);
670  transport->SendFrame(&frame_send);
671 }
672 
673 
674 bool CachePlugin::IsRunning() { return atomic_read32(&running_) != 0; }
675 
676 
677 bool CachePlugin::Listen(const string &locator) {
678  vector<string> tokens = SplitString(locator, '=');
679  if (tokens[0] == "unix") {
680  string lock_path = tokens[1] + ".lock";
681  fd_socket_lock_ = TryLockFile(lock_path);
682  if (fd_socket_lock_ == -1) {
684  "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
686  return false;
687  } else if (fd_socket_lock_ == -2) {
688  // Another plugin process probably started in the meantime
690  if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
692  "failed to lock on %s, file is busy", lock_path.c_str());
693  }
694  return false;
695  }
696  assert(fd_socket_lock_ >= 0);
697  fd_socket_ = MakeSocket(tokens[1], 0600);
698  is_local_ = true;
699  } else if (tokens[0] == "tcp") {
700  vector<string> tcp_address = SplitString(tokens[1], ':');
701  if (tcp_address.size() != 2) {
702  LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, "invalid locator: %s",
703  locator.c_str());
705  return false;
706  }
707  fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
708  } else {
710  "unknown endpoint in locator: %s", locator.c_str());
712  return false;
713  }
714 
715  if (fd_socket_ < 0) {
716  if (errno == EADDRINUSE) {
717  // Another plugin process probably started in the meantime
719  } else {
721  "failed to create endpoint %s (%d)", locator.c_str(), errno);
723  }
724  is_local_ = false;
725  return false;
726  }
727  int retval = listen(fd_socket_, 32);
728  assert(retval == 0);
729 
730  return true;
731 }
732 
733 
734 void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
735  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
736  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
737  if (iter != sessions_.end()) {
738  session_str = iter->second.name;
739  }
740  LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "session '%s': %s",
741  session_str.c_str(), msg.c_str());
742 }
743 
744 
745 void CachePlugin::LogSessionError(uint64_t session_id,
746  cvmfs::EnumStatus status,
747  const std::string &msg) {
748  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
749  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
750  if (iter != sessions_.end()) {
751  session_str = iter->second.name;
752  }
753  LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, "session '%s': %s (%d - %s)",
754  session_str.c_str(), msg.c_str(), status,
755  CacheTransportCode2Ascii(status));
756 }
757 
758 
760  CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
761 
762  platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
763 
764  vector<struct pollfd> watch_fds;
765  // Elements 0, 1: control pipe, socket fd
766  struct pollfd watch_ctrl;
767  watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
768  watch_ctrl.events = POLLIN | POLLPRI;
769  watch_fds.push_back(watch_ctrl);
770  struct pollfd watch_socket;
771  watch_socket.fd = cache_plugin->fd_socket_;
772  watch_socket.events = POLLIN | POLLPRI;
773  watch_fds.push_back(watch_socket);
774 
775  bool terminated = false;
776  while (!terminated) {
777  for (unsigned i = 0; i < watch_fds.size(); ++i)
778  watch_fds[i].revents = 0;
779  int retval = poll(&watch_fds[0], watch_fds.size(), -1);
780  if (retval < 0) {
781  if (errno == EINTR)
782  continue;
783  PANIC(kLogSyslogErr | kLogDebug, "cache plugin connection failure (%d)",
784  errno);
785  }
786 
787  // Termination or detach
788  if (watch_fds[0].revents) {
789  char signal;
790  ReadPipe(watch_fds[0].fd, &signal, 1);
791  if (signal == kSignalDetach) {
792  cache_plugin->SendDetachRequests();
793  continue;
794  }
795 
796  // termination
797  if (watch_fds.size() > 2) {
799  "terminating external cache manager with pending connections");
800  }
801  break;
802  }
803 
804  // New connection
805  if (watch_fds[1].revents) {
806  struct sockaddr_un remote;
807  socklen_t socket_size = sizeof(remote);
808  int fd_con = accept(watch_fds[1].fd, (struct sockaddr *)&remote,
809  &socket_size);
810  if (fd_con < 0) {
812  "failed to establish connection (%d)", errno);
813  continue;
814  }
815  struct pollfd watch_con;
816  watch_con.fd = fd_con;
817  watch_con.events = POLLIN | POLLPRI;
818  watch_fds.push_back(watch_con);
819  cache_plugin->connections_.insert(fd_con);
820  }
821 
822  // New request
823  for (unsigned i = 2; i < watch_fds.size();) {
824  if (watch_fds[i].revents) {
825  bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
826  if (!proceed) {
827  close(watch_fds[i].fd);
828  cache_plugin->connections_.erase(watch_fds[i].fd);
829  watch_fds.erase(watch_fds.begin() + i);
830  if ((getenv(CacheTransport::kEnvReadyNotifyFd) != NULL)
831  && (cache_plugin->connections_.empty())
832  && (cache_plugin->num_inlimbo_clients_ == 0)) {
834  "stopping cache plugin, no more active clients");
835  terminated = true;
836  break;
837  }
838  } else {
839  i++;
840  }
841  } else {
842  i++;
843  }
844  }
845  }
846 
847  // 0, 1 being closed by destructor
848  for (unsigned i = 2; i < watch_fds.size(); ++i)
849  close(watch_fds[i].fd);
850  cache_plugin->txn_ids_.Clear();
851 
852  signal(SIGPIPE, save_sigpipe);
853  return NULL;
854 }
855 
856 
860 void CachePlugin::NotifySupervisor(char signal) {
861  char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
862  if (pipe_ready == NULL)
863  return;
864  int fd_pipe_ready = String2Int64(pipe_ready);
865  WritePipe(fd_pipe_ready, &signal, 1);
866 }
867 
868 
869 void CachePlugin::ProcessRequests(unsigned num_workers) {
870  num_workers_ = num_workers;
871  int retval = pthread_create(&thread_io_, NULL, MainProcessRequests, this);
872  assert(retval == 0);
874  atomic_cas32(&running_, 0, 1);
875 }
876 
877 
879  set<int>::const_iterator iter = connections_.begin();
880  set<int>::const_iterator iter_end = connections_.end();
881  for (; iter != iter_end; ++iter) {
882  CacheTransport transport(*iter,
885  cvmfs::MsgDetach msg_detach;
886  CacheTransport::Frame frame_send(&msg_detach);
887  transport.SendFrame(&frame_send);
888  }
889 }
890 
891 
893  if (IsRunning()) {
894  char terminate = kSignalTerminate;
895  WritePipe(pipe_ctrl_[1], &terminate, 1);
896  pthread_join(thread_io_, NULL);
897  atomic_cas32(&running_, 1, 0);
898  }
899 }
900 
901 
903  if (!IsRunning())
904  return;
905  pthread_join(thread_io_, NULL);
906 }
uint64_t pinned_bytes
Definition: channel.h:89
void Get(uint64_t *id, char **reponame, char **client_instance)
Definition: channel.cc:71
int MakeSocket(const std::string &path, const int mode)
Definition: posix.cc:327
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
Definition: channel.cc:197
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
atomic_int64 next_session_id_
Definition: channel.h:253
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:369
static void CleanupInstance()
Definition: channel.cc:32
uint64_t timestamp
Definition: manifest.h:39
void Set(uint64_t id, char *reponame, char *client_instance)
Definition: channel.cc:96
static const uint64_t kSizeUnknown
Definition: channel.h:70
int MakeTcpEndpoint(const std::string &ipv4_address, int portno)
Definition: posix.cc:384
shash::Any catalog_hash
Definition: manifest.h:38
#define PANIC(...)
Definition: exception.h:29
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
bool is_set
either not yet set or deliberately unset
Definition: channel.h:43
bool IsRunning()
Definition: channel.cc:674
uint64_t NextLstId()
Definition: channel.h:206
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
Definition: channel.cc:297
std::set< int > connections_
Definition: channel.h:257
virtual ~CachePlugin()
Definition: channel.cc:187
uint64_t capabilities_
Definition: channel.h:241
std::map< uint64_t, SessionInfo > sessions_
Definition: channel.h:258
uint64_t capabilities() const
Definition: channel.h:102
bool IsSet()
Definition: channel.cc:86
void ProcessRequests(unsigned num_workers)
Definition: channel.cc:869
cvmfs::EnumObjectType object_type
Definition: channel.h:80
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
void Unset()
Definition: channel.cc:133
static SessionCtx * instance_
Definition: channel.h:56
static const unsigned kDefaultMaxObjectSize
Definition: channel.h:138
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static const char kReadyNotification
static uint32_t HashUniqueRequest(const UniqueRequest &req)
Definition: channel.h:207
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
Definition: channel.cc:541
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
bool is_local_
Definition: channel.h:240
static const uint32_t kFlagSendIgnoreFailure
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
Definition: channel.cc:167
uint64_t num_inlimbo_clients_
Definition: channel.h:251
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
Definition: channel.cc:435
uint64_t revision
Definition: manifest.h:40
bool Listen(const std::string &locator)
Definition: channel.cc:677
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
int64_t String2Int64(const string &value)
Definition: string.cc:234
void Terminate()
Definition: channel.cc:892
uint64_t NextTxnId()
Definition: channel.h:205
void AskToDetach()
Definition: channel.cc:161
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
static const unsigned kListingSize
Definition: channel.h:139
#define platform_sighandler_t
static void TlsDestructor(void *data)
Definition: channel.cc:115
void NotifySupervisor(char signal)
Definition: channel.cc:860
void * attachment() const
unsigned max_object_size_
Definition: channel.h:246
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int TryLockFile(const std::string &path)
Definition: posix.cc:922
uint32_t att_size() const
int64_t no_shrink
Definition: channel.h:90
static const char kSignalDetach
Definition: channel.h:141
std::string description
Definition: channel.h:82
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
pthread_t thread_io_
Definition: channel.h:259
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
Definition: channel.cc:745
int fd_socket_lock_
Definition: channel.h:243
string StringifyInt(const int64_t value)
Definition: string.cc:77
static const unsigned kPbProtocolVersion
Definition: channel.h:69
google::protobuf::MessageLite * GetMsgTyped()
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
Definition: channel.cc:559
SessionCtx()
Definition: channel.cc:38
int fd_socket_
Definition: channel.h:242
static const char kFailureNotification
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
Definition: channel.cc:315
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:398
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
Definition: channel.cc:251
void SendDetachRequests()
Definition: channel.cc:878
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
Definition: channel.h:256
atomic_int64 next_lst_id_
Definition: channel.h:255
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
uint64_t used_bytes
Definition: channel.h:88
atomic_int32 running_
Definition: channel.h:244
Definition: mutex.h:42
unsigned num_workers_
Definition: channel.h:245
~SessionCtx()
Definition: channel.cc:46
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
Definition: channel.cc:586
bool IsValid() const
Definition: manifest.h:33
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
Definition: channel.cc:759
void set_attachment(void *attachment, uint32_t att_size)
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
void WaitFor()
Definition: channel.cc:902
static const char kSignalTerminate
Definition: channel.h:140
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
const char * CacheTransportCode2Ascii(const cvmfs::EnumStatus code)
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
std::string name_
Definition: channel.h:252
atomic_int64 next_txn_id_
Definition: channel.h:254
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:275
static const char * kEnvReadyNotifyFd
int pipe_ctrl_[2]
Definition: channel.h:260
void LogSessionInfo(uint64_t session_id, const std::string &msg)
Definition: channel.cc:734
uint64_t NextSessionId()
Definition: channel.h:202
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:226
static const uint32_t kFlagSendNonBlocking
void UnlockFile(const int filedes)
Definition: posix.cc:1003
bool HandleRequest(int fd_con)
Definition: channel.cc:460
uint64_t size_bytes
Definition: channel.h:87
static SessionCtx * GetInstance()
Definition: channel.cc:59
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545