GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/quota_posix.h Lines: 24 25 96.0 %
Date: 2019-02-03 02:48:13 Branches: 0 0 - %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_QUOTA_POSIX_H_
6
#define CVMFS_QUOTA_POSIX_H_
7
8
#include <pthread.h>
9
#include <stdint.h>
10
#include <sys/types.h>
11
#include <unistd.h>
12
13
#include <map>
14
#include <string>
15
#include <vector>
16
17
#include "duplex_sqlite3.h"
18
#include "gtest/gtest_prod.h"
19
#include "hash.h"
20
#include "quota.h"
21
#include "statistics.h"
22
#include "util/single_copy.h"
23
#include "util/string.h"
24
25
namespace perf {
26
class Recorder;
27
}
28
29
/**
30
 * Works with the PosixCacheManager.  Uses an SQlite database for cache contents
31
 * tracking.  Tracking is asynchronously.
32
 *
33
 * TODO(jblomer): split into client, server, and protocol classes.
34
 */
35
class PosixQuotaManager : public QuotaManager {
36
  FRIEND_TEST(T_QuotaManager, BindReturnPipe);
37
  FRIEND_TEST(T_QuotaManager, Cleanup);
38
  FRIEND_TEST(T_QuotaManager, Contains);
39
  FRIEND_TEST(T_QuotaManager, InitDatabase);
40
  FRIEND_TEST(T_QuotaManager, MakeReturnPipe);
41
42
 public:
43
  static PosixQuotaManager *Create(const std::string &cache_workspace,
44
    const uint64_t limit, const uint64_t cleanup_threshold,
45
    const bool rebuild_database);
46
  static PosixQuotaManager *CreateShared(
47
    const std::string &exe_path,
48
    const std::string &cache_workspace,
49
    const uint64_t limit,
50
    const uint64_t cleanup_threshold,
51
    bool foreground);
52
  static int MainCacheManager(int argc, char **argv);
53
54
  virtual ~PosixQuotaManager();
55
  virtual bool HasCapability(Capabilities capability) { return true; }
56
57
  virtual void Insert(const shash::Any &hash, const uint64_t size,
58
                      const std::string &description);
59
  virtual void InsertVolatile(const shash::Any &hash, const uint64_t size,
60
                              const std::string &description);
61
  virtual bool Pin(const shash::Any &hash, const uint64_t size,
62
                   const std::string &description, const bool is_catalog);
63
  virtual void Unpin(const shash::Any &hash);
64
  virtual void Touch(const shash::Any &hash);
65
  virtual void Remove(const shash::Any &file);
66
  virtual bool Cleanup(const uint64_t leave_size);
67
68
  virtual void RegisterBackChannel(int back_channel[2],
69
                                   const std::string &channel_id);
70
  virtual void UnregisterBackChannel(int back_channel[2],
71
                                     const std::string &channel_id);
72
73
  virtual std::vector<std::string> List();
74
  virtual std::vector<std::string> ListPinned();
75
  virtual std::vector<std::string> ListCatalogs();
76
  virtual std::vector<std::string> ListVolatile();
77
  virtual uint64_t GetMaxFileSize();
78
  virtual uint64_t GetCapacity();
79
  virtual uint64_t GetSize();
80
  virtual uint64_t GetSizePinned();
81
  virtual uint64_t GetCleanupRate(uint64_t period_s);
82
83
  virtual void Spawn();
84
  virtual pid_t GetPid();
85
  virtual uint32_t GetProtocolRevision();
86
87
 private:
88
  /**
89
   * Loaded catalogs are pinned in the LRU and have to be treated differently.
90
   */
91
  enum FileTypes {
92
    kFileRegular = 0,
93
    kFileCatalog,
94
  };
95
96
  /**
97
   * List of RPCs that can be sent to the cache manager.
98
   */
99
  enum CommandType {
100
    kTouch = 0,
101
    kInsert,
102
    kReserve,
103
    kPin,
104
    kUnpin,
105
    kRemove,
106
    kCleanup,
107
    kList,
108
    kListPinned,
109
    kListCatalogs,
110
    kStatus,
111
    kLimits,
112
    kPid,
113
    kPinRegular,
114
    kRegisterBackChannel,
115
    kUnregisterBackChannel,
116
    kGetProtocolRevision,
117
    kInsertVolatile,
118
    // as of protocol revision 2
119
    kListVolatile,
120
    kCleanupRate,
121
  };
122
123
  /**
124
   * That could be done in more elegant way.  However, we might have a situation
125
   * with old cache manager serving new clients (or vice versa) and we don't
126
   * want to change the memory layout of LruCommand.
127
   */
128
  struct LruCommand {
129
    CommandType command_type;
130
    uint64_t size;    /**< Careful! Last 3 bits store hash algorithm */
131
    int return_pipe;  /**< For cleanup, listing, and reservations */
132
    unsigned char digest[shash::kMaxDigestSize];
133
    /**
134
     * Maximum 512-sizeof(LruCommand) in order to guarantee atomic pipe
135
     * operations.
136
     */
137
    uint16_t desc_length;
138
139
1176
    LruCommand()
140
      : command_type(static_cast<CommandType>(0))
141
      , size(0)
142
      , return_pipe(-1)
143
1176
      , desc_length(0)
144
    {
145
1176
      memset(digest, 0, shash::kMaxDigestSize);
146
1176
    }
147
148
73
    void SetSize(const uint64_t new_size) {
149
73
      uint64_t mask = 7;
150
73
      mask = ~(mask << (64-3));
151
73
      size = (new_size & mask) | size;
152
73
    }
153
154
214
    uint64_t GetSize() const {
155
214
      uint64_t mask = 7;
156
214
      mask = ~(mask << (64-3));
157
214
      return size & mask;
158
    }
159
160
153
    void StoreHash(const shash::Any &hash) {
161
153
      memcpy(digest, hash.digest, hash.GetDigestSize());
162
      // Exclude MD5
163
153
      uint64_t algo_flags = hash.algorithm - 1;
164
153
      algo_flags = algo_flags << (64-3);
165
153
      size |= algo_flags;
166
153
    }
167
168
86
    shash::Any RetrieveHash() const {
169
86
      uint64_t algo_flags = size >> (64-3);
170
86
      shash::Any result(static_cast<shash::Algorithms>(algo_flags+1));
171
86
      memcpy(result.digest, digest, result.GetDigestSize());
172
86
      return result;
173
    }
174
  };
175
176
  /**
177
   * Maximum page cache per thread (Bytes).
178
   */
179
  static const unsigned kSqliteMemPerThread = 2*1024*1024;
180
181
  /**
182
   * Collect a number of insert and touch operations before processing them
183
   * as sqlite commands.
184
   */
185
  static const unsigned kCommandBufferSize = 32;
186
187
  /**
188
   * Make sure that the amount of data transferred through the RPC pipe is
189
   * within the OS's guarantees for atomiticity.
190
   */
191
  static const unsigned kMaxDescription = 512-sizeof(LruCommand);
192
193
  /**
194
   * Alarm when more than 75% of the cache fraction allowed for pinned files
195
   * (50%) is filled with pinned files
196
   */
197
  static const unsigned kHighPinWatermark = 75;
198
199
  /**
200
   * The last bit in the sequence number indicates if an entry is volatile.
201
   * Such sequence numbers are negative and they are preferred during cleanup.
202
   * Volatile entries are used for instance for ALICE conditions data.
203
   */
204
  static const uint64_t kVolatileFlag = 1ULL << 63;
205
206
  bool InitDatabase(const bool rebuild_database);
207
  bool RebuildDatabase();
208
  void CloseDatabase();
209
  bool Contains(const std::string &hash_str);
210
  bool DoCleanup(const uint64_t leave_size);
211
212
  void MakeReturnPipe(int pipe[2]);
213
  int BindReturnPipe(int pipe_wronly);
214
  void UnbindReturnPipe(int pipe_wronly);
215
  void UnlinkReturnPipe(int pipe_wronly);
216
  void CloseReturnPipe(int pipe[2]);
217
  void CleanupPipes();
218
219
  void CheckFreeSpace();
220
  void CheckHighPinWatermark();
221
  void ProcessCommandBunch(const unsigned num,
222
                           const LruCommand *commands,
223
                           const char *descriptions);
224
  static void *MainCommandServer(void *data);
225
226
  void DoInsert(const shash::Any &hash, const uint64_t size,
227
                const std::string &description, const CommandType command_type);
228
  std::vector<std::string> DoList(const CommandType list_command);
229
  void GetSharedStatus(uint64_t *gauge, uint64_t *pinned);
230
  void GetLimits(uint64_t *limit, uint64_t *cleanup_threshold);
231
232
  static void ParseDirectories(const std::string cache_workspace,
233
                               std::string *cache_dir,
234
                               std::string *workspace_dir);
235
  PosixQuotaManager(const uint64_t limit, const uint64_t cleanup_threshold,
236
                    const std::string &cache_workspace);
237
238
  /**
239
   * Indicates if the cache manager is a shared process or a thread within the
240
   * same process (exclusive cache manager)
241
   */
242
  bool shared_;
243
244
 /**
245
  * True once the program switches into multi-threaded mode or the quota manager
246
  * process has been forked resp.
247
  */
248
  bool spawned_;
249
250
  /**
251
   * Soft limit in bytes, start cleanup when reached.
252
   */
253
  uint64_t limit_;
254
255
  /**
256
   * Cleanup until cleanup_threshold_ are left in the cache.
257
   */
258
  uint64_t cleanup_threshold_;
259
260
  /**
261
   * Current size of cache.
262
   */
263
  uint64_t gauge_;
264
265
  /**
266
   * Size of pinned files in bytes (usually file catalogs).
267
   */
268
  uint64_t pinned_;
269
270
  /**
271
   * Current access sequence number.  Gets increased on every access/insert
272
   * operation.
273
   */
274
  uint64_t seq_;
275
276
  /**
277
   * Should match the directory given to the cache manager.
278
   */
279
  std::string cache_dir_;
280
281
  /**
282
   * Directory for the database lock (shared manager) and the pipes (also
283
   * shared manager).  Usually the same as cache_dir_.  Can be different if
284
   * CVMFS_WORKSPACE or CVMFS_CACHE_WORKSPACE is set.
285
   */
286
  std::string workspace_dir_;
287
288
  /**
289
   * Pinned content hashes and their size.
290
   */
291
  std::map<shash::Any, uint64_t> pinned_chunks_;
292
293
  /**
294
   * Used to send RPCs to the quota manager thread or process.
295
   */
296
  int pipe_lru_[2];
297
298
  /**
299
   * In exclusive mode, controls the quota manager thread.
300
   */
301
  pthread_t thread_lru_;
302
303
  /**
304
   * Ensures exclusive cache database access through POSIX file lock.
305
   */
306
  int fd_lock_cachedb_;
307
308
  /**
309
   * If this is true, the unlink operations that correspond to a cleanup run
310
   * will be performed in a detached, asynchronous process.
311
   */
312
  bool async_delete_;
313
314
  /**
315
   * Keeps track of the number of cleanups over time.  Use by
316
   * `cvmfs_talk cleanup rate`
317
   */
318
  perf::MultiRecorder cleanup_recorder_;
319
320
  sqlite3 *database_;
321
  sqlite3_stmt *stmt_touch_;
322
  sqlite3_stmt *stmt_unpin_;
323
  sqlite3_stmt *stmt_block_;
324
  sqlite3_stmt *stmt_unblock_;
325
  sqlite3_stmt *stmt_new_;
326
  sqlite3_stmt *stmt_lru_;
327
  sqlite3_stmt *stmt_size_;
328
  sqlite3_stmt *stmt_rm_;
329
  sqlite3_stmt *stmt_list_;
330
  sqlite3_stmt *stmt_list_pinned_;  /**< Loaded catalogs are pinned. */
331
  sqlite3_stmt *stmt_list_catalogs_;
332
  sqlite3_stmt *stmt_list_volatile_;
333
334
  /**
335
   * Used in the destructor to steer closing of the database and so on.
336
   */
337
  bool initialized_;
338
};  // class PosixQuotaManager
339
340
#endif  // CVMFS_QUOTA_POSIX_H_