GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/cache_plugin/channel.cc Lines: 412 506 81.4 %
Date: 2019-02-03 02:48:13 Branches: 146 242 60.3 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
#include "cvmfs_config.h"
5
#include "channel.h"
6
7
#include <errno.h>
8
#include <poll.h>
9
#include <signal.h>
10
#include <sys/socket.h>
11
#include <sys/un.h>
12
#include <unistd.h>
13
14
#include <cassert>
15
#include <cstring>
16
#include <vector>
17
18
#include "logging.h"
19
#include "platform.h"
20
#include "smalloc.h"
21
#include "util/pointer.h"
22
#include "util/posix.h"
23
#include "util/string.h"
24
#include "util_concurrency.h"
25
26
using namespace std;  // NOLINT
27
28
29
SessionCtx *SessionCtx::instance_ = NULL;
30
31
void SessionCtx::CleanupInstance() {
32
  delete instance_;
33
  instance_ = NULL;
34
}
35
36
37
1
SessionCtx::SessionCtx() {
38
  lock_tls_blocks_ = reinterpret_cast<pthread_mutex_t *>(
39
1
    smalloc(sizeof(pthread_mutex_t)));
40
1
  int retval = pthread_mutex_init(lock_tls_blocks_, NULL);
41
1
  assert(retval == 0);
42
1
}
43
44
45
SessionCtx::~SessionCtx() {
46
  pthread_mutex_destroy(lock_tls_blocks_);
47
  free(lock_tls_blocks_);
48
49
  for (unsigned i = 0; i < tls_blocks_.size(); ++i) {
50
    delete tls_blocks_[i];
51
  }
52
53
  int retval = pthread_key_delete(thread_local_storage_);
54
  assert(retval == 0);
55
}
56
57
58
9604
SessionCtx *SessionCtx::GetInstance() {
59
9604
  if (instance_ == NULL) {
60
1
    instance_ = new SessionCtx();
61
    int retval =
62
1
      pthread_key_create(&instance_->thread_local_storage_, TlsDestructor);
63
1
    assert(retval == 0);
64
  }
65
66
9604
  return instance_;
67
}
68
69
70
2776
void SessionCtx::Get(uint64_t *id, char **reponame, char **client_instance) {
71
  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
72
2776
    pthread_getspecific(thread_local_storage_));
73

2777
  if ((tls == NULL) || !tls->is_set) {
74
1
    *id = 0;
75
1
    *reponame = NULL;
76
1
    *client_instance = NULL;
77
  } else {
78
2775
    *id = tls->id;
79
2775
    *reponame = tls->reponame;
80
2775
    *client_instance = tls->client_instance;
81
  }
82
2776
}
83
84
85
bool SessionCtx::IsSet() {
86
  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
87
    pthread_getspecific(thread_local_storage_));
88
  if (tls == NULL)
89
    return false;
90
91
  return tls->is_set;
92
}
93
94
95
3414
void SessionCtx::Set(uint64_t id, char *reponame, char *client_instance) {
96
  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
97
3414
    pthread_getspecific(thread_local_storage_));
98
99
3414
  if (tls == NULL) {
100
14
    tls = new ThreadLocalStorage(id, reponame, client_instance);
101
14
    int retval = pthread_setspecific(thread_local_storage_, tls);
102
14
    assert(retval == 0);
103
14
    MutexLockGuard lock_guard(lock_tls_blocks_);
104
14
    tls_blocks_.push_back(tls);
105
  } else {
106
3400
    tls->id = id;
107
3400
    tls->reponame = reponame;
108
3400
    tls->client_instance = client_instance;
109
3400
    tls->is_set = true;
110
  }
111
3414
}
112
113
114
14
void SessionCtx::TlsDestructor(void *data) {
115
14
  ThreadLocalStorage *tls = static_cast<SessionCtx::ThreadLocalStorage *>(data);
116
14
  delete tls;
117
118
14
  assert(instance_);
119
14
  MutexLockGuard lock_guard(instance_->lock_tls_blocks_);
120
14
  for (vector<ThreadLocalStorage *>::iterator i =
121
14
       instance_->tls_blocks_.begin(), iEnd = instance_->tls_blocks_.end();
122
       i != iEnd; ++i)
123
  {
124
14
    if ((*i) == tls) {
125
14
      instance_->tls_blocks_.erase(i);
126
14
      break;
127
    }
128
14
  }
129
14
}
130
131
132
3414
void SessionCtx::Unset() {
133
  ThreadLocalStorage *tls = static_cast<ThreadLocalStorage *>(
134
3414
    pthread_getspecific(thread_local_storage_));
135
3414
  if (tls != NULL) {
136
3414
    tls->is_set = false;
137
3414
    tls->id = 0;
138
3414
    tls->reponame = NULL;
139
3414
    tls->client_instance = NULL;
140
  }
141
3414
}
142
143
144
//------------------------------------------------------------------------------
145
146
147
17
CachePlugin::SessionInfo::SessionInfo(uint64_t id, const std::string &name)
148
  : id(id)
