GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/cache_plugin/channel.h Lines: 38 41 92.7 %
Date: 2019-02-03 02:48:13 Branches: 6 10 60.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
#ifndef CVMFS_CACHE_PLUGIN_CHANNEL_H_
5
#define CVMFS_CACHE_PLUGIN_CHANNEL_H_
6
7
#include <pthread.h>
8
#include <stdint.h>
9
10
#include <cassert>
11
#include <map>
12
#include <set>
13
#include <string>
14
#include <vector>
15
16
#include "atomic.h"
17
#include "cache.pb.h"
18
#include "cache_transport.h"
19
#include "hash.h"
20
#include "murmur.h"
21
#include "smallhash.h"
22
#include "util/single_copy.h"
23
24
/**
25
 * A SessionCtx stores the session information related to the current cache
26
 * plugin callback in thread-local-storage.  Singleton.
27
 *
28
 * TODO(jblomer): merge code with ClientCtx
29
 */
30
class SessionCtx : SingleCopy {
31
 public:
32
  struct ThreadLocalStorage {
33
14
    ThreadLocalStorage(uint64_t id, char *reponame, char *client_instance)
34
      : id(id)
35
      , reponame(reponame)
36
      , client_instance(client_instance)
37
14
      , is_set(true)
38
14
    { }
39
40
    uint64_t id;
41
    char *reponame;
42
    char *client_instance;
43
    bool is_set;  ///< either not yet set or deliberately unset
44
  };
45
46
  static SessionCtx *GetInstance();
47
  static void CleanupInstance();
48
  ~SessionCtx();
49
50
  void Set(uint64_t id, char *reponame, char *client_instance);
51
  void Unset();
52
  void Get(uint64_t *id, char **reponame, char **client_instance);
53
  bool IsSet();
54
55
 private:
56
  static SessionCtx *instance_;
57
  static void TlsDestructor(void *data);
58
59
  SessionCtx();
60
61
  pthread_key_t thread_local_storage_;
62
  pthread_mutex_t *lock_tls_blocks_;
63
  std::vector<ThreadLocalStorage *> tls_blocks_;
64
};
65
66
67
class CachePlugin {
68
 public:
69
  static const unsigned kPbProtocolVersion = 1;
70
  static const uint64_t kSizeUnknown;
71
72
31
  struct ObjectInfo {
73
31
    ObjectInfo()
74
      : id()
75
      , size(kSizeUnknown)
76
      , object_type(cvmfs::OBJECT_REGULAR)
77
31
      , pinned(false) { }
78
    shash::Any id;
79
    uint64_t size;
80
    cvmfs::EnumObjectType object_type;
81
    bool pinned;
82
    std::string description;
83
  };
84
85
  struct Info {
86
4
    Info() : size_bytes(0), used_bytes(0), pinned_bytes(0), no_shrink(-1) { }
87
    uint64_t size_bytes;
88
    uint64_t used_bytes;
89
    uint64_t pinned_bytes;
90
    int64_t no_shrink;
91
  };
92
93
  bool Listen(const std::string &locator);
94
  virtual ~CachePlugin();
95
  void ProcessRequests(unsigned num_workers);
96
  bool IsRunning();
97
  void Terminate();
98
  void WaitFor();
99
  void AskToDetach();
100
101
  unsigned max_object_size() const { return max_object_size_; }
102
  uint64_t capabilities() const { return capabilities_; }
103
104
 protected:
105
  explicit CachePlugin(uint64_t capabilities);
106
107
  virtual cvmfs::EnumStatus ChangeRefcount(const shash::Any &id,
108
                                           int32_t change_by) = 0;
109
  virtual cvmfs::EnumStatus GetObjectInfo(const shash::Any &id,
110
                                          ObjectInfo *info) = 0;
111
  virtual cvmfs::EnumStatus Pread(const shash::Any &id,
112
                                  uint64_t offset,
113
                                  uint32_t *size,
114
                                  unsigned char *buffer) = 0;
115
  virtual cvmfs::EnumStatus StartTxn(const shash::Any &id,
116
                                     const uint64_t txn_id,
117
                                     const ObjectInfo &info) = 0;
118
  virtual cvmfs::EnumStatus WriteTxn(const uint64_t txn_id,
119
                                     unsigned char *buffer,
120
                                     uint32_t size) = 0;
121
  virtual cvmfs::EnumStatus AbortTxn(const uint64_t txn_id) = 0;
122
  virtual cvmfs::EnumStatus CommitTxn(const uint64_t txn_id) = 0;
123
124
  virtual cvmfs::EnumStatus GetInfo(Info *info) = 0;
125
  virtual cvmfs::EnumStatus Shrink(uint64_t shrink_to,
126
                                   uint64_t *used_bytes) = 0;
127
  virtual cvmfs::EnumStatus ListingBegin(uint64_t lst_id,
128
                                         cvmfs::EnumObjectType type) = 0;
129
  virtual cvmfs::EnumStatus ListingNext(int64_t lst_id,
130
                                        ObjectInfo *item) = 0;
131
  virtual cvmfs::EnumStatus ListingEnd(int64_t lst_id) = 0;
132
133
 private:
134
  static const unsigned kDefaultMaxObjectSize = 256 * 1024;  // 256kB
135
  static const unsigned kListingSize = 4 * 1024 * 1024;  // 4MB
136
  static const char kSignalTerminate = 'q';
137
  static const char kSignalDetach = 'd';
138
139
5440
  struct UniqueRequest {
140
5472
    UniqueRequest() : session_id(-1), req_id(-1) { }
141
609
    UniqueRequest(int64_t s, int64_t r) : session_id(s), req_id(r) { }
142
1239
    bool operator ==(const UniqueRequest &other) const {
143
      return (this->session_id == other.session_id) &&
144

1239
             (this->req_id == other.req_id);
145
    }
146
    bool operator !=(const UniqueRequest &other) const {
147
      return !(*this == other);
148
    }
149
150
    int64_t session_id;
151
    int64_t req_id;
152
  };
153
154
  /**
155
   * The char pointers are prepared on Handshake and removed when the session
156
   * closes.  They are created to be consumed by the cvmcache_get_session() API.
157
   */
158
119
  struct SessionInfo {
159
17
    SessionInfo() : id(0), reponame(NULL), client_instance(NULL) { }
160
    SessionInfo(uint64_t id, const std::string &name);
161
162
    uint64_t id;
163
    std::string name;
164
    char *reponame;
165
    char *client_instance;
166
  };
167
168
  /**
169
   * RAII form of the SessionCtx.  On construction, automatically sets the
170
   * session context if the session id is found.  On destruction, unsets the
171
   * session information.
172
   */
173
  class SessionCtxGuard {
174
   public:
175
3414
    SessionCtxGuard(uint64_t session_id, CachePlugin *plugin) {
176
3414
      char *reponame = NULL;
177
3414
      char *client_instance = NULL;
178
      std::map<uint64_t, SessionInfo>::const_iterator iter =
179
3414
        plugin->sessions_.find(session_id);
180
3414
      if (iter != plugin->sessions_.end()) {
181
3414
        reponame = iter->second.reponame;
182
3414
        client_instance = iter->second.client_instance;
183
      }
184
3414
      SessionCtx *session_ctx = SessionCtx::GetInstance();
185
3414
      assert(session_ctx);
186
3414
      session_ctx->Set(session_id, reponame, client_instance);
187
3414
    }
188
189
3414
    ~SessionCtxGuard() {
190
3414
      SessionCtx *session_ctx = SessionCtx::GetInstance();
191
3414
      assert(session_ctx);
192
3414
      session_ctx->Unset();
193
3414
    }
194
  };
195
196
  static void *MainProcessRequests(void *data);
197
198
17
  inline uint64_t NextSessionId() {
199
17
    return atomic_xadd64(&next_session_id_, 1);
200
  }
201
7
  inline uint64_t NextTxnId() {
202
7
    return atomic_xadd64(&next_txn_id_, 1);
203
  }
204
6
  inline uint64_t NextLstId() {
205
6
    return atomic_xadd64(&next_lst_id_, 1);
206
  }
207
623
  static inline uint32_t HashUniqueRequest(const UniqueRequest &req) {
208
623
    return MurmurHash2(&req, sizeof(req), 0x07387a4f);
209
  }
210
211
  bool HandleRequest(int fd_con);
212
  void HandleHandshake(cvmfs::MsgHandshake *msg_req,
213
                       CacheTransport *transport);
214
  void HandleRefcount(cvmfs::MsgRefcountReq *msg_req,
215
                      CacheTransport *transport);
216
  void HandleObjectInfo(cvmfs::MsgObjectInfoReq *msg_req,
217
                        CacheTransport *transport);
218
  void HandleRead(cvmfs::MsgReadReq *msg_req,
219
                     CacheTransport *transport);
220
  void HandleStore(cvmfs::MsgStoreReq *msg_req,
221
                   CacheTransport::Frame *frame,
222
                   CacheTransport *transport);
223
  void HandleStoreAbort(cvmfs::MsgStoreAbortReq *msg_req,
224
                        CacheTransport *transport);
225
  void HandleInfo(cvmfs::MsgInfoReq *msg_req, CacheTransport *transport);
226
  void HandleShrink(cvmfs::MsgShrinkReq *msg_req, CacheTransport *transport);
227
  void HandleList(cvmfs::MsgListReq *msg_req, CacheTransport *transport);
228
  void HandleIoctl(cvmfs::MsgIoctl *msg_req);
229
  void SendDetachRequests();
230
231
  void NotifySupervisor(char signal);
232
233
  void LogSessionError(uint64_t session_id,
234
                       cvmfs::EnumStatus status,
235
                       const std::string &msg);
236
  void LogSessionInfo(uint64_t session_id, const std::string &msg);
237
238
  bool is_local_;
239
  uint64_t capabilities_;
240
  int fd_socket_;
241
  int fd_socket_lock_;
242
  atomic_int32 running_;
243
  unsigned num_workers_;
244
  unsigned max_object_size_;
245
  /**
246
   * Number of clients undergoing a reload, i.e. they promise to come back
247
   * and open a new connection soon.
248
   */
249
  uint64_t num_inlimbo_clients_;
250
  std::string name_;
251
  atomic_int64 next_session_id_;
252
  atomic_int64 next_txn_id_;
253
  atomic_int64 next_lst_id_;
254
  SmallHashDynamic<UniqueRequest, uint64_t> txn_ids_;
255
  std::set<int> connections_;
256
  std::map<uint64_t, SessionInfo> sessions_;
257
  pthread_t thread_io_;
258
  int pipe_ctrl_[2];
259
};  // class CachePlugin
260
261
#endif  // CVMFS_CACHE_PLUGIN_CHANNEL_H_