GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/cache_extern.cc Lines: 459 597 76.9 %
Date: 2019-02-03 02:48:13 Branches: 153 272 56.3 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
#include "cvmfs_config.h"
5
#include "cache_extern.h"
6
7
#include <errno.h>
8
#include <fcntl.h>
9
#include <inttypes.h>
10
#include <stdint.h>
11
#include <sys/socket.h>
12
#include <unistd.h>
13
14
#include <algorithm>
15
#include <cassert>
16
#ifdef __APPLE__
17
#include <cstdlib>
18
#endif
19
#include <cstring>
20
#include <map>
21
#include <new>
22
#include <set>
23
#include <string>
24
25
#include "atomic.h"
26
#include "cache.pb.h"
27
#include "hash.h"
28
#include "logging.h"
29
#ifdef __APPLE__
30
#include "smalloc.h"
31
#endif
32
#include "util/pointer.h"
33
#include "util/posix.h"
34
#include "util/string.h"
35
#include "util_concurrency.h"
36
37
using namespace std;  // NOLINT
38
39
namespace {
40
41
6125
int Ack2Errno(cvmfs::EnumStatus status_code) {
42



6125
  switch (status_code) {
43
    case cvmfs::STATUS_OK:
44
6109
      return 0;
45
    case cvmfs::STATUS_NOSUPPORT:
46
      return -EOPNOTSUPP;
47
    case cvmfs::STATUS_FORBIDDEN:
48
      return -EPERM;
49
    case cvmfs::STATUS_NOSPACE:
50
      return -ENOSPC;
51
    case cvmfs::STATUS_NOENTRY:
52
9
      return -ENOENT;
53
    case cvmfs::STATUS_MALFORMED:
54
2
      return -EINVAL;
55
    case cvmfs::STATUS_IOERR:
56
      return -EIO;
57
    case cvmfs::STATUS_CORRUPTED:
58
      return -EIO;
59
    case cvmfs::STATUS_TIMEOUT:
60
      return -EIO;
61
    case cvmfs::STATUS_BADCOUNT:
62
      return -EINVAL;
63
    case cvmfs::STATUS_OUTOFBOUNDS:
64
5
      return -EINVAL;
65
    default:
66
      return -EIO;
67
  }
68
}
69
70
}  // anonymous namespace
71
72
17
// Out-of-line definition of the default-constructed handle id.  Handle ids
// read from the fd table are compared against it to detect invalid file
// descriptors (such calls fail with -EBADF).
const shash::Any ExternalCacheManager::kInvalidHandle;
73
74
75
int ExternalCacheManager::AbortTxn(void *txn) {
76
  int result = Reset(txn);
77
#ifdef __APPLE__
78
  free(reinterpret_cast<Transaction *>(txn)->buffer);
79
#endif
80
  return result;
81
}
82
83
84
39
/**
 * Stores the given quota manager in quota_mgr_.  The pointer must not be
 * NULL.  Always returns true.
 */
bool ExternalCacheManager::AcquireQuotaManager(QuotaManager *quota_mgr) {
  assert(quota_mgr != NULL);

  this->quota_mgr_ = quota_mgr;
  LogCvmfs(kLogCache, kLogDebug, "set quota manager");

  return true;
}
90
91
92
9105
/**
 * Sends the request of rpc_job to the cache plugin and blocks until the reply
 * is available in the job's receive frame.
 *
 * Once the reader thread is spawned, the job is registered as in-flight and
 * the caller waits on a signal.  Before that, the reply is read synchronously
 * on the calling thread; out-of-band messages are handled in between.
 */
void ExternalCacheManager::CallRemotely(ExternalCacheManager::RpcJob *rpc_job) {
  if (spawned_) {
    Signal signal;
    {
      // Register the job before sending the request
      MutexLockGuard guard(lock_inflight_rpcs_);
      inflight_rpcs_.push_back(RpcInFlight(rpc_job, &signal));
    }
    {
      MutexLockGuard guard(lock_send_fd_);
      transport_.SendFrame(rpc_job->frame_send());
    }
    signal.Wait();
    return;
  }

  // Synchronous mode: no reader thread yet
  transport_.SendFrame(rpc_job->frame_send());
  const uint32_t save_att_size = rpc_job->frame_recv()->att_size();
  while (true) {
    bool retval = transport_.RecvFrame(rpc_job->frame_recv());
    assert(retval);
    if (!rpc_job->frame_recv()->IsMsgOutOfBand())
      break;

    // The only out-of-band message expected here is the detach request
    google::protobuf::MessageLite *msg_typed =
      rpc_job->frame_recv()->GetMsgTyped();
    assert(msg_typed->GetTypeName() == "cvmfs.MsgDetach");
    quota_mgr_->BroadcastBackchannels("R");  //  release pinned catalogs
    // Restore the receive frame and read again for the actual reply
    rpc_job->frame_recv()->Reset(save_att_size);
  }
}
123
124
125
3068
/**
 * Asks the cache plugin to change the reference counter of object id by
 * change_by.  Returns 0 on success or a negative errno value derived from the
 * plugin's reply status.
 */