149
17
  , name(name)
150
{
151
17
  vector<string> tokens = SplitString(name, ':');
152
17
  reponame = strdup(tokens[0].c_str());
153
17
  if (tokens.size() > 1)
154
15
    client_instance = strdup(tokens[1].c_str());
155
  else
156
2
    client_instance = NULL;
157
}
158
159
const uint64_t CachePlugin::kSizeUnknown = uint64_t(-1);
160
161
162
1002
void CachePlugin::AskToDetach() {
163
1002
  char detach = kSignalDetach;
164
1002
  WritePipe(pipe_ctrl_[1], &detach, 1);
165
1002
}
166
167
168
16
CachePlugin::CachePlugin(uint64_t capabilities)
169
  : is_local_(false)
170
  , capabilities_(capabilities)
171
  , fd_socket_(-1)
172
  , fd_socket_lock_(-1)
173
  , running_(0)
174
  , num_workers_(0)
175
  , max_object_size_(kDefaultMaxObjectSize)
176
16
  , num_inlimbo_clients_(0)
177
{
178
16
  atomic_init64(&next_session_id_);
179
16
  atomic_init64(&next_txn_id_);
180
16
  atomic_init64(&next_lst_id_);
181
  // Don't use listing id zero
182
16
  atomic_inc64(&next_lst_id_);
183
16
  txn_ids_.Init(128, UniqueRequest(), HashUniqueRequest);
184
16
  memset(&thread_io_, 0, sizeof(thread_io_));
185
16
  MakePipe(pipe_ctrl_);
186
}
187
188
189
16
CachePlugin::~CachePlugin() {
190
16
  Terminate();
191
16
  ClosePipe(pipe_ctrl_);
192
16
  if (fd_socket_ >= 0)
193
16
    close(fd_socket_);
194
16
  if (fd_socket_lock_ >= 0)
195
16
    UnlockFile(fd_socket_lock_);
196
}
197
198
199
17
void CachePlugin::HandleHandshake(
200
  cvmfs::MsgHandshake *msg_req,
201
  CacheTransport *transport)
202
{
203
17
  uint64_t session_id = NextSessionId();
204
17
  if (msg_req->has_name()) {
205
17
    sessions_[session_id] = SessionInfo(session_id, msg_req->name());
206
  } else {
207
    sessions_[session_id] = SessionInfo(session_id,
208
      "anonymous client (" + StringifyInt(session_id) + ")");
209
  }
210
17
  cvmfs::MsgHandshakeAck msg_ack;
211
17
  CacheTransport::Frame frame_send(&msg_ack);
212
213
17
  msg_ack.set_status(cvmfs::STATUS_OK);
214
17
  msg_ack.set_name(name_);
215
17
  msg_ack.set_protocol_version(kPbProtocolVersion);
216
17
  msg_ack.set_max_object_size(max_object_size_);
217
17
  msg_ack.set_session_id(session_id);
218
17
  msg_ack.set_capabilities(capabilities_);
219
17
  if (is_local_)
220
17
    msg_ack.set_pid(getpid());
221
17
  transport->SendFrame(&frame_send);
222
17
}
223
224
225
4
void CachePlugin::HandleInfo(
226
  cvmfs::MsgInfoReq *msg_req,
227
  CacheTransport *transport)
228
{
229
4
  SessionCtxGuard session_guard(msg_req->session_id(), this);
230
4
  cvmfs::MsgInfoReply msg_reply;
231
4
  CacheTransport::Frame frame_send(&msg_reply);
232
233
4
  msg_reply.set_req_id(msg_req->req_id());
234
4
  Info info;
235
4
  cvmfs::EnumStatus status = GetInfo(&info);
236
4
  if (status != cvmfs::STATUS_OK) {
237
    LogSessionError(msg_req->session_id(), status,
238
                    "failed to query cache status");
239
  }
240
4
  msg_reply.set_size_bytes(info.size_bytes);
241
4
  msg_reply.set_used_bytes(info.used_bytes);
242
4
  msg_reply.set_pinned_bytes(info.pinned_bytes);
243
4
  msg_reply.set_no_shrink(info.no_shrink);
244
4
  msg_reply.set_status(status);
245
4
  transport->SendFrame(&frame_send);
246
4
}
247
248
249
4
void CachePlugin::HandleIoctl(cvmfs::MsgIoctl *msg_req) {
250
4
  if (!msg_req->has_conncnt_change_by())
251
    return;
252
4
  int32_t conncnt_change_by = msg_req->conncnt_change_by();
253
4
  if ((static_cast<int32_t>(num_inlimbo_clients_) + conncnt_change_by) < 0) {
254
    LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
255
                    "invalid request to drop connection counter below zero");
256
    return;
257
  }
