GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/cache_extern.h Lines: 81 98 82.7 %
Date: 2019-02-03 02:48:13 Branches: 9 18 50.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
#ifndef CVMFS_CACHE_EXTERN_H_
5
#define CVMFS_CACHE_EXTERN_H_
6
7
#ifndef __STDC_FORMAT_MACROS
8
#define __STDC_FORMAT_MACROS
9
#endif
10
11
#include <pthread.h>
12
#include <stdint.h>
13
#include <unistd.h>
14
15
#include <cassert>
16
#include <string>
17
#include <vector>
18
19
#include "atomic.h"
20
#include "cache.h"
21
#include "cache_transport.h"
22
#include "fd_table.h"
23
#include "gtest/gtest_prod.h"
24
#include "hash.h"
25
#include "quota.h"
26
#include "util/single_copy.h"
27
#include "util_concurrency.h"
28
29
30
class ExternalCacheManager : public CacheManager {
31
  FRIEND_TEST(T_ExternalCacheManager, TransactionAbort);
32
  friend class ExternalQuotaManager;
33
34
 public:
35
  static const unsigned kPbProtocolVersion = 1;
36
  /**
37
   * Used for race-free startup of an external cache plugin.
38
   */
39
  class PluginHandle {
40
    friend class ExternalCacheManager;
41
   public:
42
    PluginHandle() : fd_connection_(-1) { }
43
    bool IsValid() const { return fd_connection_ >= 0; }
44
    int fd_connection() const { return fd_connection_; }
45
    std::string error_msg() const { return error_msg_; }
46
47
   private:
48
    /**
49
     * The connected file descriptor to pass to Create()
50
     */
51
    int fd_connection_;
52
53
    std::string error_msg_;
54
  };
55
56
  static PluginHandle *CreatePlugin(
57
    const std::string &locator,
58
    const std::vector<std::string> &cmd_line);
59
60
  static ExternalCacheManager *Create(int fd_connection,
61
                                      unsigned max_open_fds,
62
                                      const std::string &ident);
63
  virtual ~ExternalCacheManager();
64
65
6
  virtual CacheManagerIds id() { return kExternalCacheManager; }
66
  virtual std::string Describe();
67
  virtual bool AcquireQuotaManager(QuotaManager *quota_mgr);
68
69
  virtual int Open(const BlessedObject &object);
70
  virtual int64_t GetSize(int fd);
71
  virtual int Close(int fd);
72
  virtual int64_t Pread(int fd, void *buf, uint64_t size, uint64_t offset);
73
  virtual int Dup(int fd);
74
  virtual int Readahead(int fd);
75
76
#ifdef __APPLE__
77
  virtual uint32_t SizeOfTxn() { return sizeof(Transaction); }
78
#else
79
2040
  virtual uint32_t SizeOfTxn() {
80
2040
    return sizeof(Transaction) + max_object_size_;
81
  }
82
#endif
83
  virtual int StartTxn(const shash::Any &id, uint64_t size, void *txn);
84
  virtual void CtrlTxn(const ObjectInfo &object_info,
85
                       const int flags,
86
                       void *txn);
87
  virtual int64_t Write(const void *buf, uint64_t size, void *txn);
88
  virtual int Reset(void *txn);
89
  virtual int AbortTxn(void *txn);
90
  virtual int OpenFromTxn(void *txn);
91
  virtual int CommitTxn(void *txn);
92
93
  virtual void Spawn();
94
95
10
  int64_t session_id() const { return session_id_; }
96
207
  uint32_t max_object_size() const { return max_object_size_; }
97
8
  uint64_t capabilities() const { return capabilities_; }
98
1
  pid_t pid_plugin() const { return pid_plugin_; }
99
100
 protected:
101
  virtual void *DoSaveState();
102
  virtual bool DoRestoreState(void *data);
103
  virtual bool DoFreeState(void *data);
104
105
 private:
106
  /**
107
   * The null hash (hashed output is all null bytes) serves as a marker for an
108
   * invalid handle
109
   */
110
  static const shash::Any kInvalidHandle;
111
  /**
112
   * Objects cannot be larger than 512 kB.  Keeps transaction memory consumption
113
   * under control.
114
   */
115
  static const unsigned kMaxSupportedObjectSize = 512 * 1024;
116
  /**
117
   * Statistically, at least half of our objects should not be further chunked.
118
   */
119
  static const unsigned kMinSupportedObjectSize = 4 * 1024;
120
121
  struct Transaction {
122
2038
    explicit Transaction(const shash::Any &id)
123
      : buffer(reinterpret_cast<unsigned char *>(this) + sizeof(Transaction))
124
      , buf_pos(0)
125
      , size(0)
126
      , expected_size(kSizeUnknown)
127
      , object_info(kTypeRegular, "")
128
      , open_fds(0)
129
      , flushed(false)
130
      , committed(false)
131
      , object_info_modified(false)
132
      , transaction_id(0)
133
2038
      , id(id)
134
2038
    { }
135
136
    /**
137
     * Allocated size is max_object_size_, allocated by the caller at the end
138
     * of the transaction (Linux only).
139
     */
140
    unsigned char *buffer;
141
    unsigned buf_pos;
142
    uint64_t size;
143
    uint64_t expected_size;
144
    ObjectInfo object_info;
145
    int open_fds;
146
    bool flushed;
147
    bool committed;
148
    bool object_info_modified;
149
    uint64_t transaction_id;
150
    shash::Any id;
151
  };  // class Transaction
152
153
  struct ReadOnlyHandle {
154
554
    ReadOnlyHandle() : id(kInvalidHandle) { }
155
524
    explicit ReadOnlyHandle(const shash::Any &h) : id(h) { }
156
1046
    bool operator ==(const ReadOnlyHandle &other) const {
157
1046
      return this->id == other.id;
158
    }
159
1702
    bool operator !=(const ReadOnlyHandle &other) const {
160
1702
      return this->id != other.id;
161
    }
162
    shash::Any id;
163
  };  // class ReadOnlyHandle
164
165
9105
  class RpcJob {
166
   public:
167
3068
    explicit RpcJob(cvmfs::MsgRefcountReq *msg)
168
3068
      : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
169
40
    explicit RpcJob(cvmfs::MsgObjectInfoReq *msg)
170
40
      : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
171
2920
    explicit RpcJob(cvmfs::MsgReadReq *msg)
172
2920
      : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
173
3022
    explicit RpcJob(cvmfs::MsgStoreReq *msg)
174
      : req_id_(msg->req_id()), part_nr_(msg->part_nr()), msg_req_(msg),
175
3022
        frame_send_(msg) { }
176
1
    explicit RpcJob(cvmfs::MsgStoreAbortReq *msg)
177
      : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg),
178
1
        frame_send_(msg) { }
179
28
    explicit RpcJob(cvmfs::MsgInfoReq *msg)
180
28
      : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
181
10
    explicit RpcJob(cvmfs::MsgShrinkReq *msg)
182
10
      : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
183
16
    explicit RpcJob(cvmfs::MsgListReq *msg)
184
16
      : req_id_(msg->req_id()), part_nr_(0), msg_req_(msg), frame_send_(msg) { }
185
186
3022
    void set_attachment_send(void *data, unsigned size) {
187
3022
      frame_send_.set_attachment(data, size);
188
3022
    }
189
190
2920
    void set_attachment_recv(void *data, unsigned size) {
191
2920
      frame_recv_.set_attachment(data, size);
192
2920
    }
193
194
    google::protobuf::MessageLite *msg_req() { return msg_req_; }
195
    // Type checking has been already performed
196
3068
    cvmfs::MsgRefcountReply *msg_refcount_reply() {
197
      cvmfs::MsgRefcountReply *m = reinterpret_cast<cvmfs::MsgRefcountReply *>(
198
3068
        frame_recv_.GetMsgTyped());
199
3068
      assert(m->req_id() == req_id_);
200
3068
      return m;
201
    }
202
40
    cvmfs::MsgObjectInfoReply *msg_object_info_reply() {
203
      cvmfs::MsgObjectInfoReply *m =
204
        reinterpret_cast<cvmfs::MsgObjectInfoReply *>(
205
40
          frame_recv_.GetMsgTyped());
206
40
      assert(m->req_id() == req_id_);
207
40
      return m;
208
    }
209
2920
    cvmfs::MsgReadReply *msg_read_reply() {
210
      cvmfs::MsgReadReply *m = reinterpret_cast<cvmfs::MsgReadReply *>(
211
2920
        frame_recv_.GetMsgTyped());
212
2920
      assert(m->req_id() == req_id_);
213
2920
      return m;
214
    }
215
3023
    cvmfs::MsgStoreReply *msg_store_reply() {
216
      cvmfs::MsgStoreReply *m = reinterpret_cast<cvmfs::MsgStoreReply *>(
217
3023
        frame_recv_.GetMsgTyped());
218
3023
      assert(m->req_id() == req_id_);
219
3023
      assert(m->part_nr() == part_nr_);
220
3023
      return m;
221
    }
222
28
    cvmfs::MsgInfoReply *msg_info_reply() {
223
      cvmfs::MsgInfoReply *m = reinterpret_cast<cvmfs::MsgInfoReply *>(
224
28
        frame_recv_.GetMsgTyped());
225
28
      assert(m->req_id() == req_id_);
226
28
      return m;
227
    }
228
10
    cvmfs::MsgShrinkReply *msg_shrink_reply() {
229
      cvmfs::MsgShrinkReply *m = reinterpret_cast<cvmfs::MsgShrinkReply *>(
230
10
        frame_recv_.GetMsgTyped());
231
10
      assert(m->req_id() == req_id_);
232
10
      return m;
233
    }
234
16
    cvmfs::MsgListReply *msg_list_reply() {
235
      cvmfs::MsgListReply *m = reinterpret_cast<cvmfs::MsgListReply *>(
236
16
        frame_recv_.GetMsgTyped());
237
16
      assert(m->req_id() == req_id_);
238
16
      return m;
239
    }
240
241
9105
    CacheTransport::Frame *frame_send() { return &frame_send_; }
242
29093
    CacheTransport::Frame *frame_recv() { return &frame_recv_; }
243
2032
    uint64_t req_id() const { return req_id_; }
244
2028
    uint64_t part_nr() const { return part_nr_; }
245
246
   private:
247
    uint64_t req_id_;
248
    uint64_t part_nr_;
249
    google::protobuf::MessageLite *msg_req_;
250
    CacheTransport::Frame frame_send_;
251
    CacheTransport::Frame frame_recv_;
252
  };  // class RpcJob