int ExternalCacheManager::ChangeRefcount(const shash::Any &id, int change_by) {
  cvmfs::MsgHash object_id;
  transport_.FillMsgHash(id, &object_id);

  cvmfs::MsgRefcountReq request;
  request.set_session_id(session_id_);
  request.set_req_id(NextRequestId());
  request.set_change_by(change_by);
  // object_id is a stack object that is only lent to the request; it is taken
  // back with release_object_id() once the call has finished.
  request.set_allocated_object_id(&object_id);

  RpcJob rpc_job(&request);
  CallRemotely(&rpc_job);
  request.release_object_id();

  return Ack2Errno(rpc_job.msg_refcount_reply()->status());
}
140
141
142
513
int ExternalCacheManager::Close(int fd) {
143
513
  ReadOnlyHandle handle;
144
  {
145
513
    WriteLockGuard guard(rwlock_fd_table_);
146
513
    handle = fd_table_.GetHandle(fd);
147
513
    if (handle.id == kInvalidHandle)
148
1
      return -EBADF;
149
512
    int retval = fd_table_.CloseFd(fd);
150

512
    assert(retval == 0);
151
  }
152
153
512
  return ChangeRefcount(handle.id, -1);
154
}
155
156
157
2038
int ExternalCacheManager::CommitTxn(void *txn) {
158
2038
  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
159
  LogCvmfs(kLogCache, kLogDebug, "committing %s",
160
6
           transaction->id.ToString().c_str());
161
2038
  int retval = Flush(true, transaction);
162
2038
  if (retval != 0)
163
    return retval;
164
165
2038
  int refcount = transaction->open_fds - 1;
166
2038
  if (refcount != 0)
167
2036
    return ChangeRefcount(transaction->id, refcount);
168
#ifdef __APPLE__
169
  free(transaction->buffer);
170
#endif
171
2
  return 0;
172
}
173
174
175
int ExternalCacheManager::ConnectLocator(const std::string &locator) {
176
  vector<string> tokens = SplitString(locator, '=');
177
  int result = -1;
178
  if (tokens[0] == "unix") {
179
    result = ConnectSocket(tokens[1]);
180
  } else if (tokens[0] == "tcp") {
181
    vector<string> tcp_address = SplitString(tokens[1], ':');
182
    if (tcp_address.size() != 2)
183
      return -EINVAL;
184
    result = ConnectTcpEndpoint(tcp_address[0], String2Uint64(tcp_address[1]));
185
  } else {
186
    return -EINVAL;
187
  }
188
  if (result < 0) {
189
    if (errno) {
190
      LogCvmfs(kLogCache, kLogDebug | kLogStderr,
191
               "Failed to connect to socket: %s", strerror(errno));
192
    } else {
193
      LogCvmfs(kLogCache, kLogDebug | kLogStderr,
194
               "Failed to connect to socket (unknown error)");
195
    }
196
    return -EIO;
197
  }
198
  LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
199
           "connected to cache plugin at %s", locator.c_str());
200
  return result;
201
}
202
203
204
41
/**
 * Creates a cache manager on top of the connection fd_connection and performs
 * the handshake with the cache plugin, identifying itself as ident.
 *
 * Returns NULL if no reply arrives, if the reply is not a handshake
 * acknowledgement, or if the maximum object size announced by the plugin is
 * outside [kMinSupportedObjectSize, kMaxSupportedObjectSize].
 */
ExternalCacheManager *ExternalCacheManager::Create(
  int fd_connection,
  unsigned max_open_fds,
  const string &ident)
{
  UniquePtr<ExternalCacheManager> cache_mgr(
    new ExternalCacheManager(fd_connection, max_open_fds));
  assert(cache_mgr.IsValid());

  // Handshake request
  cvmfs::MsgHandshake msg_handshake;
  msg_handshake.set_protocol_version(kPbProtocolVersion);
  msg_handshake.set_name(ident);
  CacheTransport::Frame frame_send(&msg_handshake);
  cache_mgr->transport_.SendFrame(&frame_send);

  // Handshake reply
  CacheTransport::Frame frame_recv;
  if (!cache_mgr->transport_.RecvFrame(&frame_recv))
    return NULL;
  google::protobuf::MessageLite *msg_typed = frame_recv.GetMsgTyped();
  if (msg_typed->GetTypeName() != "cvmfs.MsgHandshakeAck")
    return NULL;
  cvmfs::MsgHandshakeAck *msg_ack =
    reinterpret_cast<cvmfs::MsgHandshakeAck *>(msg_typed);

  // Take over the session parameters announced by the plugin
  cache_mgr->session_id_ = msg_ack->session_id();
  cache_mgr->capabilities_ = msg_ack->capabilities();
  cache_mgr->max_object_size_ = msg_ack->max_object_size();
  assert(cache_mgr->max_object_size_ > 0);

  if (cache_mgr->max_object_size_ > kMaxSupportedObjectSize) {
    LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
             "external cache manager object size too large (%u)",
             cache_mgr->max_object_size_);
    return NULL;
  }
  if (cache_mgr->max_object_size_ < kMinSupportedObjectSize) {
    LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
             "external cache manager object size too small (%u)",
             cache_mgr->max_object_size_);
    return NULL;
  }

  // The process id of the plugin is optional in the acknowledgement
  if (msg_ack->has_pid()) {
    cache_mgr->pid_plugin_ = msg_ack->pid();
  }
  return cache_mgr.Release();
}
248
249
250
/**
251
 * Tries to connect to the plugin at locator, or, if it doesn't exist, spawns
252
 * a new plugin using cmdline.  Two processes could try to spawn the plugin at
253
 * the same time.  In this case, the plugin should indicate to the client to
254
 * retry connecting.
255
 */
256
ExternalCacheManager::PluginHandle *ExternalCacheManager::CreatePlugin(
257
  const std::string &locator,
258
  const std::vector<std::string> &cmd_line)