258
4
  if (conncnt_change_by > 0) {
259
2
    LogSessionInfo(msg_req->session_id(), "lock session beyond lifetime");
260
  } else {
261
2
    LogSessionInfo(msg_req->session_id(), "release session lock");
262
  }
263
4
  num_inlimbo_clients_ += conncnt_change_by;
264
}
265
266
267
8
void CachePlugin::HandleList(
268
  cvmfs::MsgListReq *msg_req,
269
  CacheTransport *transport)
270
{
271
8
  SessionCtxGuard session_guard(msg_req->session_id(), this);
272
8
  cvmfs::MsgListReply msg_reply;
273
8
  CacheTransport::Frame frame_send(&msg_reply);
274
275
8
  msg_reply.set_req_id(msg_req->req_id());
276
8
  int64_t listing_id = msg_req->listing_id();
277
8
  msg_reply.set_listing_id(listing_id);
278
8
  msg_reply.set_is_last_part(true);
279
280
  cvmfs::EnumStatus status;
281
8
  if (msg_req->listing_id() == 0) {
282
6
    listing_id = NextLstId();
283
6
    status = ListingBegin(listing_id, msg_req->object_type());
284
6
    if (status != cvmfs::STATUS_OK) {
285
      LogSessionError(msg_req->session_id(), status,
286
                      "failed to start enumeration of objects");
287
      msg_reply.set_status(status);
288
      transport->SendFrame(&frame_send);
289
      return;
290
    }
291
6
    msg_reply.set_listing_id(listing_id);
292
  }
293
8
  assert(listing_id != 0);
294
295
8
  ObjectInfo item;
296
8
  unsigned total_size = 0;
297
200014
  while ((status = ListingNext(listing_id, &item)) == cvmfs::STATUS_OK) {
298
200000
    cvmfs::MsgListRecord *msg_list_record = msg_reply.add_list_record();
299
200000
    cvmfs::MsgHash *msg_hash = new cvmfs::MsgHash();
300
200000
    transport->FillMsgHash(item.id, msg_hash);
301
200000
    msg_list_record->set_allocated_hash(msg_hash);
302
200000
    msg_list_record->set_pinned(item.pinned);
303
200000
    msg_list_record->set_description(item.description);
304
    // Approximation of the message size
305
200000
    total_size += sizeof(item) + item.description.length();
306
200000
    if (total_size > kListingSize)
307
2
      break;
308
  }
309
8
  if (status == cvmfs::STATUS_OUTOFBOUNDS) {
310
6
    ListingEnd(listing_id);
311
6
    status = cvmfs::STATUS_OK;
312
  } else {
313
2
    msg_reply.set_is_last_part(false);
314
  }
315
8
  if (status != cvmfs::STATUS_OK) {
316
    LogSessionError(msg_req->session_id(), status, "failed enumerate objects");
317
  }
318
8
  msg_reply.set_status(status);
319

8
  transport->SendFrame(&frame_send);
320
}
321
322
323
16
void CachePlugin::HandleObjectInfo(
324
  cvmfs::MsgObjectInfoReq *msg_req,
325
  CacheTransport *transport)
326
{
327
16
  SessionCtxGuard session_guard(msg_req->session_id(), this);
328
16
  cvmfs::MsgObjectInfoReply msg_reply;
329
16
  CacheTransport::Frame frame_send(&msg_reply);
330
331
16
  msg_reply.set_req_id(msg_req->req_id());
332
16
  shash::Any object_id;
333
16
  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
334
16
  if (!retval) {
335
    LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
336
                    "malformed hash received from client");
337
    msg_reply.set_status(cvmfs::STATUS_MALFORMED);
338
  } else {
339
16
    ObjectInfo info;
340
16
    cvmfs::EnumStatus status = GetObjectInfo(object_id, &info);
341
16
    msg_reply.set_status(status);
342
16
    if (status == cvmfs::STATUS_OK) {
343
15
      msg_reply.set_object_type(info.object_type);
344
15
      msg_reply.set_size(info.size);
345
1
    } else if (status != cvmfs::STATUS_NOENTRY) {
346
      LogSessionError(msg_req->session_id(), status,
347
1
                      "failed retrieving object details");
348
    }
349
  }