253
254
2028
  struct RpcInFlight {
255
2028
    RpcInFlight() : rpc_job(NULL), signal(NULL) { }
256
2028
    RpcInFlight(RpcJob *r, Signal *s) : rpc_job(r), signal(s) { }
257
258
    RpcJob *rpc_job;
259
    Signal *signal;
260
  };
261
262
  static void *MainRead(void *data);
263
  static int ConnectLocator(const std::string &locator);
264
  static bool SpawnPlugin(const std::vector<std::string> &cmd_line);
265
266
  explicit ExternalCacheManager(int fd_connection, unsigned max_open_fds);
267
8121
  int64_t NextRequestId() { return atomic_xadd64(&next_request_id_, 1); }
268
  void CallRemotely(RpcJob *rpc_job);
269
  int ChangeRefcount(const shash::Any &id, int change_by);
270
  int DoOpen(const shash::Any &id);
271
  shash::Any GetHandle(int fd);
272
  int Flush(bool do_commit, Transaction *transaction);
273
274
  pid_t pid_plugin_;
275
  FdTable<ReadOnlyHandle> fd_table_;
276
  CacheTransport transport_;
277
  int64_t session_id_;
278
  uint32_t max_object_size_;
279
  bool spawned_;
280
  bool terminated_;
281
  pthread_rwlock_t rwlock_fd_table_;