259
{
260
  UniquePtr<PluginHandle> plugin_handle(new PluginHandle());
261
  unsigned num_attempts = 0;
262
  bool try_again = false;
263
  do {
264
    num_attempts++;
265
    if (num_attempts > 2) {
266
      // Prevent violate busy loops
267
      SafeSleepMs(1000);
268
    }
269
    plugin_handle->fd_connection_ = ConnectLocator(locator);
270
    if (plugin_handle->IsValid()) {
271
      break;
272
    } else if (plugin_handle->fd_connection_ == -EINVAL) {
273
      LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
274
               "Invalid locator: %s", locator.c_str());
275
      plugin_handle->error_msg_ = "Invalid locator: " + locator;
276
      break;
277
    } else {
278
      if (num_attempts > 1) {
279
        LogCvmfs(kLogCache, kLogDebug | kLogStderr,
280
                 "Failed to connect to external cache manager: %d",
281
                 plugin_handle->fd_connection_);
282
      }
283
      plugin_handle->error_msg_ = "Failed to connect to external cache manager";
284
    }
285
286
    try_again = SpawnPlugin(cmd_line);
287
  } while (try_again);
288
289
  return plugin_handle.Release();
290
}
291
292
293
2027
void ExternalCacheManager::CtrlTxn(
294
  const ObjectInfo &object_info,
295
  const int flags,
296
  void *txn)
297
{
298
2027
  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
299
2027
  transaction->object_info = object_info;
300
2027
  transaction->object_info_modified = true;
301
2027
}
302
303
304
/**
 * Returns a one-line, newline-terminated description of this cache manager.
 */
string ExternalCacheManager::Describe() {
  return string("External cache manager\n");
}
307
308
309
2
bool ExternalCacheManager::DoFreeState(void *data) {
310
  FdTable<ReadOnlyHandle> *fd_table =
311
2
    reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
312
2
  delete fd_table;
313
2
  return true;
314
}
315
316
317
522
/**
 * Allocates a file descriptor for object id and increments the object's
 * reference counter in the cache plugin.
 *
 * Returns the new file descriptor, the negative error from the fd table if
 * no descriptor could be allocated, or the negative error of the reference
 * counter change.  In the latter case the freshly allocated descriptor is
 * closed again.
 */
int ExternalCacheManager::DoOpen(const shash::Any &id) {
  int fd = -1;
  {
    WriteLockGuard guard(rwlock_fd_table_);
    fd = fd_table_.OpenFd(ReadOnlyHandle(id));
    if (fd < 0) {
      // The format string needs a %s, otherwise the error text is dropped
      LogCvmfs(kLogCache, kLogDebug, "error while creating new fd: %s",
               strerror(-fd));
      return fd;
    }
  }

  // The plugin is contacted without holding the fd table lock
  int status_refcnt = ChangeRefcount(id, 1);
  if (status_refcnt == 0)
    return fd;

  // Roll back the fd allocation if the object cannot be referenced
  WriteLockGuard guard(rwlock_fd_table_);
  int retval = fd_table_.CloseFd(fd);
  assert(retval == 0);
  return status_refcnt;
}
338
339
340
2
bool ExternalCacheManager::DoRestoreState(void *data) {
341
  FdTable<ReadOnlyHandle> *other =
342
2
    reinterpret_cast<FdTable<ReadOnlyHandle> *>(data);
343
2
  fd_table_.AssignFrom(*other);
344
2
  cvmfs::MsgIoctl msg_ioctl;
345
2
  msg_ioctl.set_session_id(session_id_);
346
2
  msg_ioctl.set_conncnt_change_by(-1);
347
2
  CacheTransport::Frame frame(&msg_ioctl);
348
2
  transport_.SendFrame(&frame);
349
2
  return true;
350
}
351
352
353
2
void *ExternalCacheManager::DoSaveState() {
354
2
  cvmfs::MsgIoctl msg_ioctl;
355
2
  msg_ioctl.set_session_id(session_id_);
356
2
  msg_ioctl.set_conncnt_change_by(1);
357
2
  CacheTransport::Frame frame(&msg_ioctl);
358
2
  transport_.SendFrame(&frame);
359
2
  return fd_table_.Clone();
360
}
361
362
363
129
int ExternalCacheManager::Dup(int fd) {
364
129
  shash::Any id = GetHandle(fd);
365
129
  if (id == kInvalidHandle)
366
1
    return -EBADF;
367
128
  return DoOpen(id);
368
}
369
370
371
41
/**
 * Sets up a cache manager on top of the connection fd_connection with a file
 * descriptor table of max_open_fds entries.  The session starts out without
 * session id, capabilities or reader thread; these are set later by Create()
 * and Spawn().
 */
ExternalCacheManager::ExternalCacheManager(
  int fd_connection,
  unsigned max_open_fds)
  : pid_plugin_(0)
  , fd_table_(max_open_fds, ReadOnlyHandle())
  , transport_(fd_connection)
  , session_id_(-1)
  , max_object_size_(0)
  , spawned_(false)
  , terminated_(false)
  , capabilities_(cvmfs::CAP_NONE)
{
  memset(&thread_read_, 0, sizeof(thread_read_));
  atomic_init64(&next_request_id_);

  int retval = pthread_rwlock_init(&rwlock_fd_table_, NULL);
  assert(retval == 0);

  retval = pthread_mutex_init(&lock_send_fd_, NULL);
  assert(retval == 0);

  retval = pthread_mutex_init(&lock_inflight_rpcs_, NULL);
  assert(retval == 0);
}
392
393
394
78
/**
 * Ends the session: sends a quit message if a session was established, shuts
 * down the connection, joins the reader thread if it was spawned, and
 * releases the connection and the locks.
 */