350
16
  transport->SendFrame(&frame_send);
351
16
}
352
353
354
2211
void CachePlugin::HandleRead(
355
  cvmfs::MsgReadReq *msg_req,
356
  CacheTransport *transport)
357
{
358
2211
  SessionCtxGuard session_guard(msg_req->session_id(), this);
359
2211
  cvmfs::MsgReadReply msg_reply;
360
2211
  CacheTransport::Frame frame_send(&msg_reply);
361
362
2211
  msg_reply.set_req_id(msg_req->req_id());
363
2211
  shash::Any object_id;
364
2211
  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
365

2211
  if (!retval || (msg_req->size() > max_object_size_)) {
366
    LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
367
                    "malformed hash received from client");
368
    msg_reply.set_status(cvmfs::STATUS_MALFORMED);
369
    transport->SendFrame(&frame_send);
370
    return;
371
  }
372
2211
  unsigned size = msg_req->size();
373
#ifdef __APPLE__
374
  unsigned char *buffer = reinterpret_cast<unsigned char *>(smalloc(size));
375
#else
376
2211
  unsigned char buffer[size];
377
#endif
378
2211
  cvmfs::EnumStatus status = Pread(object_id, msg_req->offset(), &size, buffer);
379
2211
  msg_reply.set_status(status);
380
2211
  if (status == cvmfs::STATUS_OK) {
381
2210
    frame_send.set_attachment(buffer, size);
382
  } else {
383
    LogSessionError(msg_req->session_id(), status,
384
1
                    "failed to read from object");
385
  }
386

2211
  transport->SendFrame(&frame_send);
387
#ifdef __APPLE__
388
  free(buffer);
389
#endif
390
}
391
392
393
564
void CachePlugin::HandleRefcount(
394
  cvmfs::MsgRefcountReq *msg_req,
395
  CacheTransport *transport)
396
{
397
564
  SessionCtxGuard session_guard(msg_req->session_id(), this);
398
564
  cvmfs::MsgRefcountReply msg_reply;
399
564
  CacheTransport::Frame frame_send(&msg_reply);
400
401
564
  msg_reply.set_req_id(msg_req->req_id());
402
564
  shash::Any object_id;
403
564
  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
404
564
  if (!retval) {
405
    LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
406
                    "malformed hash received from client");
407
    msg_reply.set_status(cvmfs::STATUS_MALFORMED);
408
  } else {
409
564
    cvmfs::EnumStatus status = ChangeRefcount(object_id, msg_req->change_by());
410
564
    msg_reply.set_status(status);
411

564
    if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_NOENTRY)) {
412
      LogSessionError(msg_req->session_id(), status,
413
1
                      "failed to open/close object");
414
    }
415
  }
416
564
  transport->SendFrame(&frame_send);
417
564
}
418
419
420
3447
bool CachePlugin::HandleRequest(int fd_con) {
421
3447
  CacheTransport transport(fd_con, CacheTransport::kFlagSendIgnoreFailure);
422
3447
  char buffer[max_object_size_];
423
3447
  CacheTransport::Frame frame_recv;
424
3447
  frame_recv.set_attachment(buffer, max_object_size_);
425
3447
  bool retval = transport.RecvFrame(&frame_recv);
426
3447
  if (!retval) {
427
    LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
428
             "failed to receive request from connection (%d)", errno);
429
    return false;
430
  }
431
432
3447
  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