282
  atomic_int64 next_request_id_;
283
284
  /**
285
   * Serialize concurrent write access to the session fd
286
   */
287
  pthread_mutex_t lock_send_fd_;
288
  std::vector<RpcInFlight> inflight_rpcs_;
289
  pthread_mutex_t lock_inflight_rpcs_;
290
  pthread_t thread_read_;
291
  uint64_t capabilities_;
292
};  // class ExternalCacheManager
293
294
295
78
class ExternalQuotaManager : public QuotaManager {
296
 public:
297
  static ExternalQuotaManager *Create(ExternalCacheManager *cache_mgr);
298
  virtual bool HasCapability(Capabilities capability);
299
300
  virtual void Insert(const shash::Any &hash, const uint64_t size,
301
                      const std::string &description)
302
  { }
303
304
  virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
305
                              const std::string &description)
306
  { }
307
308
  virtual bool Pin(const shash::Any &hash, const uint64_t size,
309
                   const std::string &description, const bool is_catalog)
310
  { return is_catalog; }
311
312
  virtual void Unpin(const shash::Any &hash) { }
313
  virtual void Touch(const shash::Any &hash) { }
314
  virtual void Remove(const shash::Any &file) { }
315
  virtual bool Cleanup(const uint64_t leave_size);
316
317
  virtual void RegisterBackChannel(int back_channel[2],
318
                                   const std::string &channel_id);
319
  virtual void UnregisterBackChannel(int back_channel[2],
320
                                     const std::string &channel_id);
321
322
  virtual std::vector<std::string> List();
323
  virtual std::vector<std::string> ListPinned();
324
  virtual std::vector<std::string> ListCatalogs();
325
  virtual std::vector<std::string> ListVolatile();
326
  virtual uint64_t GetMaxFileSize() { return uint64_t(-1); }
327
  virtual uint64_t GetCapacity();
328
  virtual uint64_t GetSize();
329
  virtual uint64_t GetSizePinned();
330
  virtual uint64_t GetCleanupRate(uint64_t period_s);
331
332
  virtual void Spawn() { }
333
1
  virtual pid_t GetPid() { return cache_mgr_->pid_plugin(); }
334
  virtual uint32_t GetProtocolRevision() { return 0; }
335
336
 private:
337
  struct QuotaInfo {
338
28
    QuotaInfo() : size(0), used(0), pinned(0), no_shrink(0) { }
339
    uint64_t size;
340
    uint64_t used;
341
    uint64_t pinned;
342
    uint64_t no_shrink;
343
  };
344
345
39
  explicit ExternalQuotaManager(ExternalCacheManager *cache_mgr)
346
39
    : cache_mgr_(cache_mgr) { }
347
  int GetInfo(QuotaInfo *quota_info);
348
  bool DoListing(cvmfs::EnumObjectType type,
349
                 std::vector<cvmfs::MsgListRecord> *result);
350
351
  ExternalCacheManager *cache_mgr_;
352
};
353
354
#endif  // CVMFS_CACHE_EXTERN_H_