ExternalCacheManager::~ExternalCacheManager() {
  // Set before the connection goes down, so that the reader thread can tell a
  // regular shutdown from a broken connection
  terminated_ = true;
  MemoryFence();

  if (session_id_ >= 0) {
    cvmfs::MsgQuit msg_quit;
    msg_quit.set_session_id(session_id_);
    CacheTransport::Frame frame_quit(&msg_quit);
    transport_.SendFrame(&frame_quit);
  }

  shutdown(transport_.fd_connection(), SHUT_RDWR);
  if (spawned_) {
    pthread_join(thread_read_, NULL);
  }
  close(transport_.fd_connection());

  pthread_rwlock_destroy(&rwlock_fd_table_);
  pthread_mutex_destroy(&lock_send_fd_);
  pthread_mutex_destroy(&lock_inflight_rpcs_);
}
411
412
413
3024
int ExternalCacheManager::Flush(bool do_commit, Transaction *transaction) {
414
3024
  if (transaction->committed)
415
2
    return 0;
416
  LogCvmfs(kLogCache, kLogDebug, "flushing %u bytes for %s",
417
608
           transaction->buf_pos, transaction->id.ToString().c_str());
418
3022
  cvmfs::MsgHash object_id;
419
3022
  transport_.FillMsgHash(transaction->id, &object_id);
420
3022
  cvmfs::MsgStoreReq msg_store;
421
3022
  msg_store.set_session_id(session_id_);
422
3022
  msg_store.set_req_id(transaction->transaction_id);
423
3022
  msg_store.set_allocated_object_id(&object_id);
424
3022
  msg_store.set_part_nr((transaction->size / max_object_size_) + 1);
425
3022
  msg_store.set_expected_size(transaction->expected_size);
426
3022
  msg_store.set_last_part(do_commit);
427
428
3022
  if (transaction->object_info_modified) {
429
    cvmfs::EnumObjectType object_type;
430
3014
    transport_.FillObjectType(transaction->object_info.type, &object_type);
431
3014
    msg_store.set_object_type(object_type);
432
3014
    msg_store.set_description(transaction->object_info.description);
433
  }
434
435
3022
  RpcJob rpc_job(&msg_store);
436
3022
  rpc_job.set_attachment_send(transaction->buffer, transaction->buf_pos);
437
  // TODO(jblomer): allow for out of order chunk upload
438
3022
  CallRemotely(&rpc_job);
439
3022
  msg_store.release_object_id();
440
441
3022
  cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
442
3022
  if (msg_reply->status() == cvmfs::STATUS_OK) {
443
3022
    transaction->flushed = true;
444
3022
    if (do_commit)
445
2038
      transaction->committed = true;
446
  }
447
3022
  return Ack2Errno(msg_reply->status());
448
}
449
450
451
430
/**
 * Returns the object id stored in the fd table for fd.  The lookup takes the
 * fd table's read lock.  Callers compare the result against kInvalidHandle.
 */
shash::Any ExternalCacheManager::GetHandle(int fd) {
  ReadLockGuard guard(rwlock_fd_table_);
  return fd_table_.GetHandle(fd).id;
}
456
457
458
41
int64_t ExternalCacheManager::GetSize(int fd) {
459
41
  shash::Any id = GetHandle(fd);
460
41
  if (id == kInvalidHandle)
461
1
    return -EBADF;
462
463
40
  cvmfs::MsgHash object_id;
464
40
  transport_.FillMsgHash(id, &object_id);
465
40
  cvmfs::MsgObjectInfoReq msg_info;
466
40
  msg_info.set_session_id(session_id_);
467
40
  msg_info.set_req_id(NextRequestId());
468
40
  msg_info.set_allocated_object_id(&object_id);
469
40
  RpcJob rpc_job(&msg_info);
470
40
  CallRemotely(&rpc_job);
471
40
  msg_info.release_object_id();
472
473
40
  cvmfs::MsgObjectInfoReply *msg_reply = rpc_job.msg_object_info_reply();
474
40
  if (msg_reply->status() == cvmfs::STATUS_OK) {
475
39
    assert(msg_reply->has_size());
476
39
    return msg_reply->size();
477
  }
478
1
  return Ack2Errno(msg_reply->status());
479
}
480
481
482
1
void *ExternalCacheManager::MainRead(void *data) {
483
  ExternalCacheManager *cache_mgr =
484
1
    reinterpret_cast<ExternalCacheManager *>(data);
485
1
  LogCvmfs(kLogCache, kLogDebug, "starting external cache reader thread");
486
487
1
  unsigned char buffer[cache_mgr->max_object_size_];
488
  while (true) {
489
3029
    CacheTransport::Frame frame_recv;
490
3029
    frame_recv.set_attachment(buffer, cache_mgr->max_object_size_);
491
3029
    bool retval = cache_mgr->transport_.RecvFrame(&frame_recv);
492
3029
    if (!retval)
493
      break;
494
495
    uint64_t req_id;
496
3028
    uint64_t part_nr = 0;
497
3028
    google::protobuf::MessageLite *msg = frame_recv.GetMsgTyped();
498
3028
    if (msg->GetTypeName() == "cvmfs.MsgRefcountReply") {
499
19
      req_id = reinterpret_cast<cvmfs::MsgRefcountReply *>(msg)->req_id();
500
3009
    } else if (msg->GetTypeName() == "cvmfs.MsgObjectInfoReply") {
501
9
      req_id = reinterpret_cast<cvmfs::MsgObjectInfoReply *>(msg)->req_id();
502
3000
    } else if (msg->GetTypeName() == "cvmfs.MsgReadReply") {
503
1800
      req_id = reinterpret_cast<cvmfs::MsgReadReply *>(msg)->req_id();
504
1200
    } else if (msg->GetTypeName() == "cvmfs.MsgStoreReply") {
505
200
      req_id = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->req_id();
506
200
      part_nr = reinterpret_cast<cvmfs::MsgStoreReply *>(msg)->part_nr();
507
1000
    } else if (msg->GetTypeName() == "cvmfs.MsgInfoReply") {
508
      req_id = reinterpret_cast<cvmfs::MsgInfoReply *>(msg)->req_id();
509
1000
    } else if (msg->GetTypeName() == "cvmfs.MsgShrinkReply") {
510
      req_id = reinterpret_cast<cvmfs::MsgShrinkReply *>(msg)->req_id();
511
1000
    } else if (msg->GetTypeName() == "cvmfs.MsgListReply") {
512
      req_id = reinterpret_cast<cvmfs::MsgListReply *>(msg)->req_id();
513
1000
    } else if (msg->GetTypeName() == "cvmfs.MsgDetach") {
514
      // Release pinned catalogs
515
1000
      cache_mgr->quota_mgr_->BroadcastBackchannels("R");
516
1000
      continue;
517
    } else {
518
      LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug, "unexpected message %s",
519
               msg->GetTypeName().c_str());
520
      abort();
521
    }
522
523
2028
    RpcInFlight rpc_inflight;
524
    {
525
2028
      MutexLockGuard guard(cache_mgr->lock_inflight_rpcs_);
526
2032
      for (unsigned i = 0; i < cache_mgr->inflight_rpcs_.size(); ++i) {
527
2032
        RpcJob *rpc_job = cache_mgr->inflight_rpcs_[i].rpc_job;
528

2032
        if ((rpc_job->req_id() == req_id) && (rpc_job->part_nr() == part_nr)) {
529
2028
          rpc_inflight = cache_mgr->inflight_rpcs_[i];
530
          cache_mgr->inflight_rpcs_.erase(
531
2028
            cache_mgr->inflight_rpcs_.begin() + i);
532
2028
          break;
533
        }
534
2028
      }
535
    }
536
2028
    if (rpc_inflight.rpc_job == NULL) {
537
      LogCvmfs(kLogCache, kLogSyslogWarn | kLogDebug,
538
               "got unmatched rpc reply");
539
      continue;
540
    }
541
2028
    rpc_inflight.rpc_job->frame_recv()->MergeFrom(frame_recv);
542
2028
    rpc_inflight.signal->Wakeup();
543
  }
