CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
channel.cc
Go to the documentation of this file.
1 
5 #include "channel.h"
6 
7 #include <errno.h>
8 #include <poll.h>
9 #include <signal.h>
10 #include <sys/socket.h>
11 #include <sys/un.h>
12 #include <unistd.h>
13 
14 #include <cassert>
15 #include <cstring>
16 #include <vector>
17 
18 #include "util/concurrency.h"
19 #include "util/exception.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/pointer.h"
23 #include "util/posix.h"
24 #include "util/smalloc.h"
25 #include "util/string.h"
26 
27 using namespace std; // NOLINT
28 
29 
31 
33  delete instance_;
34  instance_ = NULL;
35 }
36 
37 
39  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
40  smalloc(sizeof(pthread_mutex_t)));
41  const int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
42  assert(retval == 0);
43 }
44 
45 
47  pthread_mutex_destroy(lock_tls_blocks_);
48  free(lock_tls_blocks_);
49 
50  for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
51  delete tls_blocks_[i];
52  }
53 
54  const int retval = pthread_key_delete(thread_local_storage_);
55  assert(retval == 0);
56 }
57 
58 
60  if (instance_ == NULL) {
61  instance_ = new SessionCtx();
62  const int retval =
63  pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
64  assert(retval == 0);
65  }
66 
67  return instance_;
68 }
69 
70 
71 void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
72  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
73  pthread_getspecific(thread_local_storage_));
74  if ((tls == NULL) || !tls->is_set) {
75  *id = 0;
76  *reponame = NULL;
77  *client_instance = NULL;
78  } else {
79  *id = tls->id;
80  *reponame = tls->reponame;
81  *client_instance = tls->client_instance;
82  }
83 }
84 
85 
87  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
88  pthread_getspecific(thread_local_storage_));
89  if (tls == NULL)
90  return false;
91 
92  return tls->is_set;
93 }
94 
95 
96 void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
97  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
98  pthread_getspecific(thread_local_storage_));
99 
100  if (tls == NULL) {
101  tls = new ThreadLocalStorage(id, reponame, client_instance);
102  const int retval = pthread_setspecific(thread_local_storage_, tls);
103  assert(retval == 0);
104  const MutexLockGuard lock_guard(lock_tls_blocks_);
105  tls_blocks_.push_back(tls);
106  } else {
107  tls->id = id;
108  tls->reponame = reponame;
109  tls->client_instance = client_instance;
110  tls->is_set = true;
111  }
112 }
113 
114 
115 void SessionCtx::TlsDestructor(void *data) {
116  ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
117  delete tls;
118 
119  assert(instance_);
120  const MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
121  for (vector<ThreadLocalStorage *>::iterator
122  i = instance_->tls_blocks_.begin(),
123  iEnd = instance_->tls_blocks_.end();
124  i != iEnd; ++i) {
125  if ((*i) == tls) {
126  instance_->tls_blocks_.erase(i);
127  break;
128  }
129  }
130 }
131 
132 
134  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
135  pthread_getspecific(thread_local_storage_));
136  if (tls != NULL) {
137  tls->is_set = false;
138  tls->id = 0;
139  tls->reponame = NULL;
140  tls->client_instance = NULL;
141  }
142 }
143 
144 
145 //------------------------------------------------------------------------------
146 
147 
148 CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
149  : id(id), name(name) {
150  vector<string> tokens = SplitString(name, ':');
151  reponame = strdup(tokens[0].c_str());
152  if (tokens.size() > 1)
153  client_instance = strdup(tokens[1].c_str());
154  else
155  client_instance = NULL;
156 }
157 
158 const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
159 
160 
162  char detach = kSignalDetach;
163  WritePipe(pipe_ctrl_[1], &detach, 1);
164 }
165 
166 
168  : is_local_(false)
169  , capabilities_(capabilities)
170  , fd_socket_(-1)
171  , fd_socket_lock_(-1)
172  , running_(0)
173  , num_workers_(0)
175  , num_inlimbo_clients_(0) {
176  atomic_init64(&next_session_id_);
177  atomic_init64(&next_txn_id_);
178  atomic_init64(&next_lst_id_);
179  // Don't use listing id zero
180  atomic_inc64(&next_lst_id_);
182  memset(&thread_io_, 0, sizeof(thread_io_));
184 }
185 
186 
188  Terminate();
190  if (fd_socket_ >= 0)
191  close(fd_socket_);
192  if (fd_socket_lock_ >= 0)
194 }
195 
196 
197 void CachePlugin::HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req,
198  CacheTransport *transport) {
199  const SessionCtxGuard session_guard(msg_req->session_id(), this);
200  cvmfs::MsgBreadcrumbReply msg_reply;
201  CacheTransport::Frame frame_send(&msg_reply);
202 
203  msg_reply.set_req_id(msg_req->req_id());
204  manifest::Breadcrumb breadcrumb;
205  const bool retval = transport->ParseMsgHash(msg_req->breadcrumb().hash(),
206  &breadcrumb.catalog_hash);
207  if (!retval) {
208  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
209  "malformed hash received from client");
210  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
211  } else {
212  breadcrumb.timestamp = msg_req->breadcrumb().timestamp();
213  if (msg_req->breadcrumb().has_revision()) {
214  breadcrumb.revision = msg_req->breadcrumb().revision();
215  } else {
216  breadcrumb.revision = 0;
217  }
218  const cvmfs::EnumStatus status =
219  StoreBreadcrumb(msg_req->breadcrumb().fqrn(), breadcrumb);
220  msg_reply.set_status(status);
221  }
222  transport->SendFrame(&frame_send);
223 }
224 
225 
226 void CachePlugin::HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req,
227  CacheTransport *transport) {
228  const SessionCtxGuard session_guard(msg_req->session_id(), this);
229  cvmfs::MsgBreadcrumbReply msg_reply;
230  CacheTransport::Frame frame_send(&msg_reply);
231 
232  msg_reply.set_req_id(msg_req->req_id());
233  manifest::Breadcrumb breadcrumb;
234  const cvmfs::EnumStatus status = LoadBreadcrumb(msg_req->fqrn(), &breadcrumb);
235  msg_reply.set_status(status);
236  if (status == cvmfs::STATUS_OK) {
237  assert(breadcrumb.IsValid());
238  cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
239  transport->FillMsgHash(breadcrumb.catalog_hash, msg_hash);
240  cvmfs::MsgBreadcrumb *msg_breadcrumb = new cvmfs::MsgBreadcrumb();
241  msg_breadcrumb->set_fqrn(msg_req->fqrn());
242  msg_breadcrumb->set_allocated_hash(msg_hash);
243  msg_breadcrumb->set_timestamp(breadcrumb.timestamp);
244  msg_breadcrumb->set_revision(breadcrumb.revision);
245  msg_reply.set_allocated_breadcrumb(msg_breadcrumb);
246  }
247  transport->SendFrame(&frame_send);
248 }
249 
250 
251 void CachePlugin::HandleHandshake(cvmfs::MsgHandshake *msg_req,
252  CacheTransport *transport) {
253  const uint64_t session_id = NextSessionId();
254  if (msg_req->has_name()) {
255  sessions_[session_id] = SessionInfo(session_id, msg_req->name());
256  } else {
257  sessions_[session_id] = SessionInfo(
258  session_id, "anonymous client (" + StringifyInt(session_id) + ")");
259  }
260  cvmfs::MsgHandshakeAck msg_ack;
261  CacheTransport::Frame frame_send(&msg_ack);
262 
263  msg_ack.set_status(cvmfs::STATUS_OK);
264  msg_ack.set_name(name_);
265  msg_ack.set_protocol_version(kPbProtocolVersion);
266  msg_ack.set_max_object_size(max_object_size_);
267  msg_ack.set_session_id(session_id);
268  msg_ack.set_capabilities(capabilities_);
269  if (is_local_)
270  msg_ack.set_pid(getpid());
271  transport->SendFrame(&frame_send);
272 }
273 
274 
275 void CachePlugin::HandleInfo(cvmfs::MsgInfoReq *msg_req,
276  CacheTransport *transport) {
277  const SessionCtxGuard session_guard(msg_req->session_id(), this);
278  cvmfs::MsgInfoReply msg_reply;
279  CacheTransport::Frame frame_send(&msg_reply);
280 
281  msg_reply.set_req_id(msg_req->req_id());
282  Info info;
283  const cvmfs::EnumStatus status = GetInfo(&info);
284  if (status != cvmfs::STATUS_OK) {
285  LogSessionError(msg_req->session_id(), status,
286  "failed to query cache status");
287  }
288  msg_reply.set_size_bytes(info.size_bytes);
289  msg_reply.set_used_bytes(info.used_bytes);
290  msg_reply.set_pinned_bytes(info.pinned_bytes);
291  msg_reply.set_no_shrink(info.no_shrink);
292  msg_reply.set_status(status);
293  transport->SendFrame(&frame_send);
294 }
295 
296 
297 void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
298  if (!msg_req->has_conncnt_change_by())
299  return;
300  const int32_t conncnt_change_by = msg_req->conncnt_change_by();
301  if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
302  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
303  "invalid request to drop connection counter below zero");
304  return;
305  }
306  if (conncnt_change_by > 0) {
307  LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
308  } else {
309  LogSessionInfo(msg_req->session_id(), "release session lock");
310  }
311  num_inlimbo_clients_ += conncnt_change_by;
312 }
313 
314 
315 void CachePlugin::HandleList(cvmfs::MsgListReq *msg_req,
316  CacheTransport *transport) {
317  const SessionCtxGuard session_guard(msg_req->session_id(), this);
318  cvmfs::MsgListReply msg_reply;
319  CacheTransport::Frame frame_send(&msg_reply);
320 
321  msg_reply.set_req_id(msg_req->req_id());
322  int64_t listing_id = msg_req->listing_id();
323  msg_reply.set_listing_id(listing_id);
324  msg_reply.set_is_last_part(true);
325 
326  cvmfs::EnumStatus status;
327  if (msg_req->listing_id() == 0) {
328  listing_id = NextLstId();
329  status = ListingBegin(listing_id, msg_req->object_type());
330  if (status != cvmfs::STATUS_OK) {
331  LogSessionError(msg_req->session_id(), status,
332  "failed to start enumeration of objects");
333  msg_reply.set_status(status);
334  transport->SendFrame(&frame_send);
335  return;
336  }
337  msg_reply.set_listing_id(listing_id);
338  }
339  assert(listing_id != 0);
340 
341  ObjectInfo item;
342  unsigned total_size = 0;
343  while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
344  cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
345  cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
346  transport->FillMsgHash(item.id, msg_hash);
347  msg_list_record->set_allocated_hash(msg_hash);
348  msg_list_record->set_pinned(item.pinned);
349  msg_list_record->set_description(item.description);
350  // Approximation of the message size
351  total_size += sizeof(item) + item.description.length();
352  if (total_size > kListingSize)
353  break;
354  }
355  if (status == cvmfs::STATUS_OUTOFBOUNDS) {
356  ListingEnd(listing_id);
357  status = cvmfs::STATUS_OK;
358  } else {
359  msg_reply.set_is_last_part(false);
360  }
361  if (status != cvmfs::STATUS_OK) {
362  LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
363  }
364  msg_reply.set_status(status);
365  transport->SendFrame(&frame_send);
366 }
367 
368 
369 void CachePlugin::HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
370  CacheTransport *transport) {
371  const SessionCtxGuard session_guard(msg_req->session_id(), this);
372  cvmfs::MsgObjectInfoReply msg_reply;
373  CacheTransport::Frame frame_send(&msg_reply);
374 
375  msg_reply.set_req_id(msg_req->req_id());
376  shash::Any object_id;
377  const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
378  if (!retval) {
379  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
380  "malformed hash received from client");
381  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
382  } else {
383  ObjectInfo info;
384  const cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
385  msg_reply.set_status(status);
386  if (status == cvmfs::STATUS_OK) {
387  msg_reply.set_object_type(info.object_type);
388  msg_reply.set_size(info.size);
389  } else if (status != cvmfs::STATUS_NOENTRY) {
390  LogSessionError(msg_req->session_id(), status,
391  "failed retrieving object details");
392  }
393  }
394  transport->SendFrame(&frame_send);
395 }
396 
397 
398 void CachePlugin::HandleRead(cvmfs::MsgReadReq *msg_req,
399  CacheTransport *transport) {
400  const SessionCtxGuard session_guard(msg_req->session_id(), this);
401  cvmfs::MsgReadReply msg_reply;
402  CacheTransport::Frame frame_send(&msg_reply);
403 
404  msg_reply.set_req_id(msg_req->req_id());
405  shash::Any object_id;
406  const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
407  if (!retval || (msg_req->size() > max_object_size_)) {
408  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
409  "malformed hash received from client");
410  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
411  transport->SendFrame(&frame_send);
412  return;
413  }
414  unsigned size = msg_req->size();
415 #ifdef __APPLE__
416  unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
417 #else
418  unsigned char buffer[size];
419 #endif
420  const cvmfs::EnumStatus status =
421  Pread(object_id, msg_req->offset(), &size, buffer);
422  msg_reply.set_status(status);
423  if (status == cvmfs::STATUS_OK) {
424  frame_send.set_attachment(buffer, size);
425  } else {
426  LogSessionError(msg_req->session_id(), status,
427  "failed to read from object");
428  }
429  transport->SendFrame(&frame_send);
430 #ifdef __APPLE__
431  free(buffer);
432 #endif
433 }
434 
435 
436 void CachePlugin::HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
437  CacheTransport *transport) {
438  const SessionCtxGuard session_guard(msg_req->session_id(), this);
439  cvmfs::MsgRefcountReply msg_reply;
440  CacheTransport::Frame frame_send(&msg_reply);
441 
442  msg_reply.set_req_id(msg_req->req_id());
443  shash::Any object_id;
444  const bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
445  if (!retval) {
446  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
447  "malformed hash received from client");
448  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
449  } else {
450  const cvmfs::EnumStatus status =
451  ChangeRefcount(object_id, msg_req->change_by());
452  msg_reply.set_status(status);
453  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
454  LogSessionError(msg_req->session_id(), status,
455  "failed to open/close object " + object_id.ToString());
456  }
457  }
458  transport->SendFrame(&frame_send);
459 }
460 
461 
462 bool CachePlugin::HandleRequest(int fd_con) {
464  char buffer[max_object_size_];
465  CacheTransport::Frame frame_recv;
466  frame_recv.set_attachment(buffer, max_object_size_);
467  const bool retval = transport.RecvFrame(&frame_recv);
468  if (!retval) {
470  "failed to receive request from connection (%d)", errno);
471  return false;
472  }
473 
474  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
475 
476  if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
477  cvmfs::MsgHandshake *msg_req = reinterpret_cast<cvmfs::MsgHandshake *>(
478  msg_typed);
479  HandleHandshake(msg_req, &transport);
480  } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
481  cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
482  const map<uint64_t, SessionInfo>::const_iterator iter =
483  sessions_.find(msg_req->session_id());
484  if (iter != sessions_.end()) {
485  free(iter->second.reponame);
486  free(iter->second.client_instance);
487  }
488  sessions_.erase(msg_req->session_id());
489  return false;
490  } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
491  HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
492  } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
493  cvmfs::MsgRefcountReq *msg_req = reinterpret_cast<cvmfs::MsgRefcountReq *>(
494  msg_typed);
495  HandleRefcount(msg_req, &transport);
496  } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
497  cvmfs::MsgObjectInfoReq
498  *msg_req = reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
499  HandleObjectInfo(msg_req, &transport);
500  } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
501  cvmfs::MsgReadReq *msg_req = reinterpret_cast<cvmfs::MsgReadReq *>(
502  msg_typed);
503  HandleRead(msg_req, &transport);
504  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
505  cvmfs::MsgStoreReq *msg_req = reinterpret_cast<cvmfs::MsgStoreReq *>(
506  msg_typed);
507  HandleStore(msg_req, &frame_recv, &transport);
508  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
509  cvmfs::MsgStoreAbortReq
510  *msg_req = reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
511  HandleStoreAbort(msg_req, &transport);
512  } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
513  cvmfs::MsgInfoReq *msg_req = reinterpret_cast<cvmfs::MsgInfoReq *>(
514  msg_typed);
515  HandleInfo(msg_req, &transport);
516  } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
517  cvmfs::MsgShrinkReq *msg_req = reinterpret_cast<cvmfs::MsgShrinkReq *>(
518  msg_typed);
519  HandleShrink(msg_req, &transport);
520  } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
521  cvmfs::MsgListReq *msg_req = reinterpret_cast<cvmfs::MsgListReq *>(
522  msg_typed);
523  HandleList(msg_req, &transport);
524  } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbStoreReq") {
525  cvmfs::MsgBreadcrumbStoreReq
526  *msg_req = reinterpret_cast<cvmfs::MsgBreadcrumbStoreReq *>(msg_typed);
527  HandleBreadcrumbStore(msg_req, &transport);
528  } else if (msg_typed->GetTypeName() == "cvmfs.MsgBreadcrumbLoadReq") {
529  cvmfs::MsgBreadcrumbLoadReq
530  *msg_req = reinterpret_cast<cvmfs::MsgBreadcrumbLoadReq *>(msg_typed);
531  HandleBreadcrumbLoad(msg_req, &transport);
532  } else {
534  "unexpected message from client: %s",
535  std::string(msg_typed->GetTypeName()).c_str());
536  return false;
537  }
538 
539  return true;
540 }
541 
542 
543 void CachePlugin::HandleShrink(cvmfs::MsgShrinkReq *msg_req,
544  CacheTransport *transport) {
545  const SessionCtxGuard session_guard(msg_req->session_id(), this);
546  cvmfs::MsgShrinkReply msg_reply;
547  CacheTransport::Frame frame_send(&msg_reply);
548 
549  msg_reply.set_req_id(msg_req->req_id());
550  uint64_t used_bytes = 0;
551  const cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
552  msg_reply.set_used_bytes(used_bytes);
553  msg_reply.set_status(status);
554  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
555  LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
556  }
557  transport->SendFrame(&frame_send);
558 }
559 
560 
561 void CachePlugin::HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
562  CacheTransport *transport) {
563  const SessionCtxGuard session_guard(msg_req->session_id(), this);
564  cvmfs::MsgStoreReply msg_reply;
565  CacheTransport::Frame frame_send(&msg_reply);
566  msg_reply.set_req_id(msg_req->req_id());
567  msg_reply.set_part_nr(0);
568  uint64_t txn_id;
569  const UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
570  const bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
571  if (!retval) {
572  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
573  "malformed transaction id received from client");
574  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
575  } else {
576  const cvmfs::EnumStatus status = AbortTxn(txn_id);
577  msg_reply.set_status(status);
578  if (status != cvmfs::STATUS_OK) {
579  LogSessionError(msg_req->session_id(), status,
580  "failed to abort transaction");
581  }
582  txn_ids_.Erase(uniq_req);
583  }
584  transport->SendFrame(&frame_send);
585 }
586 
587 
588 void CachePlugin::HandleStore(cvmfs::MsgStoreReq *msg_req,
589  CacheTransport::Frame *frame,
590  CacheTransport *transport) {
591  const SessionCtxGuard session_guard(msg_req->session_id(), this);
592  cvmfs::MsgStoreReply msg_reply;
593  CacheTransport::Frame frame_send(&msg_reply);
594  msg_reply.set_req_id(msg_req->req_id());
595  msg_reply.set_part_nr(msg_req->part_nr());
596  shash::Any object_id;
597  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
598  if (!retval || (frame->att_size() > max_object_size_)
599  || ((frame->att_size() < max_object_size_) && !msg_req->last_part())) {
600  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
601  "malformed hash or bad object size received from client");
602  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
603  transport->SendFrame(&frame_send);
604  return;
605  }
606 
607  const UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
608  uint64_t txn_id;
609  cvmfs::EnumStatus status = cvmfs::STATUS_OK;
610  if (msg_req->part_nr() == 1) {
611  if (txn_ids_.Contains(uniq_req)) {
612  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
613  "invalid attempt to restart running transaction");
614  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
615  transport->SendFrame(&frame_send);
616  return;
617  }
618  txn_id = NextTxnId();
619  ObjectInfo info;
620  info.id = object_id;
621  if (msg_req->has_expected_size()) {
622  info.size = msg_req->expected_size();
623  }
624  if (msg_req->has_object_type()) {
625  info.object_type = msg_req->object_type();
626  }
627  if (msg_req->has_description()) {
628  info.description = msg_req->description();
629  }
630  status = StartTxn(object_id, txn_id, info);
631  if (status != cvmfs::STATUS_OK) {
632  LogSessionError(msg_req->session_id(), status,
633  "failed to start transaction");
634  msg_reply.set_status(status);
635  transport->SendFrame(&frame_send);
636  return;
637  }
638  txn_ids_.Insert(uniq_req, txn_id);
639  } else {
640  retval = txn_ids_.Lookup(uniq_req, &txn_id);
641  if (!retval) {
642  LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
643  "invalid transaction received from client");
644  msg_reply.set_status(cvmfs::STATUS_MALFORMED);
645  transport->SendFrame(&frame_send);
646  return;
647  }
648  }
649 
650  // TODO(jblomer): check part number and send objects up in order
651  if (frame->att_size() > 0) {
652  status = WriteTxn(txn_id,
653  reinterpret_cast<unsigned char *>(frame->attachment()),
654  frame->att_size());
655  if (status != cvmfs::STATUS_OK) {
656  LogSessionError(msg_req->session_id(), status, "failure writing object");
657  msg_reply.set_status(status);
658  transport->SendFrame(&frame_send);
659  return;
660  }
661  }
662 
663  if (msg_req->last_part()) {
664  status = CommitTxn(txn_id);
665  if (status != cvmfs::STATUS_OK) {
666  LogSessionError(msg_req->session_id(), status,
667  "failure committing object");
668  }
669  txn_ids_.Erase(uniq_req);
670  }
671  msg_reply.set_status(status);
672  transport->SendFrame(&frame_send);
673 }
674 
675 
676 bool CachePlugin::IsRunning() { return atomic_read32(&running_) != 0; }
677 
678 
679 bool CachePlugin::Listen(const string &locator) {
680  vector<string> tokens = SplitString(locator, '=');
681  if (tokens[0] == "unix") {
682  const string lock_path = tokens[1] + ".lock";
683  fd_socket_lock_ = TryLockFile(lock_path);
684  if (fd_socket_lock_ == -1) {
686  "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
688  return false;
689  } else if (fd_socket_lock_ == -2) {
690  // Another plugin process probably started in the meantime
692  if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
694  "failed to lock on %s, file is busy", lock_path.c_str());
695  }
696  return false;
697  }
698  assert(fd_socket_lock_ >= 0);
699  fd_socket_ = MakeSocket(tokens[1], 0600);
700  is_local_ = true;
701  } else if (tokens[0] == "tcp") {
702  vector<string> tcp_address = SplitString(tokens[1], ':');
703  if (tcp_address.size() != 2) {
704  LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, "invalid locator: %s",
705  locator.c_str());
707  return false;
708  }
709  fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
710  } else {
712  "unknown endpoint in locator: %s", locator.c_str());
714  return false;
715  }
716 
717  if (fd_socket_ < 0) {
718  if (errno == EADDRINUSE) {
719  // Another plugin process probably started in the meantime
721  } else {
723  "failed to create endpoint %s (%d)", locator.c_str(), errno);
725  }
726  is_local_ = false;
727  return false;
728  }
729  const int retval = listen(fd_socket_, 32);
730  assert(retval == 0);
731 
732  return true;
733 }
734 
735 
736 void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
737  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
738  const map<uint64_t, SessionInfo>::const_iterator iter =
739  sessions_.find(session_id);
740  if (iter != sessions_.end()) {
741  session_str = iter->second.name;
742  }
743  LogCvmfs(kLogCache, kLogDebug | kLogSyslog, "session '%s': %s",
744  session_str.c_str(), msg.c_str());
745 }
746 
747 
748 void CachePlugin::LogSessionError(uint64_t session_id,
749  cvmfs::EnumStatus status,
750  const std::string &msg) {
751  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
752  const map<uint64_t, SessionInfo>::const_iterator iter =
753  sessions_.find(session_id);
754  if (iter != sessions_.end()) {
755  session_str = iter->second.name;
756  }
757  LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr, "session '%s': %s (%d - %s)",
758  session_str.c_str(), msg.c_str(), status,
759  CacheTransportCode2Ascii(status));
760 }
761 
762 
764  CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
765 
766  platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
767 
768  vector<struct pollfd> watch_fds;
769  // Elements 0, 1: control pipe, socket fd
770  struct pollfd watch_ctrl;
771  watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
772  watch_ctrl.events = POLLIN | POLLPRI;
773  watch_fds.push_back(watch_ctrl);
774  struct pollfd watch_socket;
775  watch_socket.fd = cache_plugin->fd_socket_;
776  watch_socket.events = POLLIN | POLLPRI;
777  watch_fds.push_back(watch_socket);
778 
779  bool terminated = false;
780  while (!terminated) {
781  for (unsigned i = 0; i < watch_fds.size(); ++i)
782  watch_fds[i].revents = 0;
783  const int retval = poll(&watch_fds[0], watch_fds.size(), -1);
784  if (retval < 0) {
785  if (errno == EINTR)
786  continue;
787  PANIC(kLogSyslogErr | kLogDebug, "cache plugin connection failure (%d)",
788  errno);
789  }
790 
791  // Termination or detach
792  if (watch_fds[0].revents) {
793  char signal;
794  ReadPipe(watch_fds[0].fd, &signal, 1);
795  if (signal == kSignalDetach) {
796  cache_plugin->SendDetachRequests();
797  continue;
798  }
799 
800  // termination
801  if (watch_fds.size() > 2) {
803  "terminating external cache manager with pending connections");
804  }
805  break;
806  }
807 
808  // New connection
809  if (watch_fds[1].revents) {
810  struct sockaddr_un remote;
811  socklen_t socket_size = sizeof(remote);
812  const int fd_con =
813  accept(watch_fds[1].fd, (struct sockaddr *)&remote, &socket_size);
814  if (fd_con < 0) {
816  "failed to establish connection (%d)", errno);
817  continue;
818  }
819  struct pollfd watch_con;
820  watch_con.fd = fd_con;
821  watch_con.events = POLLIN | POLLPRI;
822  watch_fds.push_back(watch_con);
823  cache_plugin->connections_.insert(fd_con);
824  }
825 
826  // New request
827  for (unsigned i = 2; i < watch_fds.size();) {
828  if (watch_fds[i].revents) {
829  const bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
830  if (!proceed) {
831  close(watch_fds[i].fd);
832  cache_plugin->connections_.erase(watch_fds[i].fd);
833  watch_fds.erase(watch_fds.begin() + i);
834  if ((getenv(CacheTransport::kEnvReadyNotifyFd) != NULL)
835  && (cache_plugin->connections_.empty())
836  && (cache_plugin->num_inlimbo_clients_ == 0)) {
838  "stopping cache plugin, no more active clients");
839  terminated = true;
840  break;
841  }
842  } else {
843  i++;
844  }
845  } else {
846  i++;
847  }
848  }
849  }
850 
851  // 0, 1 being closed by destructor
852  for (unsigned i = 2; i < watch_fds.size(); ++i)
853  close(watch_fds[i].fd);
854  cache_plugin->txn_ids_.Clear();
855 
856  signal(SIGPIPE, save_sigpipe);
857  return NULL;
858 }
859 
860 
864 void CachePlugin::NotifySupervisor(char signal) {
865  char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
866  if (pipe_ready == NULL)
867  return;
868  const int fd_pipe_ready = String2Int64(pipe_ready);
869  WritePipe(fd_pipe_ready, &signal, 1);
870 }
871 
872 
873 void CachePlugin::ProcessRequests(unsigned num_workers) {
874  num_workers_ = num_workers;
875  const int retval =
876  pthread_create(&thread_io_, NULL, MainProcessRequests, this);
877  assert(retval == 0);
879  atomic_cas32(&running_, 0, 1);
880 }
881 
882 
884  set<int>::const_iterator iter = connections_.begin();
885  const set<int>::const_iterator iter_end = connections_.end();
886  for (; iter != iter_end; ++iter) {
887  CacheTransport transport(*iter,
890  cvmfs::MsgDetach msg_detach;
891  CacheTransport::Frame frame_send(&msg_detach);
892  transport.SendFrame(&frame_send);
893  }
894 }
895 
896 
898  if (IsRunning()) {
899  char terminate = kSignalTerminate;
900  WritePipe(pipe_ctrl_[1], &terminate, 1);
901  pthread_join(thread_io_, NULL);
902  atomic_cas32(&running_, 1, 0);
903  }
904 }
905 
906 
908  if (!IsRunning())
909  return;
910  pthread_join(thread_io_, NULL);
911 }
uint64_t pinned_bytes
Definition: channel.h:89
void Get(uint64_t *id, char **reponame, char **client_instance)
Definition: channel.cc:71
int MakeSocket(const std::string &path, const int mode)
Definition: posix.cc:327
void HandleBreadcrumbStore(cvmfs::MsgBreadcrumbStoreReq *msg_req, CacheTransport *transport)
Definition: channel.cc:197
virtual cvmfs::EnumStatus ListingNext(int64_t lst_id, ObjectInfo *item)=0
atomic_int64 next_session_id_
Definition: channel.h:253
void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:369
static void CleanupInstance()
Definition: channel.cc:32
uint64_t timestamp
Definition: manifest.h:39
void Set(uint64_t id, char *reponame, char *client_instance)
Definition: channel.cc:96
static const uint64_t kSizeUnknown
Definition: channel.h:70
int MakeTcpEndpoint(const std::string &ipv4_address, int portno)
Definition: posix.cc:384
shash::Any catalog_hash
Definition: manifest.h:38
#define PANIC(...)
Definition: exception.h:29
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
bool is_set
either not yet set or deliberately unset
Definition: channel.h:43
bool IsRunning()
Definition: channel.cc:676
uint64_t NextLstId()
Definition: channel.h:206
void HandleIoctl(cvmfs::MsgIoctl *msg_req)
Definition: channel.cc:297
std::set< int > connections_
Definition: channel.h:257
virtual ~CachePlugin()
Definition: channel.cc:187
uint64_t capabilities_
Definition: channel.h:241
std::map< uint64_t, SessionInfo > sessions_
Definition: channel.h:258
uint64_t capabilities() const
Definition: channel.h:102
bool IsSet()
Definition: channel.cc:86
void ProcessRequests(unsigned num_workers)
Definition: channel.cc:873
cvmfs::EnumObjectType object_type
Definition: channel.h:80
virtual cvmfs::EnumStatus GetInfo(Info *info)=0
void Unset()
Definition: channel.cc:133
static SessionCtx * instance_
Definition: channel.h:56
static const unsigned kDefaultMaxObjectSize
Definition: channel.h:138
assert((mem||(size==0))&&"Out Of Memory")
void SendFrame(Frame *frame)
virtual cvmfs::EnumStatus LoadBreadcrumb(const std::string &fqrn, manifest::Breadcrumb *breadcrumb)=0
static const char kReadyNotification
static uint32_t HashUniqueRequest(const UniqueRequest &req)
Definition: channel.h:207
void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport)
Definition: channel.cc:543
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
bool is_local_
Definition: channel.h:240
static const uint32_t kFlagSendIgnoreFailure
virtual cvmfs::EnumStatus StartTxn(const shash::Any &id, const uint64_t txn_id, const ObjectInfo &info)=0
CachePlugin(uint64_t capabilities)
Definition: channel.cc:167
uint64_t num_inlimbo_clients_
Definition: channel.h:251
virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id, unsigned char *buffer, uint32_t size)=0
void HandleRefcount(cvmfs::MsgRefcountReq *msg_req, CacheTransport *transport)
Definition: channel.cc:436
uint64_t revision
Definition: manifest.h:40
bool Listen(const std::string &locator)
Definition: channel.cc:679
void FillMsgHash(const shash::Any &hash, cvmfs::MsgHash *msg_hash)
virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id)=0
int64_t String2Int64(const string &value)
Definition: string.cc:234
void Terminate()
Definition: channel.cc:897
uint64_t NextTxnId()
Definition: channel.h:205
void AskToDetach()
Definition: channel.cc:161
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
static const unsigned kListingSize
Definition: channel.h:139
#define platform_sighandler_t
static void TlsDestructor(void *data)
Definition: channel.cc:115
void NotifySupervisor(char signal)
Definition: channel.cc:864
void * attachment() const
unsigned max_object_size_
Definition: channel.h:246
bool ParseMsgHash(const cvmfs::MsgHash &msg_hash, shash::Any *hash)
virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id)=0
int TryLockFile(const std::string &path)
Definition: posix.cc:922
uint32_t att_size() const
int64_t no_shrink
Definition: channel.h:90
static const char kSignalDetach
Definition: channel.h:141
std::string description
Definition: channel.h:82
virtual cvmfs::EnumStatus StoreBreadcrumb(const std::string &fqrn, const manifest::Breadcrumb &breadcrumb)=0
pthread_t thread_io_
Definition: channel.h:259
void LogSessionError(uint64_t session_id, cvmfs::EnumStatus status, const std::string &msg)
Definition: channel.cc:748
int fd_socket_lock_
Definition: channel.h:243
string StringifyInt(const int64_t value)
Definition: string.cc:77
static const unsigned kPbProtocolVersion
Definition: channel.h:69
google::protobuf::MessageLite * GetMsgTyped()
virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to, uint64_t *used_bytes)=0
void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req, CacheTransport *transport)
Definition: channel.cc:561
SessionCtx()
Definition: channel.cc:38
int fd_socket_
Definition: channel.h:242
static const char kFailureNotification
virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id, cvmfs::EnumObjectType type)=0
void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport)
Definition: channel.cc:315
void HandleRead(cvmfs::MsgReadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:398
void HandleHandshake(cvmfs::MsgHandshake *msg_req, CacheTransport *transport)
Definition: channel.cc:251
void SendDetachRequests()
Definition: channel.cc:883
SmallHashDynamic< UniqueRequest, uint64_t > txn_ids_
Definition: channel.h:256
atomic_int64 next_lst_id_
Definition: channel.h:255
bool RecvFrame(Frame *frame)
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
uint64_t used_bytes
Definition: channel.h:88
atomic_int32 running_
Definition: channel.h:244
Definition: mutex.h:42
unsigned num_workers_
Definition: channel.h:245
~SessionCtx()
Definition: channel.cc:46
void HandleStore(cvmfs::MsgStoreReq *msg_req, CacheTransport::Frame *frame, CacheTransport *transport)
Definition: channel.cc:588
bool IsValid() const
Definition: manifest.h:33
virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id)=0
static void * MainProcessRequests(void *data)
Definition: channel.cc:763
void set_attachment(void *attachment, uint32_t att_size)
virtual cvmfs::EnumStatus Pread(const shash::Any &id, uint64_t offset, uint32_t *size, unsigned char *buffer)=0
void WaitFor()
Definition: channel.cc:907
static const char kSignalTerminate
Definition: channel.h:140
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
static void size_t size
Definition: smalloc.h:54
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
const char * CacheTransportCode2Ascii(const cvmfs::EnumStatus code)
virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id, int32_t change_by)=0
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id, ObjectInfo *info)=0
std::string name_
Definition: channel.h:252
atomic_int64 next_txn_id_
Definition: channel.h:254
void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport)
Definition: channel.cc:275
static const char * kEnvReadyNotifyFd
int pipe_ctrl_[2]
Definition: channel.h:260
void LogSessionInfo(uint64_t session_id, const std::string &msg)
Definition: channel.cc:736
uint64_t NextSessionId()
Definition: channel.h:202
void HandleBreadcrumbLoad(cvmfs::MsgBreadcrumbLoadReq *msg_req, CacheTransport *transport)
Definition: channel.cc:226
static const uint32_t kFlagSendNonBlocking
void UnlockFile(const int filedes)
Definition: posix.cc:1004
bool HandleRequest(int fd_con)
Definition: channel.cc:462
uint64_t size_bytes
Definition: channel.h:87
static SessionCtx * GetInstance()
Definition: channel.cc:59
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545