433
434
3447
  if (msg_typed->GetTypeName() == "cvmfs.MsgHandshake") {
435
    cvmfs::MsgHandshake *msg_req =
436
17
      reinterpret_cast<cvmfs::MsgHandshake *>(msg_typed);
437
17
    HandleHandshake(msg_req, &transport);
438
3430
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgQuit") {
439
12
    cvmfs::MsgQuit *msg_req = reinterpret_cast<cvmfs::MsgQuit *>(msg_typed);
440
    map<uint64_t, SessionInfo>::const_iterator iter =
441
12
      sessions_.find(msg_req->session_id());
442
12
    if (iter != sessions_.end()) {
443
12
      free(iter->second.reponame);
444
12
      free(iter->second.client_instance);
445
    }
446
12
    sessions_.erase(msg_req->session_id());
447
12
    return false;
448
3418
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgIoctl") {
449
4
    HandleIoctl(reinterpret_cast<cvmfs::MsgIoctl *>(msg_typed));
450
3414
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgRefcountReq") {
451
    cvmfs::MsgRefcountReq *msg_req =
452
564
      reinterpret_cast<cvmfs::MsgRefcountReq *>(msg_typed);
453
564
    HandleRefcount(msg_req, &transport);
454
2850
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgObjectInfoReq") {
455
    cvmfs::MsgObjectInfoReq *msg_req =
456
16
      reinterpret_cast<cvmfs::MsgObjectInfoReq *>(msg_typed);
457
16
    HandleObjectInfo(msg_req, &transport);
458
2834
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgReadReq") {
459
    cvmfs::MsgReadReq *msg_req =
460
2211
      reinterpret_cast<cvmfs::MsgReadReq *>(msg_typed);
461
2211
    HandleRead(msg_req, &transport);
462
623
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreReq") {
463
    cvmfs::MsgStoreReq *msg_req =
464
608
      reinterpret_cast<cvmfs::MsgStoreReq *>(msg_typed);
465
608
    HandleStore(msg_req, &frame_recv, &transport);
466
15
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgStoreAbortReq") {
467
    cvmfs::MsgStoreAbortReq *msg_req =
468
1
      reinterpret_cast<cvmfs::MsgStoreAbortReq *>(msg_typed);
469
1
    HandleStoreAbort(msg_req, &transport);
470
14
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgInfoReq") {
471
    cvmfs::MsgInfoReq *msg_req =
472
4
      reinterpret_cast<cvmfs::MsgInfoReq *>(msg_typed);
473
4
    HandleInfo(msg_req, &transport);
474
10
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgShrinkReq") {
475
    cvmfs::MsgShrinkReq *msg_req =
476
2
      reinterpret_cast<cvmfs::MsgShrinkReq *>(msg_typed);
477
2
    HandleShrink(msg_req, &transport);
478
8
  } else if (msg_typed->GetTypeName() == "cvmfs.MsgListReq") {
479
    cvmfs::MsgListReq *msg_req =
480
8
      reinterpret_cast<cvmfs::MsgListReq *>(msg_typed);
481
8
    HandleList(msg_req, &transport);
482
  } else {
483
    LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
484
             "unexpected message from client: %s",
485
             msg_typed->GetTypeName().c_str());
486
    return false;
487
  }
488
489
3435
  return true;
490
}
491
492
493
2
void CachePlugin::HandleShrink(
494
  cvmfs::MsgShrinkReq *msg_req,
495
  CacheTransport *transport)
496
{
497
2
  SessionCtxGuard session_guard(msg_req->session_id(), this);
498
2
  cvmfs::MsgShrinkReply msg_reply;
499
2
  CacheTransport::Frame frame_send(&msg_reply);
500
501
2
  msg_reply.set_req_id(msg_req->req_id());
502
2
  uint64_t used_bytes = 0;
503
2
  cvmfs::EnumStatus status = Shrink(msg_req->shrink_to(), &used_bytes);
504
2
  msg_reply.set_used_bytes(used_bytes);
505
2
  msg_reply.set_status(status);
506

2
  if ((status != cvmfs::STATUS_OK) && (status != cvmfs::STATUS_PARTIAL)) {
507
    LogSessionError(msg_req->session_id(), status, "failed to cleanup cache");
508
  }
509
2
  transport->SendFrame(&frame_send);
510
2
}
511
512
513
1
void CachePlugin::HandleStoreAbort(
514
  cvmfs::MsgStoreAbortReq *msg_req,
515
  CacheTransport *transport)
516
{
517
1
  SessionCtxGuard session_guard(msg_req->session_id(), this);
518
1
  cvmfs::MsgStoreReply msg_reply;
519
1
  CacheTransport::Frame frame_send(&msg_reply);
520
1
  msg_reply.set_req_id(msg_req->req_id());
521
1
  msg_reply.set_part_nr(0);
522
  uint64_t txn_id;
523
1
  UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
524
1
  bool retval = txn_ids_.Lookup(uniq_req, &txn_id);
525
1
  if (!retval) {
526
    LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
527
                    "malformed transaction id received from client");
528
    msg_reply.set_status(cvmfs::STATUS_MALFORMED);
529
  } else {
530
1
    cvmfs::EnumStatus status = AbortTxn(txn_id);
531
1
    msg_reply.set_status(status);
532
1
    if (status != cvmfs::STATUS_OK) {
533
      LogSessionError(msg_req->session_id(), status,
534
                      "failed to abort transaction");
535
    }
536
1
    txn_ids_.Erase(uniq_req);
537
  }