544
545
1
  if (!cache_mgr->terminated_) {
546
    LogCvmfs(kLogCache, kLogSyslogErr | kLogDebug,
547
             "connection to external cache manager broken (%d)", errno);
548
    abort();
549
  }
550
1
  LogCvmfs(kLogCache, kLogDebug, "stopping external cache reader thread");
551
1
  return NULL;
552
}
553
554
555
394
int ExternalCacheManager::Open(const BlessedObject &object) {
556
394
  return DoOpen(object.id);
557
}
558
559
560
2
int ExternalCacheManager::OpenFromTxn(void *txn) {
561
2
  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
562
  LogCvmfs(kLogCache, kLogDebug, "open fd for transaction %s",
563
           transaction->id.ToString().c_str());
564
2
  int retval = Flush(true, transaction);
565
2
  if (retval != 0)
566
    return retval;
567
568
2
  int fd = -1;
569
  {
570
2
    WriteLockGuard guard(rwlock_fd_table_);
571
2
    fd = fd_table_.OpenFd(ReadOnlyHandle(transaction->id));
572
2
    if (fd < 0) {
573
      LogCvmfs(kLogCache, kLogDebug, "error while creating new fd",
574
               strerror(-fd));
575
      return fd;
576
    }
577
  }
578
2
  transaction->open_fds++;
579
2
  return fd;
580
}
581
582
583
258
int64_t ExternalCacheManager::Pread(
584
  int fd,
585
  void *buf,
586
  uint64_t size,
587
  uint64_t offset)
588
{
589
258
  shash::Any id = GetHandle(fd);
590
258
  if (id == kInvalidHandle)
591
1
    return -EBADF;
592
593
257
  cvmfs::MsgHash object_id;
594
257
  transport_.FillMsgHash(id, &object_id);
595
257
  uint64_t nbytes = 0;
596

257
  while (nbytes < size) {
597
    uint64_t batch_size =
598
2920
      std::min(size - nbytes, static_cast<uint64_t>(max_object_size_));
599
2920
    cvmfs::MsgReadReq msg_read;
600
2920
    msg_read.set_session_id(session_id_);
601
2920
    msg_read.set_req_id(NextRequestId());
602
2920
    msg_read.set_allocated_object_id(&object_id);
603
2920
    msg_read.set_offset(offset + nbytes);
604
2920
    msg_read.set_size(batch_size);
605
2920
    RpcJob rpc_job(&msg_read);
606
    rpc_job.set_attachment_recv(reinterpret_cast<char *>(buf) + nbytes,
607
2920
                                batch_size);
608
2920
    CallRemotely(&rpc_job);
609
2920
    msg_read.release_object_id();
610
611
2920
    cvmfs::MsgReadReply *msg_reply = rpc_job.msg_read_reply();
612
2920
    if (msg_reply->status() == cvmfs::STATUS_OK) {
613
2915
      nbytes += rpc_job.frame_recv()->att_size();
614
      // Fuse sends in rounded up buffers, so short reads are expected
615
2915
      if (rpc_job.frame_recv()->att_size() < batch_size)
616
9
        return nbytes;
617
    } else {
618
5
      return Ack2Errno(msg_reply->status());
619
    }
620
  }
621
243
  return size;
622
}
623
624
625
2
int ExternalCacheManager::Readahead(int fd) {
626
2
  shash::Any id = GetHandle(fd);
627
2
  if (id == kInvalidHandle)
628
1
    return -EBADF;
629
  // No-op
630
1
  return 0;
631
}
632
633
634
7
int ExternalCacheManager::Reset(void *txn) {
635
7
  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
636
7
  transaction->buf_pos = 0;
637
7
  transaction->size = 0;
638
7
  transaction->open_fds = 0;
639
7
  transaction->committed = false;
640
7
  transaction->object_info_modified = true;
641
642
7
  if (!transaction->flushed)
643
6
    return 0;
644
645
1
  cvmfs::MsgHash object_id;
646
1
  transport_.FillMsgHash(transaction->id, &object_id);
647
1
  cvmfs::MsgStoreAbortReq msg_abort;
648
1
  msg_abort.set_session_id(session_id_);
649
1
  msg_abort.set_req_id(transaction->transaction_id);
650
1
  msg_abort.set_allocated_object_id(&object_id);
651
1
  RpcJob rpc_job(&msg_abort);
652
1
  CallRemotely(&rpc_job);
653
1
  msg_abort.release_object_id();
654
1
  cvmfs::MsgStoreReply *msg_reply = rpc_job.msg_store_reply();
655
1
  transaction->transaction_id = NextRequestId();
656
1
  transaction->flushed = false;
657
1
  return Ack2Errno(msg_reply->status());
658
}
659
660
661
1
void ExternalCacheManager::Spawn() {
662
1
  int retval = pthread_create(&thread_read_, NULL, MainRead, this);
663
1
  assert(retval == 0);
664
1
  spawned_ = true;
665
1
}
666
667
668
/**
669
 * Returns true if the plugin could be spawned or was spawned by another
670
 * process.
671
 */
672
bool ExternalCacheManager::SpawnPlugin(const vector<string> &cmd_line) {
673
  if (cmd_line.empty())
674
    return false;
675
676
  int pipe_ready[2];
677
  MakePipe(pipe_ready);
678
  set<int> preserve_filedes;
679
  preserve_filedes.insert(pipe_ready[1]);
680
681
  int fd_null_read = open("/dev/null", O_RDONLY);
682
  int fd_null_write = open("/dev/null", O_WRONLY);
683
  assert((fd_null_read >= 0) && (fd_null_write >= 0));
684
  map<int, int> map_fildes;
685
  map_fildes[fd_null_read] = 0;
686
  map_fildes[fd_null_write] = 1;
687
  map_fildes[fd_null_write] = 2;
688
689
  pid_t child_pid;
690
  int retval = setenv(CacheTransport::kEnvReadyNotifyFd,
691
                      StringifyInt(pipe_ready[1]).c_str(), 1);
692
  assert(retval == 0);
693
  retval = ManagedExec(cmd_line,
694
                       preserve_filedes,
695
                       map_fildes,
696
                       false,  // drop_credentials
697
                       true,   // double fork
698
                       &child_pid);
699
  unsetenv(CacheTransport::kEnvReadyNotifyFd);
700
  close(fd_null_read);
701
  close(fd_null_write);
702
  if (!retval) {
703
    LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
704
             "failed to start cache plugin '%s'",
705
             JoinStrings(cmd_line, " ").c_str());
706
    ClosePipe(pipe_ready);
707
    return false;
708
  }
709
710
  LogCvmfs(kLogCache, kLogDebug | kLogSyslog,
711
           "started cache plugin '%s' (pid %d), waiting for it to become ready",
712
           JoinStrings(cmd_line, " ").c_str(), child_pid);
713
  close(pipe_ready[1]);
714
  char buf;
715
  if (read(pipe_ready[0], &buf, 1) != 1) {
716
    close(pipe_ready[0]);
717
    LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
718
             "cache plugin did not start properly");
719
    return false;
720
  }
721
  close(pipe_ready[0]);
722
723
  if (buf == CacheTransport::kReadyNotification)
724
    return true;
725
  LogCvmfs(kLogCache, kLogDebug | kLogSyslogErr,
726
           "cache plugin failed to create an endpoint");
727
  return false;
728
}
729
730
731
2040
int ExternalCacheManager::StartTxn(
732
  const shash::Any &id,
733
  uint64_t size,
734
  void *txn)