538
1
  transport->SendFrame(&frame_send);
539
1
}
540
541
542
608
void CachePlugin::HandleStore(
543
  cvmfs::MsgStoreReq *msg_req,
544
  CacheTransport::Frame *frame,
545
  CacheTransport *transport)
546
{
547
608
  SessionCtxGuard session_guard(msg_req->session_id(), this);
548
608
  cvmfs::MsgStoreReply msg_reply;
549
608
  CacheTransport::Frame frame_send(&msg_reply);
550
608
  msg_reply.set_req_id(msg_req->req_id());
551
608
  msg_reply.set_part_nr(msg_req->part_nr());
552
608
  shash::Any object_id;
553
608
  bool retval = transport->ParseMsgHash(msg_req->object_id(), &object_id);
554


608
  if ( !retval ||
555
       (frame->att_size() > max_object_size_) ||
556
       ((frame->att_size() < max_object_size_) && !msg_req->last_part()) )
557
  {
558
    LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
559
                    "malformed hash or bad object size received from client");
560
    msg_reply.set_status(cvmfs::STATUS_MALFORMED);
561
    transport->SendFrame(&frame_send);
562
    return;
563
  }
564
565
608
  UniqueRequest uniq_req(msg_req->session_id(), msg_req->req_id());
566
  uint64_t txn_id;
567
608
  cvmfs::EnumStatus status = cvmfs::STATUS_OK;
568
608
  if (msg_req->part_nr() == 1) {
569
7
    if (txn_ids_.Contains(uniq_req)) {
570
      LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
571
                      "invalid attempt to restart running transaction");
572
      msg_reply.set_status(cvmfs::STATUS_MALFORMED);
573
      transport->SendFrame(&frame_send);
574
      return;
575
    }
576
7
    txn_id = NextTxnId();
577
7
    ObjectInfo info;
578
7
    info.id = object_id;
579
7
    if (msg_req->has_expected_size()) {info.size = msg_req->expected_size();}
580
7
    if (msg_req->has_object_type()) {info.object_type = msg_req->object_type();}
581
7
    if (msg_req->has_description()) {info.description = msg_req->description();}
582
7
    status = StartTxn(object_id, txn_id, info);
583
7
    if (status != cvmfs::STATUS_OK) {
584
      LogSessionError(msg_req->session_id(), status,
585
                      "failed to start transaction");
586
      msg_reply.set_status(status);
587
      transport->SendFrame(&frame_send);
588
      return;
589
    }
590
7
    txn_ids_.Insert(uniq_req, txn_id);
591
  } else {
592
601
    retval = txn_ids_.Lookup(uniq_req, &txn_id);
593
601
    if (!retval) {
594
      LogSessionError(msg_req->session_id(), cvmfs::STATUS_MALFORMED,
595
                      "invalid transaction received from client");
596
      msg_reply.set_status(cvmfs::STATUS_MALFORMED);
597
      transport->SendFrame(&frame_send);
598
      return;
599
    }
600
  }
601
602
  // TODO(jblomer): check part number and send objects up in order
603
608
  if (frame->att_size() > 0) {
604
    status = WriteTxn(txn_id,
605
                      reinterpret_cast<unsigned char *>(frame->attachment()),
606
607
                      frame->att_size());
607
607
    if (status != cvmfs::STATUS_OK) {
608
      LogSessionError(msg_req->session_id(), status, "failure writing object");
609
      msg_reply.set_status(status);
610
      transport->SendFrame(&frame_send);
611
      return;
612
    }
613
  }
614
615
608
  if (msg_req->last_part()) {
616
6
    status = CommitTxn(txn_id);
617
6
    if (status != cvmfs::STATUS_OK) {
618
      LogSessionError(msg_req->session_id(), status,
619
                      "failure committing object");
620
    }
621
6
    txn_ids_.Erase(uniq_req);
622
  }
623
608
  msg_reply.set_status(status);
624

608
  transport->SendFrame(&frame_send);