735
{
736
2040
  if (!(capabilities_ & cvmfs::CAP_WRITE))
737
2
    return -EROFS;
738
739
2038
  Transaction *transaction = new (txn) Transaction(id);
740
2038
  transaction->expected_size = size;
741
2038
  transaction->transaction_id = NextRequestId();
742
#ifdef __APPLE__
743
  transaction->buffer =
744
    reinterpret_cast<unsigned char *>(smalloc(max_object_size_));
745
#endif
746
2038
  return 0;
747
}
748
749
750
2042
int64_t ExternalCacheManager::Write(const void *buf, uint64_t size, void *txn) {
751
2042
  Transaction *transaction = reinterpret_cast<Transaction *>(txn);
752
2042
  assert(!transaction->committed);
753
  LogCvmfs(kLogCache, kLogDebug, "writing %" PRIu64 " bytes for %s",
754
8
           size, transaction->id.ToString().c_str());
755
756
2042
  if (transaction->expected_size != kSizeUnknown) {
757
2042
    if (transaction->size + size > transaction->expected_size) {
758
      LogCvmfs(kLogCache, kLogDebug,
759
               "Transaction size (%" PRIu64 ") > expected size (%" PRIu64 ")",
760
               transaction->size + size, transaction->expected_size);
761
      return -EFBIG;
762
    }
763
  }
764
765
2042
  uint64_t written = 0;
766
2042
  const unsigned char *read_pos = reinterpret_cast<const unsigned char *>(buf);
767
7107
  while (written < size) {
768
3023
    if (transaction->buf_pos == max_object_size_) {
769
984
      bool do_commit = false;
770
984
      if (transaction->expected_size != kSizeUnknown)
771
984
        do_commit = (transaction->size + written) == transaction->expected_size;
772
984
      int retval = Flush(do_commit, transaction);
773
984
      if (retval != 0) {
774
        transaction->size += written;
775
        return retval;
776
      }
777
984
      transaction->size += transaction->buf_pos;
778
984
      transaction->buf_pos = 0;
779
    }
780
3023
    uint64_t remaining = size - written;
781
3023
    uint64_t space_in_buffer = max_object_size_ - transaction->buf_pos;
782
3023
    uint64_t batch_size = std::min(remaining, space_in_buffer);
783
3023
    memcpy(transaction->buffer + transaction->buf_pos, read_pos, batch_size);
784
3023
    transaction->buf_pos += batch_size;
785
3023
    written += batch_size;
786
3023
    read_pos += batch_size;
787
  }
788
2042
  return written;
789
}
790
791
792
//------------------------------------------------------------------------------
793
794
795
14
bool ExternalQuotaManager::DoListing(
796
  cvmfs::EnumObjectType type,
797
  vector<cvmfs::MsgListRecord> *result)
798
{
799
14
  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_LIST))
800
    return false;
801
802
14
  uint64_t listing_id = 0;
803
14
  bool more_data = false;
804
16
  do {
805
16
    cvmfs::MsgListReq msg_list;
806
16
    msg_list.set_session_id(cache_mgr_->session_id_);
807
16
    msg_list.set_req_id(cache_mgr_->NextRequestId());
808
16
    msg_list.set_listing_id(listing_id);
809
16
    msg_list.set_object_type(type);
810
16
    ExternalCacheManager::RpcJob rpc_job(&msg_list);
811
16
    cache_mgr_->CallRemotely(&rpc_job);
812
813
16
    cvmfs::MsgListReply *msg_reply = rpc_job.msg_list_reply();
814
16
    if (msg_reply->status() != cvmfs::STATUS_OK)
815
      return false;
816
16
    more_data = !msg_reply->is_last_part();
817
16
    listing_id = msg_reply->listing_id();
818
204020
    for (int i = 0; i < msg_reply->list_record_size(); ++i) {
819
204004
      result->push_back(msg_reply->list_record(i));
820
    }
821
  } while (more_data);
822
823
14
  return true;
824
}
825
826
827
10
bool ExternalQuotaManager::Cleanup(const uint64_t leave_size) {
828
10
  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK))
829
    return false;
830
831
10
  cvmfs::MsgShrinkReq msg_shrink;
832
10
  msg_shrink.set_session_id(cache_mgr_->session_id_);
833
10
  msg_shrink.set_req_id(cache_mgr_->NextRequestId());
834
10
  msg_shrink.set_shrink_to(leave_size);
835
10
  ExternalCacheManager::RpcJob rpc_job(&msg_shrink);
836
10
  cache_mgr_->CallRemotely(&rpc_job);
837
838
10
  cvmfs::MsgShrinkReply *msg_reply = rpc_job.msg_shrink_reply();
839
10
  return msg_reply->status() == cvmfs::STATUS_OK;
840
}
841
842
843
39
/**
 * Creates a quota manager that operates on the given external cache manager.
 * The caller takes ownership of the returned object.
 */
ExternalQuotaManager *ExternalQuotaManager::Create(
  ExternalCacheManager *cache_mgr)
{
  UniquePtr<ExternalQuotaManager> new_quota_mgr(
    new ExternalQuotaManager(cache_mgr));
  assert(new_quota_mgr.IsValid());
  return new_quota_mgr.Release();
}
852
853
854
28
/**
 * Queries the cache plugin for its size information and stores it in
 * quota_info.  The no_shrink field is only updated if the plugin reports a
 * non-negative value.
 *
 * Returns 0 on success, or a negative errno value if the plugin lacks the
 * info capability or replies with an error; quota_info is left untouched in
 * these cases.
 */
int ExternalQuotaManager::GetInfo(QuotaInfo *quota_info) {
  if (!(cache_mgr_->capabilities_ & cvmfs::CAP_INFO))
    return Ack2Errno(cvmfs::STATUS_NOSUPPORT);

  cvmfs::MsgInfoReq request;
  request.set_session_id(cache_mgr_->session_id_);
  request.set_req_id(cache_mgr_->NextRequestId());
  ExternalCacheManager::RpcJob rpc_job(&request);
  cache_mgr_->CallRemotely(&rpc_job);

  cvmfs::MsgInfoReply *reply = rpc_job.msg_info_reply();
  if (reply->status() != cvmfs::STATUS_OK)
    return Ack2Errno(reply->status());

  quota_info->size = reply->size_bytes();
  quota_info->used = reply->used_bytes();
  quota_info->pinned = reply->pinned_bytes();
  if (reply->no_shrink() >= 0) {
    quota_info->no_shrink = reply->no_shrink();
  }
  return Ack2Errno(reply->status());
}
874
875
876
5
uint64_t ExternalQuotaManager::GetCapacity() {
877
5
  QuotaInfo info;
878
5
  int retval = GetInfo(&info);
879
5
  if (retval != 0)
880
    return uint64_t(-1);
881
5
  return info.size;
882
}
883
884
885
uint64_t ExternalQuotaManager::GetCleanupRate(uint64_t period_s) {
886
  QuotaInfo info;
887
  int retval = GetInfo(&info);
888
  if (retval != 0)
889
    return 0;
890
  return info.no_shrink;
891
}
892
893
894
15
uint64_t ExternalQuotaManager::GetSize() {
895
15
  QuotaInfo info;
896
15
  int retval = GetInfo(&info);
897
15
  if (retval != 0)
898
    return 0;
899
15
  return info.used;
900
}
901
902
903
8
uint64_t ExternalQuotaManager::GetSizePinned() {
904
8
  QuotaInfo info;
905
8
  int retval = GetInfo(&info);
906
8
  if (retval != 0)
907
    return 0;
908
8
  return info.pinned;
909
}
910
911
912
/**
 * Tells whether the quota manager supports the given capability.  Most
 * capabilities depend on what the cache plugin announced in the handshake;
 * listeners are always supported, unknown capabilities never.
 */
bool ExternalQuotaManager::HasCapability(Capabilities capability) {
  bool supported = false;
  switch (capability) {
    case kCapIntrospectSize:
      supported = cache_mgr_->capabilities_ & cvmfs::CAP_INFO;
      break;
    case kCapIntrospectCleanupRate:
      supported = cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK_RATE;
      break;
    case kCapList:
      supported = cache_mgr_->capabilities_ & cvmfs::CAP_LIST;
      break;
    case kCapShrink:
      supported = cache_mgr_->capabilities_ & cvmfs::CAP_SHRINK;
      break;
    case kCapListeners:
      supported = true;
      break;
    default:
      supported = false;
      break;
  }
  return supported;
}
928
929
930
3
/**
 * Returns the descriptions of all objects of type OBJECT_REGULAR in the
 * cache, or an empty list if the listing fails.
 */
vector<string> ExternalQuotaManager::List() {
  vector<cvmfs::MsgListRecord> records;
  vector<string> result;
  if (DoListing(cvmfs::OBJECT_REGULAR, &records)) {
    for (vector<cvmfs::MsgListRecord>::const_iterator i = records.begin();
         i != records.end(); ++i)
    {
      result.push_back(i->description());
    }
  }
  return result;
}
940
941
942
1
/**
 * Returns the descriptions of all objects of type OBJECT_CATALOG in the
 * cache, or an empty list if the listing fails.
 */
vector<string> ExternalQuotaManager::ListCatalogs() {
  vector<cvmfs::MsgListRecord> records;
  vector<string> result;
  if (DoListing(cvmfs::OBJECT_CATALOG, &records)) {
    for (vector<cvmfs::MsgListRecord>::const_iterator i = records.begin();
         i != records.end(); ++i)
    {
      result.push_back(i->description());
    }
  }
  return result;
}
952
953
954
3
/**
 * Returns the descriptions of all pinned objects in the cache.  Regular
 * objects, catalogs and volatile objects are listed in this order.  If any
 * of the three listings fails, an empty list is returned.
 */
vector<string> ExternalQuotaManager::ListPinned() {
  vector<string> result;

  const cvmfs::EnumObjectType types[] = {
    cvmfs::OBJECT_REGULAR, cvmfs::OBJECT_CATALOG, cvmfs::OBJECT_VOLATILE
  };
  const unsigned num_types = sizeof(types) / sizeof(types[0]);

  // Collect all listings first; nothing is reported unless all succeed
  vector<cvmfs::MsgListRecord> raw_lists[num_types];
  for (unsigned i = 0; i < num_types; ++i) {
    if (!DoListing(types[i], &raw_lists[i]))
      return result;
  }

  for (unsigned i = 0; i < num_types; ++i) {
    const vector<cvmfs::MsgListRecord> &records = raw_lists[i];
    for (unsigned j = 0; j < records.size(); ++j) {
      if (records[j].pinned())
        result.push_back(records[j].description());
    }
  }
  return result;
}
974
975
976
1
/**
 * Returns the descriptions of all objects of type OBJECT_VOLATILE in the
 * cache, or an empty list if the listing fails.
 */
vector<string> ExternalQuotaManager::ListVolatile() {
  vector<cvmfs::MsgListRecord> records;
  vector<string> result;
  if (DoListing(cvmfs::OBJECT_VOLATILE, &records)) {
    for (vector<cvmfs::MsgListRecord>::const_iterator i = records.begin();
         i != records.end(); ++i)
    {
      result.push_back(i->description());
    }
  }
  return result;
}
986
987
988
1
void ExternalQuotaManager::RegisterBackChannel(
989
  int back_channel[2],
990
  const string &channel_id)
991
{
992
1
  shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
993
1
  MakePipe(back_channel);
994
1
  LockBackChannels();
995
1
  assert(back_channels_.find(hash_id) == back_channels_.end());
996
1
  back_channels_[hash_id] = back_channel[1];
997
1
  UnlockBackChannels();
998
1
}
999
1000
1001
1
void ExternalQuotaManager::UnregisterBackChannel(
1002
  int back_channel[2],
1003
  const string &channel_id)
1004
{
1005
1
  shash::Md5 hash_id = shash::Md5(shash::AsciiPtr(channel_id));
1006
1
  LockBackChannels();
1007
1
  back_channels_.erase(hash_id);
1008
1
  UnlockBackChannels();
1009
1
  ClosePipe(back_channel);
1010

52
}