625
}
626
627
628
16
bool CachePlugin::IsRunning() {
629
16
  return atomic_read32(&running_) != 0;
630
}
631
632
633
16
bool CachePlugin::Listen(const string &locator) {
634
16
  vector<string> tokens = SplitString(locator, '=');
635
16
  if (tokens[0] == "unix") {
636
16
    string lock_path = tokens[1] + ".lock";
637
16
    fd_socket_lock_ = TryLockFile(lock_path);
638
16
    if (fd_socket_lock_ == -1) {
639
      LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
640
               "failed to acquire lock file %s (%d)", lock_path.c_str(), errno);
641
      NotifySupervisor(CacheTransport::kFailureNotification);
642
      return false;
643
16
    } else if (fd_socket_lock_ == -2) {
644
      // Another plugin process probably started in the meantime
645
      NotifySupervisor(CacheTransport::kReadyNotification);
646
      if (getenv(CacheTransport::kEnvReadyNotifyFd) == NULL) {
647
        LogCvmfs(kLogCache, kLogSyslogErr | kLogStderr,
648
                 "failed to lock on %s, file is busy", lock_path.c_str());
649
      }
650
      return false;
651
    }
652
16
    assert(fd_socket_lock_ >= 0);
653
16
    fd_socket_ = MakeSocket(tokens[1], 0600);
654
16
    is_local_ = true;
655
  } else if (tokens[0] == "tcp") {
656
    vector<string> tcp_address = SplitString(tokens[1], ':');
657
    if (tcp_address.size() != 2) {
658
      LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
659
               "invalid locator: %s", locator.c_str());
660
      NotifySupervisor(CacheTransport::kFailureNotification);
661
      return false;
662
    }
663
    fd_socket_ = MakeTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
664
  } else {
665
    LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
666
             "unknown endpoint in locator: %s", locator.c_str());
667
    NotifySupervisor(CacheTransport::kFailureNotification);
668
    return false;
669
  }
670
671
16
  if (fd_socket_ < 0) {
672
    if (errno == EADDRINUSE) {
673
      // Another plugin process probably started in the meantime
674
      NotifySupervisor(CacheTransport::kReadyNotification);
675
    } else {
676
      LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
677
               "failed to create endpoint %s (%d)", locator.c_str(), errno);
678
      NotifySupervisor(CacheTransport::kFailureNotification);
679
    }
680
    is_local_ = false;
681
    return false;
682
  }
683
16
  int retval = listen(fd_socket_, 32);
684
16
  assert(retval == 0);
685
686
16
  return true;
687
}
688
689
690
4
void CachePlugin::LogSessionInfo(uint64_t session_id, const string &msg) {
691
4
  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
692
4
  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
693
4
  if (iter != sessions_.end()) {
694
4
    session_str = iter->second.name;
695
  }
696
  LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
697
4
           "session '%s': %s", session_str.c_str(), msg.c_str());
698
4
}
699
700
701
3
void CachePlugin::LogSessionError(
702
  uint64_t session_id,
703
  cvmfs::EnumStatus status,
704
  const std::string &msg)
705
{
706
3
  string session_str("unidentified client (" + StringifyInt(session_id) + ")");
707
3
  map<uint64_t, SessionInfo>::const_iterator iter = sessions_.find(session_id);
708
3
  if (iter != sessions_.end()) {
709
3
    session_str = iter->second.name;
710
  }
711
  LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
712
           "session '%s': %s (%d - %s)",
713
           session_str.c_str(), msg.c_str(), status,
714
3
           CacheTransportCode2Ascii(status));
715
3
}
716
717
718
16
void *CachePlugin::MainProcessRequests(void *data) {
719
16
  CachePlugin *cache_plugin = reinterpret_cast<CachePlugin *>(data);
720
721
16
  platform_sighandler_t save_sigpipe = signal(SIGPIPE, SIG_IGN);
722
723
16
  vector<struct pollfd> watch_fds;
724
  // Elements 0, 1: control pipe, socket fd
725
  struct pollfd watch_ctrl;
726
16
  watch_ctrl.fd = cache_plugin->pipe_ctrl_[0];
727
16
  watch_ctrl.events = POLLIN | POLLPRI;
728
16
  watch_fds.push_back(watch_ctrl);
729
  struct pollfd watch_socket;
730
16
  watch_socket.fd = cache_plugin->fd_socket_;
731
16
  watch_socket.events = POLLIN | POLLPRI;
732
16
  watch_fds.push_back(watch_socket);
733
734
16
  bool terminated = false;
735
4498
  while (!terminated) {
736
17900
    for (unsigned i = 0; i < watch_fds.size(); ++i)
737
13418
      watch_fds[i].revents = 0;
738
4482
    int retval = poll(&watch_fds[0], watch_fds.size(), -1);
739
4482
    if (retval < 0) {
740
      if (errno == EINTR)
741
        continue;
742
      LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
743
               "cache plugin connection failure (%d)", errno);
744
      abort();
745
    }
746
747
    // Termination or detach
748
4482
    if (watch_fds[0].revents) {
749
      char signal;
750
1018
      ReadPipe(watch_fds[0].fd, &signal, 1);
751
1018
      if (signal == kSignalDetach) {
752
1002
        cache_plugin->SendDetachRequests();
753
1002
        continue;
754
      }
755
756
      // termination
757
16
      if (watch_fds.size() > 2) {
758
        LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
759
5
                 "terminating external cache manager with pending connections");
760
      }
761
16
      break;
762
    }
763
764
    // New connection
765
3464
    if (watch_fds[1].revents) {
766
      struct sockaddr_un remote;
767
17
      socklen_t socket_size = sizeof(remote);
768
      int fd_con =
769
17
        accept(watch_fds[1].fd, (struct sockaddr *)&remote, &socket_size);
770
17
      if (fd_con < 0) {
771
        LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
772
                 "failed to establish connection (%d)", errno);
773
        continue;
774
      }
775
      struct pollfd watch_con;
776
17
      watch_con.fd = fd_con;
777
17
      watch_con.events = POLLIN | POLLPRI;
778
17
      watch_fds.push_back(watch_con);
779
17
      cache_plugin->connections_.insert(fd_con);
780
    }
781
782
    // New request
783
10392
    for (unsigned i = 2; i < watch_fds.size(); ) {
784
3464
      if (watch_fds[i].revents) {
785
3447
        bool proceed = cache_plugin->HandleRequest(watch_fds[i].fd);
786
3447
        if (!proceed) {
787
12
          close(watch_fds[i].fd);
788
12
          cache_plugin->connections_.erase(watch_fds[i].fd);
789
12
          watch_fds.erase(watch_fds.begin() + i);
790


12
          if ( (getenv(CacheTransport::kEnvReadyNotifyFd) != NULL) &&
791
               (cache_plugin->connections_.empty()) &&
792
               (cache_plugin->num_inlimbo_clients_ == 0) )
793
          {
794
            LogCvmfs(kLogCache, kLogSyslog,
795
                     "stopping cache plugin, no more active clients");
796
            terminated = true;
797
            break;
798
          }
799
        } else {
800
3435
          i++;
801
        }
802
      } else {
803
17
        i++;
804
      }
805
    }
806
  }
807
808
  // 0, 1 being closed by destructor
809
21
  for (unsigned i = 2; i < watch_fds.size(); ++i)
810
5
    close(watch_fds[i].fd);
811
16
  cache_plugin->txn_ids_.Clear();
812
813
16
  signal(SIGPIPE, save_sigpipe);
814
16
  return NULL;
815
}
816
817
818
/**
819
 * Used during startup to synchronize with the cvmfs client.
820
 */
821
16
void CachePlugin::NotifySupervisor(char signal) {
822
16
  char *pipe_ready = getenv(CacheTransport::kEnvReadyNotifyFd);
823
16
  if (pipe_ready == NULL)
824
16
    return;
825
  int fd_pipe_ready = String2Int64(pipe_ready);
826
  WritePipe(fd_pipe_ready, &signal, 1);
827
}
828
829
830
16
void CachePlugin::ProcessRequests(unsigned num_workers) {
831
16
  num_workers_ = num_workers;
832
16
  int retval = pthread_create(&thread_io_, NULL, MainProcessRequests, this);
833
16
  assert(retval == 0);
834
16
  NotifySupervisor(CacheTransport::kReadyNotification);
835
16
  atomic_cas32(&running_, 0, 1);
836
16
}
837
838
839
1002
void CachePlugin::SendDetachRequests() {
840
1002
  set<int>::const_iterator iter = connections_.begin();
841
1002
  set<int>::const_iterator iter_end = connections_.end();
842
1002
  for (; iter != iter_end; ++iter) {
843
    CacheTransport transport(*iter,
844
      CacheTransport::kFlagSendIgnoreFailure |
845
1002
      CacheTransport::kFlagSendNonBlocking);
846
1002
    cvmfs::MsgDetach msg_detach;
847
1002
    CacheTransport::Frame frame_send(&msg_detach);
848
1002
    transport.SendFrame(&frame_send);
849
  }
850
1002
}
851
852
853
16
void CachePlugin::Terminate() {
854
16
  if (IsRunning()) {
855
16
    char terminate = kSignalTerminate;
856
16
    WritePipe(pipe_ctrl_[1], &terminate, 1);
857
16
    pthread_join(thread_io_, NULL);
858
16
    atomic_cas32(&running_, 1, 0);
859
  }
860
16
}
861
862
863
void CachePlugin::WaitFor() {
864
  if (!IsRunning())
865
    return;
866
  pthread_join(thread_io_, NULL);
867

45
}