GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/quota_posix.cc Lines: 735 1001 73.4 %
Date: 2019-02-03 02:48:13 Branches: 290 483 60.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 *
4
 * This module implements a "managed local cache".
5
 * This way, we are able to track access times of files in the cache
6
 * and remove files based on least recently used strategy.
7
 *
8
 * We set up another SQLite catalog, a "cache catalog", that helps us
9
 * in the bookkeeping of files, file sizes and access times.
10
 *
11
 * We might choose to not manage the local cache.  This is indicated
12
 * by limit == 0 and everything succeeds in that case.
13
 */
14
15
#define __STDC_LIMIT_MACROS
16
#define __STDC_FORMAT_MACROS
17
18
#include "cvmfs_config.h"
19
#include "quota_posix.h"
20
21
#include <dirent.h>
22
#include <errno.h>
23
#include <fcntl.h>
24
#include <inttypes.h>
25
#include <pthread.h>
26
#include <signal.h>
27
#include <stdint.h>
28
#include <sys/dir.h>
29
#include <sys/stat.h>
30
#ifndef __APPLE__
31
#include <sys/statfs.h>
32
#endif
33
#include <sys/statvfs.h>
34
#include <sys/types.h>
35
#include <sys/wait.h>
36
#include <unistd.h>
37
38
#include <cassert>
39
#include <cstdio>
40
#include <cstdlib>
41
#include <cstring>
42
43
#include <map>
44
#include <set>
45
#include <string>
46
#include <vector>
47
48
#include "duplex_sqlite3.h"
49
#include "hash.h"
50
#include "logging.h"
51
#include "monitor.h"
52
#include "platform.h"
53
#include "smalloc.h"
54
#include "statistics.h"
55
#include "util/pointer.h"
56
#include "util/posix.h"
57
#include "util/string.h"
58
#include "util_concurrency.h"
59
60
using namespace std;  // NOLINT
61
62
63
91
int PosixQuotaManager::BindReturnPipe(int pipe_wronly) {
64
91
  if (!shared_)
65
89
    return pipe_wronly;
66
67
  // Connect writer's end
68
  int result =
69
    open((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str(),
70
2
         O_WRONLY | O_NONBLOCK);
71
2
  if (result >= 0) {
72
1
    Nonblock2Block(result);
73
  } else {
74
    LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
75
1
             "failed to bind return pipe (%d)", errno);
76
  }
77
2
  return result;
78
}
79
80
81
33
void PosixQuotaManager::CheckHighPinWatermark() {
82
33
  const uint64_t watermark = kHighPinWatermark*cleanup_threshold_/100;
83

33
  if ((cleanup_threshold_ > 0) && (pinned_ > watermark)) {
84
    LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn,
85
             "high watermark of pinned files (%" PRIu64 "M > %" PRIu64 "M)",
86
3
             pinned_/(1024*1024), watermark/(1024*1024));
87
3
    BroadcastBackchannels("R");  // clients: please release pinned catalogs
88
  }
89
33
}
90
91
92
void PosixQuotaManager::CleanupPipes() {
93
  DIR *dirp = opendir(workspace_dir_.c_str());
94
  assert(dirp != NULL);
95
96
  platform_dirent64 *dent;
97
  bool found_leftovers = false;
98
  while ((dent = platform_readdir(dirp)) != NULL) {
99
    const string name = dent->d_name;
100
    const string path = workspace_dir_ + "/" + name;
101
    platform_stat64 info;
102
    int retval = platform_stat(path.c_str(), &info);
103
    if (retval != 0)
104
      continue;
105
    if (S_ISFIFO(info.st_mode) && (name.substr(0, 4) == "pipe")) {
106
      if (!found_leftovers) {
107
        LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslogWarn,
108
                 "removing left-over FIFOs from cache directory");
109
      }
110
      found_leftovers = true;
111
      unlink(path.c_str());
112
    }
113
  }
114
  closedir(dirp);
115
}
116
117
118
/**
119
 * Cleans up in data cache, until cache size is below leave_size.
120
 * The actual unlinking is done in a separate process (fork).
121
 *
122
 * \return True on success, false otherwise
123
 */
124
9
bool PosixQuotaManager::Cleanup(const uint64_t leave_size) {
125
9
  if (!spawned_)
126
    return DoCleanup(leave_size);
127
128
  bool result;
129
  int pipe_cleanup[2];
130
9
  MakeReturnPipe(pipe_cleanup);
131
132
9
  LruCommand cmd;
133
9
  cmd.command_type = kCleanup;
134
9
  cmd.size = leave_size;
135
9
  cmd.return_pipe = pipe_cleanup[1];
136
137
9
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
138
9
  ReadHalfPipe(pipe_cleanup[0], &result, sizeof(result));
139
9
  CloseReturnPipe(pipe_cleanup);
140
141
9
  return result;
142
}
143
144
145
106
void PosixQuotaManager::CloseDatabase() {
146
106
  if (stmt_list_catalogs_) sqlite3_finalize(stmt_list_catalogs_);
147
106
  if (stmt_list_pinned_) sqlite3_finalize(stmt_list_pinned_);
148
106
  if (stmt_list_volatile_) sqlite3_finalize(stmt_list_volatile_);
149
106
  if (stmt_list_) sqlite3_finalize(stmt_list_);
150
106
  if (stmt_lru_) sqlite3_finalize(stmt_lru_);
151
106
  if (stmt_rm_) sqlite3_finalize(stmt_rm_);
152
106
  if (stmt_size_) sqlite3_finalize(stmt_size_);
153
106
  if (stmt_touch_) sqlite3_finalize(stmt_touch_);
154
106
  if (stmt_unpin_) sqlite3_finalize(stmt_unpin_);
155
106
  if (stmt_block_) sqlite3_finalize(stmt_block_);
156
106
  if (stmt_unblock_) sqlite3_finalize(stmt_unblock_);
157
106
  if (stmt_new_) sqlite3_finalize(stmt_new_);
158
106
  if (database_) sqlite3_close(database_);
159
106
  UnlockFile(fd_lock_cachedb_);
160
161
106
  stmt_list_catalogs_ = NULL;
162
106
  stmt_list_pinned_ = NULL;
163
106
  stmt_list_volatile_ = NULL;
164
106
  stmt_list_ = NULL;
165
106
  stmt_rm_ = NULL;
166
106
  stmt_size_ = NULL;
167
106
  stmt_touch_ = NULL;
168
106
  stmt_unpin_ = NULL;
169
106
  stmt_block_ = NULL;
170
106
  stmt_unblock_ = NULL;
171
106
  stmt_new_ = NULL;
172
106
  database_ = NULL;
173
174
106
  pinned_chunks_.clear();
175
106
}
176
177
178
86
void PosixQuotaManager::CloseReturnPipe(int pipe[2]) {
179
86
  if (shared_) {
180
2
    close(pipe[0]);
181
2
    UnlinkReturnPipe(pipe[1]);
182
  } else {
183
84
    ClosePipe(pipe);
184
  }
185
86
}
186
187
188
87
bool PosixQuotaManager::Contains(const string &hash_str) {
189
87
  bool result = false;
190
191
  sqlite3_bind_text(stmt_size_, 1, &hash_str[0], hash_str.length(),
192
87
                    SQLITE_STATIC);
193
87
  if (sqlite3_step(stmt_size_) == SQLITE_ROW)
194
23
    result = true;
195
87
  sqlite3_reset(stmt_size_);
196
  LogCvmfs(kLogQuota, kLogDebug, "contains %s returns %d",
197
87
           hash_str.c_str(), result);
198
199
87
  return result;
200
}
201
202
203
106
void PosixQuotaManager::CheckFreeSpace() {
204

106
  if ((limit_ == 0) || (gauge_ >= limit_))
205
1
    return;
206
207
  struct statvfs vfs_info;
208
105
  int retval = statvfs((cache_dir_ + "/cachedb").c_str(), &vfs_info);
209
105
  if (retval != 0) {
210
    LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn,
211
             "failed to query %s for free space (%d)",
212
             cache_dir_.c_str(), errno);
213
    return;
214
  }
215
105
  int64_t free_space_byte = vfs_info.f_bavail * vfs_info.f_bsize;
216
  LogCvmfs(kLogQuota, kLogDebug, "free space: %" PRId64 " MB",
217
105
           free_space_byte / (1024 * 1024));
218
219
105
  int64_t required_byte = limit_ - gauge_;
220
105
  if (free_space_byte < required_byte) {
221
    LogCvmfs(kLogQuota, kLogSyslogWarn,
222
             "too little free space on the file system hosting the cache,"
223
             " %" PRId64 " MB available",
224
             free_space_byte / (1024 * 1024));
225
  }
226
}
227
228
229
109
/**
 * Creates a quota manager that operates on the cache database directly.
 * CreateShared is the variant that talks to a separate cache manager process.
 *
 * \param cache_workspace cache directory specification
 * \param limit cache size limit in bytes; must be larger than cleanup_threshold
 * \param cleanup_threshold size to which cleanup shrinks the cache
 * \param rebuild_database discard and rebuild the cache database
 * \return the initialized manager, or NULL on invalid parameters or if the
 *         cache database cannot be initialized
 */
PosixQuotaManager *PosixQuotaManager::Create(
  const string &cache_workspace,
  const uint64_t limit,
  const uint64_t cleanup_threshold,
  const bool rebuild_database)
{
  const bool valid_parameters = (cleanup_threshold < limit);
  if (!valid_parameters) {
    LogCvmfs(kLogQuota, kLogDebug, "invalid parameters: limit %" PRIu64 ", "
             "cleanup_threshold %" PRIu64, limit, cleanup_threshold);
    return NULL;
  }

  PosixQuotaManager *manager =
    new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);

  // The cache catalog must be usable before the manager is handed out
  const bool db_ready = manager->InitDatabase(rebuild_database);
  if (!db_ready) {
    delete manager;
    return NULL;
  }

  manager->CheckFreeSpace();
  MakePipe(manager->pipe_lru_);
  manager->protocol_revision_ = kProtocolRevision;
  manager->initialized_ = true;
  return manager;
}
256
257
258
/**
 * Connects to a running shared local quota manager.  Creates one if necessary.
 *
 * Sequence, as implemented below:
 *  1. Lock <workspace>/lock_cachemgr so only one client at a time connects or
 *     spawns a cache manager.
 *  2. Try to open the <workspace>/cachemgr FIFO for writing.  On success a
 *     cache manager is already running.  Its limits are queried, and its
 *     protocol revision too if <workspace>/cachemgr.protocol exists.
 *  3. Otherwise take and immediately release <workspace>/lock_cachemgr.fifo.
 *     The original comment says this lets an existing cache manager finish
 *     first.  Remove a stale FIFO if open() failed with ENXIO, create a fresh
 *     FIFO, start `exe_path __cachemgr__ ...`, and synchronize with it over
 *     the boot and handshake pipes.
 *
 * \param exe_path executable started with the "__cachemgr__" argument
 * \param cache_workspace cache directory specification (see ParseDirectories)
 * \param limit forwarded to a newly started cache manager; the effective
 *        value is afterwards fetched with GetLimits
 * \param cleanup_threshold same handling as limit
 * \param foreground forwarded on the new cache manager's command line
 * \return the connected manager, or NULL on any failure
 */
PosixQuotaManager *PosixQuotaManager::CreateShared(
  const std::string &exe_path,
  const std::string &cache_workspace,
  const uint64_t limit,
  const uint64_t cleanup_threshold,
  bool foreground)
{
  string cache_dir;
  string workspace_dir;
  ParseDirectories(cache_workspace, &cache_dir, &workspace_dir);

  // Create lock file: only one fuse client at a time
  const int fd_lockfile = LockFile(workspace_dir + "/lock_cachemgr");
  if (fd_lockfile < 0) {
    LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
             (workspace_dir + "/lock_cachemgr").c_str(), errno);
    return NULL;
  }

  PosixQuotaManager *quota_mgr =
    new PosixQuotaManager(limit, cleanup_threshold, cache_workspace);
  quota_mgr->shared_ = true;
  quota_mgr->spawned_ = true;

  // Try to connect to pipe
  const string fifo_path = workspace_dir + "/cachemgr";
  LogCvmfs(kLogQuota, kLogDebug, "trying to connect to existing pipe");
  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
  if (quota_mgr->pipe_lru_[1] >= 0) {
    // A reader holds the FIFO open: a cache manager is already running
    LogCvmfs(kLogQuota, kLogDebug, "connected to existing cache manager pipe");
    quota_mgr->initialized_ = true;
    Nonblock2Block(quota_mgr->pipe_lru_[1]);
    UnlockFile(fd_lockfile);
    quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
    LogCvmfs(kLogQuota, kLogDebug,
             "received limit %" PRIu64 ", threshold %" PRIu64,
             quota_mgr->limit_, quota_mgr->cleanup_threshold_);
    // Without the marker file the manager is assumed not to understand the
    // protocol revision query, so protocol_revision_ is left untouched
    if (FileExists(workspace_dir + "/cachemgr.protocol")) {
      quota_mgr->protocol_revision_ = quota_mgr->GetProtocolRevision();
      LogCvmfs(kLogQuota, kLogDebug, "connected protocol revision %u",
               quota_mgr->protocol_revision_);
    } else {
      LogCvmfs(kLogQuota, kLogDebug, "connected to ancient cache manager");
    }
    return quota_mgr;
  }
  // Save errno before later calls overwrite it; checked for ENXIO below
  const int connect_error = errno;

  // Lock file: let existing cache manager finish first
  const int fd_lockfile_fifo = LockFile(workspace_dir + "/lock_cachemgr.fifo");
  if (fd_lockfile_fifo < 0) {
    LogCvmfs(kLogQuota, kLogDebug, "could not open lock file %s (%d)",
             (workspace_dir + "/lock_cachemgr.fifo").c_str(), errno);
    UnlockFile(fd_lockfile);
    delete quota_mgr;
    return NULL;
  }
  UnlockFile(fd_lockfile_fifo);

  // POSIX open(): O_WRONLY|O_NONBLOCK on a FIFO without a reader fails with
  // ENXIO, i.e. the FIFO exists but nobody serves it anymore
  if (connect_error == ENXIO) {
    LogCvmfs(kLogQuota, kLogDebug, "left-over FIFO found, unlinking");
    unlink(fifo_path.c_str());
  }

  // Creating a new FIFO for the cache manager (to be bound later)
  int retval = mkfifo(fifo_path.c_str(), 0600);
  if (retval != 0) {
    LogCvmfs(kLogQuota, kLogDebug, "failed to create cache manager FIFO (%d)",
             errno);
    UnlockFile(fd_lockfile);
    delete quota_mgr;
    return NULL;
  }

  // Create new cache manager
  // pipe_boot: child writes one byte when it is ready.
  // pipe_handshake: we write one byte once the FIFO write end is connected.
  int pipe_boot[2];
  int pipe_handshake[2];
  MakePipe(pipe_boot);
  MakePipe(pipe_handshake);

  vector<string> command_line;
  command_line.push_back(exe_path);
  command_line.push_back("__cachemgr__");
  command_line.push_back(cache_workspace);
  command_line.push_back(StringifyInt(pipe_boot[1]));
  command_line.push_back(StringifyInt(pipe_handshake[0]));
  command_line.push_back(StringifyInt(limit));
  command_line.push_back(StringifyInt(cleanup_threshold));
  command_line.push_back(StringifyInt(foreground));
  command_line.push_back(StringifyInt(GetLogSyslogLevel()));
  command_line.push_back(StringifyInt(GetLogSyslogFacility()));
  command_line.push_back(GetLogDebugFile() + ":" + GetLogMicroSyslog());

  // The child keeps stdio plus its ends of the boot and handshake pipes
  set<int> preserve_filedes;
  preserve_filedes.insert(0);
  preserve_filedes.insert(1);
  preserve_filedes.insert(2);
  preserve_filedes.insert(pipe_boot[1]);
  preserve_filedes.insert(pipe_handshake[0]);

  retval = ManagedExec(command_line, preserve_filedes, map<int, int>(), false);
  if (!retval) {
    UnlockFile(fd_lockfile);
    ClosePipe(pipe_boot);
    ClosePipe(pipe_handshake);
    delete quota_mgr;
    LogCvmfs(kLogQuota, kLogDebug, "failed to start cache manager");
    return NULL;
  }

  // Wait for cache manager to be ready
  // Close our copies of the child's ends first, so that read() sees EOF if
  // the child exits without signaling
  close(pipe_boot[1]);
  close(pipe_handshake[0]);
  char buf;
  if (read(pipe_boot[0], &buf, 1) != 1) {
    UnlockFile(fd_lockfile);
    close(pipe_boot[0]);
    close(pipe_handshake[1]);
    delete quota_mgr;
    LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
             "cache manager did not start");
    return NULL;
  }
  close(pipe_boot[0]);

  // Connect write end
  quota_mgr->pipe_lru_[1] = open(fifo_path.c_str(), O_WRONLY | O_NONBLOCK);
  if (quota_mgr->pipe_lru_[1] < 0) {
    LogCvmfs(kLogQuota, kLogDebug,
             "failed to connect to newly created FIFO (%d)", errno);
    close(pipe_handshake[1]);
    UnlockFile(fd_lockfile);
    delete quota_mgr;
    return NULL;
  }

  // Finalize handshake
  buf = 'C';
  if (write(pipe_handshake[1], &buf, 1) != 1) {
    UnlockFile(fd_lockfile);
    close(pipe_handshake[1]);
    LogCvmfs(kLogQuota, kLogDebug, "could not finalize handshake");
    delete quota_mgr;
    return NULL;
  }
  close(pipe_handshake[1]);

  Nonblock2Block(quota_mgr->pipe_lru_[1]);
  LogCvmfs(kLogQuota, kLogDebug, "connected to a new cache manager");
  quota_mgr->protocol_revision_ = kProtocolRevision;

  UnlockFile(fd_lockfile);

  quota_mgr->initialized_ = true;
  quota_mgr->GetLimits(&quota_mgr->limit_, &quota_mgr->cleanup_threshold_);
  LogCvmfs(kLogQuota, kLogDebug, "received limit %" PRIu64 ", "
           "threshold %" PRIu64,
           quota_mgr->limit_, quota_mgr->cleanup_threshold_);
  return quota_mgr;
}
421
422
423
10
bool PosixQuotaManager::DoCleanup(const uint64_t leave_size) {
424
10
  if (gauge_ <= leave_size)
425
2
    return true;
426
427
  // TODO(jblomer) transaction
428
  LogCvmfs(kLogQuota, kLogSyslog,
429
8
           "clean up cache until at most %lu KB is used", leave_size/1024);
430
8
  LogCvmfs(kLogQuota, kLogDebug, "gauge %" PRIu64, gauge_);
431
8
  cleanup_recorder_.Tick();
432
433
  bool result;
434
8
  string hash_str;
435
8
  vector<string> trash;
436
437
15
  do {
438
16
    sqlite3_reset(stmt_lru_);
439
16
    if (sqlite3_step(stmt_lru_) != SQLITE_ROW) {
440
1
      LogCvmfs(kLogQuota, kLogDebug, "could not get lru-entry");
441
1
      break;
442
    }
443
444
    hash_str = string(reinterpret_cast<const char *>(
445
15
                      sqlite3_column_text(stmt_lru_, 0)));
446
15
    LogCvmfs(kLogQuota, kLogDebug, "removing %s", hash_str.c_str());
447
15
    shash::Any hash = shash::MkFromHexPtr(shash::HexPtr(hash_str));
448
449
    // That's a critical condition.  We must not delete a not yet inserted
450
    // pinned file as it is already reserved (but will be inserted later).
451
    // Instead, set the pin bit in the db to not run into an endless loop
452
15
    if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
453
14
      trash.push_back(cache_dir_ + "/" + hash.MakePathWithoutSuffix());
454
14
      gauge_ -= sqlite3_column_int64(stmt_lru_, 1);
455
      LogCvmfs(kLogQuota, kLogDebug, "lru cleanup %s, new gauge %" PRIu64,
456
14
               hash_str.c_str(), gauge_);
457
458
      sqlite3_bind_text(stmt_rm_, 1, &hash_str[0], hash_str.length(),
459
14
                        SQLITE_STATIC);
460
14
      result = (sqlite3_step(stmt_rm_) == SQLITE_DONE);
461
14
      sqlite3_reset(stmt_rm_);
462
463
14
      if (!result) {
464
        LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
465
                 "failed to find %s in cache database (%d). "
466
                 "Cache database is out of sync. "
467
                 "Restart cvmfs with clean cache.", hash_str.c_str(), result);
468
        return false;
469
      }
470
    } else {
471
      sqlite3_bind_text(stmt_block_, 1, &hash_str[0], hash_str.length(),
472
1
                        SQLITE_STATIC);
473
1
      result = (sqlite3_step(stmt_block_) == SQLITE_DONE);
474
1
      sqlite3_reset(stmt_block_);
475
1
      assert(result);
476
    }
477
  } while (gauge_ > leave_size);
478
479
8
  result = (sqlite3_step(stmt_unblock_) == SQLITE_DONE);
480
8
  sqlite3_reset(stmt_unblock_);
481
8
  assert(result);
482
483
  // Double fork avoids zombie, forked removal process must not flush file
484
  // buffers
485
8
  if (!trash.empty()) {
486
8
    if (async_delete_) {
487
      pid_t pid;
488
      int statloc;
489
6
      if ((pid = fork()) == 0) {
490
        // TODO(jblomer): eviciting files in the cache should perhaps become a
491
        // thread.  This would also allow to block the chunks and prevent the
492
        // race with re-insertion.  Then again, a thread can block umount.
493
#ifndef DEBUGMSG
494
        int max_fd = sysconf(_SC_OPEN_MAX);
495
        for (int i = 0; i < max_fd; ++i)
496
          close(i);
497
#endif
498
        if (fork() == 0) {
499
          for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
500
            LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
501
            unlink(trash[i].c_str());
502
          }
503
          _exit(0);
504
        }
505
        _exit(0);
506
      } else {
507
6
        if (pid > 0)
508
6
          waitpid(pid, &statloc, 0);
509
        else
510
          return false;
511
      }
512
    } else {  // !async_delete_
513
5
      for (unsigned i = 0, iEnd = trash.size(); i < iEnd; ++i) {
514
3
        LogCvmfs(kLogQuota, kLogDebug, "unlink %s", trash[i].c_str());
515
3
        unlink(trash[i].c_str());
516
      }
517
    }
518
  }
519
520
8
  if (gauge_ > leave_size) {
521
    LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn,
522
             "request to clean until %" PRIu64 ", "
523
1
             "but effective gauge is %" PRIu64, leave_size, gauge_);
524
1
    return false;
525
  }
526
7
  return true;
527
}
528
529
530
59
void PosixQuotaManager::DoInsert(
531
  const shash::Any &hash,
532
  const uint64_t size,
533
  const string &description,
534
  const CommandType command_type)
535
{
536
59
  const string hash_str = hash.ToString();
537
  LogCvmfs(kLogQuota, kLogDebug, "insert into lru %s, path %s, method %d",
538
59
           hash_str.c_str(), description.c_str(), command_type);
539
  const unsigned desc_length = (description.length() > kMaxDescription) ?
540
59
    kMaxDescription : description.length();
541
542
  LruCommand *cmd =
543
59
    reinterpret_cast<LruCommand *>(alloca(sizeof(LruCommand) + desc_length));
544
59
  new (cmd) LruCommand;
545
59
  cmd->command_type = command_type;
546
59
  cmd->SetSize(size);
547
59
  cmd->StoreHash(hash);
548
59
  cmd->desc_length = desc_length;
549
  memcpy(reinterpret_cast<char *>(cmd)+sizeof(LruCommand),
550
59
         &description[0], desc_length);
551
59
  WritePipe(pipe_lru_[1], cmd, sizeof(LruCommand) + desc_length);
552
59
}
553
554
555
35
/**
 * Requests a listing from the cache manager and collects the entries.
 *
 * list_command is one of kList, kListPinned, kListCatalogs or kListVolatile
 * (the values used by the public wrappers).  The reply on the return pipe is a
 * sequence of records (int length, then length bytes).  Records with length 0
 * carry no payload and are skipped; a negative length terminates the list.
 */
vector<string> PosixQuotaManager::DoList(const CommandType list_command) {
  vector<string> result;

  int pipe_list[2];
  MakeReturnPipe(pipe_list);

  LruCommand cmd;
  cmd.command_type = list_command;
  cmd.return_pipe = pipe_list[1];
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));

  // The buffer is sized from each record's length.  A fixed kMaxDescription
  // stack buffer would overflow if the peer ever announced a longer record.
  string description;
  int length;
  do {
    ReadHalfPipe(pipe_list[0], &length, sizeof(length));
    if (length > 0) {
      description.resize(length);
      ReadPipe(pipe_list[0], &description[0], length);
      result.push_back(description);
    }
  } while (length >= 0);

  CloseReturnPipe(pipe_list);
  return result;
}
579
580
581
55
uint64_t PosixQuotaManager::GetCapacity() {
582
55
  if (limit_ != (uint64_t)(-1))
583
55
    return limit_;
584
585
  // Unrestricted cache, look at free space on cache dir fs
586
  struct statfs info;
587
  if (statfs(".", &info) == 0) {
588
    return info.f_bavail * info.f_bsize;
589
  } else {
590
    LogCvmfs(kLogQuota, kLogSyslogErr | kLogDebug,
591
             "failed to query file system info of cache (%d)", errno);
592
    return limit_;
593
  }
594
}
595
596
597
void PosixQuotaManager::GetLimits(uint64_t *limit, uint64_t *cleanup_threshold)
598
{
599
  int pipe_limits[2];
600
  MakeReturnPipe(pipe_limits);
601
602
  LruCommand cmd;
603
  cmd.command_type = kLimits;
604
  cmd.return_pipe = pipe_limits[1];
605
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
606
  ReadHalfPipe(pipe_limits[0], limit, sizeof(*limit));
607
  ReadPipe(pipe_limits[0], cleanup_threshold, sizeof(*cleanup_threshold));
608
  CloseReturnPipe(pipe_limits);
609
}
610
611
612
/**
613
 * Since we only cleanup until cleanup_threshold, we can only add
614
 * files smaller than limit-cleanup_threshold.
615
 */
616
11
uint64_t PosixQuotaManager::GetMaxFileSize() {
617
11
  return limit_ - cleanup_threshold_;
618
}
619
620
621
1
pid_t PosixQuotaManager::GetPid() {
622

1
  if (!shared_ || !spawned_) {
623
1
    return getpid();
624
  }
625
626
  pid_t result;
627
  int pipe_pid[2];
628
  MakeReturnPipe(pipe_pid);
629
630
  LruCommand cmd;
631
  cmd.command_type = kPid;
632
  cmd.return_pipe = pipe_pid[1];
633
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
634
  ReadHalfPipe(pipe_pid[0], &result, sizeof(result));
635
  CloseReturnPipe(pipe_pid);
636
  return result;
637
}
638
639
640
1
uint32_t PosixQuotaManager::GetProtocolRevision() {
641
  int pipe_revision[2];
642
1
  MakeReturnPipe(pipe_revision);
643
644
1
  LruCommand cmd;
645
1
  cmd.command_type = kGetProtocolRevision;
646
1
  cmd.return_pipe = pipe_revision[1];
647
1
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
648
649
  uint32_t revision;
650
1
  ReadHalfPipe(pipe_revision[0], &revision, sizeof(revision));
651
1
  CloseReturnPipe(pipe_revision);
652
1
  return revision;
653
}
654
655
656
/**
657
 * Queries the shared local hard disk quota manager.
658
 */
659
18
void PosixQuotaManager::GetSharedStatus(uint64_t *gauge, uint64_t *pinned) {
660
  int pipe_status[2];
661
18
  MakeReturnPipe(pipe_status);
662
663
18
  LruCommand cmd;
664
18
  cmd.command_type = kStatus;
665
18
  cmd.return_pipe = pipe_status[1];
666
18
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
667
18
  ReadHalfPipe(pipe_status[0], gauge, sizeof(*gauge));
668
18
  ReadPipe(pipe_status[0], pinned, sizeof(*pinned));
669
18
  CloseReturnPipe(pipe_status);
670
18
}
671
672
673
126
uint64_t PosixQuotaManager::GetSize() {
674
126
  if (!spawned_) return gauge_;
675
  uint64_t gauge, size_pinned;
676
16
  GetSharedStatus(&gauge, &size_pinned);
677
16
  return gauge;
678
}
679
680
681
2
uint64_t PosixQuotaManager::GetSizePinned() {
682
2
  if (!spawned_) return pinned_;
683
  uint64_t gauge, size_pinned;
684
2
  GetSharedStatus(&gauge, &size_pinned);
685
2
  return size_pinned;
686
}
687
688
689
4
uint64_t PosixQuotaManager::GetCleanupRate(uint64_t period_s) {
690

4
  if (!spawned_ || (protocol_revision_ < 2)) return 0;
691
  uint64_t cleanup_rate;
692
693
  int pipe_cleanup_rate[2];
694
4
  MakeReturnPipe(pipe_cleanup_rate);
695
4
  LruCommand cmd;
696
4
  cmd.command_type = kCleanupRate;
697
4
  cmd.size = period_s;
698
4
  cmd.return_pipe = pipe_cleanup_rate[1];
699
4
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
700
4
  ReadHalfPipe(pipe_cleanup_rate[0], &cleanup_rate, sizeof(cleanup_rate));
701
4
  CloseReturnPipe(pipe_cleanup_rate);
702
703
4
  return cleanup_rate;
704
}
705
706
707
112
/**
 * Opens (and if needed creates or rebuilds) the cache database <cache>/cachedb.
 *
 * Takes the <workspace>/lock_cachedb lock, creates the schema, migrates old
 * catalogs, clears all pin flags, and rebuilds the catalog from the file
 * system when it is empty or rebuild_database is set.  It then loads gauge_
 * and seq_ and prepares all statements used by the quota manager.  If the
 * schema cannot be created, the database files are removed and creation is
 * retried once.
 *
 * On failure the database is closed, database_ is set to NULL and the lock is
 * released.
 *
 * \param rebuild_database unlink the existing database and rebuild it
 * \return true on success
 */
bool PosixQuotaManager::InitDatabase(const bool rebuild_database) {
  string sql;
  sqlite3_stmt *stmt;

  fd_lock_cachedb_ = LockFile(workspace_dir_ + "/lock_cachedb");
  if (fd_lock_cachedb_ < 0) {
    LogCvmfs(kLogQuota, kLogDebug, "failed to create cachedb lock");
    return false;
  }

  // Set once the corrupted database has been deleted; limits retries to one
  bool retry = false;
  const string db_file = cache_dir_ + "/cachedb";
  if (rebuild_database) {
    LogCvmfs(kLogQuota, kLogDebug, "rebuild database, unlinking existing (%s)",
             db_file.c_str());
    unlink(db_file.c_str());
    unlink((db_file + "-journal").c_str());
  }

 init_recover:
  int err = sqlite3_open(db_file.c_str(), &database_);
  if (err != SQLITE_OK) {
    LogCvmfs(kLogQuota, kLogDebug, "could not open cache database (%d)", err);
    goto init_database_fail;
  }
  // TODO(reneme): make this a `QuotaDatabase : public sqlite::Database`
  sql = "PRAGMA synchronous=0; PRAGMA locking_mode=EXCLUSIVE; "
    "PRAGMA auto_vacuum=1; "
    "CREATE TABLE IF NOT EXISTS cache_catalog (sha1 TEXT, size INTEGER, "
    "  acseq INTEGER, path TEXT, type INTEGER, pinned INTEGER, "
    "CONSTRAINT pk_cache_catalog PRIMARY KEY (sha1)); "
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_cache_catalog_acseq "
    "  ON cache_catalog (acseq); "
    "CREATE TEMP TABLE fscache (sha1 TEXT, size INTEGER, actime INTEGER, "
    "CONSTRAINT pk_fscache PRIMARY KEY (sha1)); "
    "CREATE INDEX idx_fscache_actime ON fscache (actime); "
    "CREATE TABLE IF NOT EXISTS properties (key TEXT, value TEXT, "
    "  CONSTRAINT pk_properties PRIMARY KEY(key));";
  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
  if (err != SQLITE_OK) {
    // First failure: treat the file as corrupted, delete it and start over
    if (!retry) {
      retry = true;
      sqlite3_close(database_);
      unlink(db_file.c_str());
      unlink((db_file + "-journal").c_str());
      LogCvmfs(kLogQuota, kLogSyslogWarn,
               "LRU database corrupted, re-building");
      goto init_recover;
    }
    LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
             sql.c_str());
    goto init_database_fail;
  }

  // If this is an old cache catalog,
  // add and initialize new columns to cache_catalog.
  // On a current schema the ALTER fails (columns exist); that error is
  // deliberately ignored.
  sql = "ALTER TABLE cache_catalog ADD type INTEGER; "
        "ALTER TABLE cache_catalog ADD pinned INTEGER";
  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
  if (err == SQLITE_OK) {
    sql = "UPDATE cache_catalog SET type=" + StringifyInt(kFileRegular) + ";";
    err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
    if (err != SQLITE_OK) {
      LogCvmfs(kLogQuota, kLogDebug,
               "could not init cache database (failed: %s)", sql.c_str());
      goto init_database_fail;
    }
  }

  // Set pinned back
  sql = "UPDATE cache_catalog SET pinned=0;";
  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
  if (err != SQLITE_OK) {
    LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
             sql.c_str());
    goto init_database_fail;
  }

  // Set schema version
  sql = "INSERT OR REPLACE INTO properties (key, value) "
  "VALUES ('schema', '1.0')";
  err = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
  if (err != SQLITE_OK) {
    LogCvmfs(kLogQuota, kLogDebug, "could not init cache database (failed: %s)",
             sql.c_str());
    goto init_database_fail;
  }

  // If cache catalog is empty, recreate from file system
  // NOTE(review): prepare errors are not checked here; a NULL stmt makes
  // sqlite3_step fail, which is handled by the else branch.
  sql = "SELECT count(*) FROM cache_catalog;";
  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
  if (sqlite3_step(stmt) == SQLITE_ROW) {
    if ((sqlite3_column_int64(stmt, 0)) == 0 || rebuild_database) {
      LogCvmfs(kLogCvmfs, kLogDebug,
               "CernVM-FS: building lru cache database...");
      if (!RebuildDatabase()) {
        LogCvmfs(kLogQuota, kLogDebug,
                 "could not build cache database from file system");
        sqlite3_finalize(stmt);
        goto init_database_fail;
      }
    }
    sqlite3_finalize(stmt);
  } else {
    LogCvmfs(kLogQuota, kLogDebug, "could not select on cache catalog");
    sqlite3_finalize(stmt);
    goto init_database_fail;
  }

  // How many bytes do we already have in cache?
  // (sum() of no rows is NULL, which sqlite3_column_int64 reads as 0)
  sql = "SELECT sum(size) FROM cache_catalog;";
  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
  if (sqlite3_step(stmt) == SQLITE_ROW) {
    gauge_ = sqlite3_column_int64(stmt, 0);
  } else {
    LogCvmfs(kLogQuota, kLogDebug, "could not determine cache size");
    sqlite3_finalize(stmt);
    goto init_database_fail;
  }
  sqlite3_finalize(stmt);

  // Highest seq-no?
  // The top bit of acseq flags volatile entries (they are listed via
  // "acseq < 0"), so it is masked off before taking the maximum.
  sql = "SELECT coalesce(max(acseq & (~(1<<63))), 0) FROM cache_catalog;";
  sqlite3_prepare_v2(database_, sql.c_str(), -1, &stmt, NULL);
  if (sqlite3_step(stmt) == SQLITE_ROW) {
    seq_ = sqlite3_column_int64(stmt, 0)+1;
  } else {
    LogCvmfs(kLogQuota, kLogDebug, "could not determine highest seq-no");
    sqlite3_finalize(stmt);
    goto init_database_fail;
  }
  sqlite3_finalize(stmt);

  // Prepare touch, new, remove statements
  // pinned column: 0 = not pinned, 1 = pinned, 2 = temporarily blocked by
  // DoCleanup (stmt_block_ sets 2, stmt_unblock_ turns 2 back into 1).
  // NOTE(review): return codes of these prepares are not checked; a failure
  // leaves the corresponding handle NULL.
  sqlite3_prepare_v2(database_,
                     "UPDATE cache_catalog SET acseq=:seq | (acseq&(1<<63)) "
                     "WHERE sha1=:sha1;", -1, &stmt_touch_, NULL);
  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=0 "
                     "WHERE sha1=:sha1;", -1, &stmt_unpin_, NULL);
  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=2 "
                     "WHERE sha1=:sha1;", -1, &stmt_block_, NULL);
  sqlite3_prepare_v2(database_, "UPDATE cache_catalog SET pinned=1 "
                     "WHERE pinned=2;", -1, &stmt_unblock_, NULL);
  sqlite3_prepare_v2(database_,
                     "INSERT OR REPLACE INTO cache_catalog "
                     "(sha1, size, acseq, path, type, pinned) "
                     "VALUES (:sha1, :s, :seq, :p, :t, :pin);",
                     -1, &stmt_new_, NULL);
  sqlite3_prepare_v2(database_,
                     "SELECT size, pinned FROM cache_catalog WHERE sha1=:sha1;",
                     -1, &stmt_size_, NULL);
  sqlite3_prepare_v2(database_, "DELETE FROM cache_catalog WHERE sha1=:sha1;",
                     -1, &stmt_rm_, NULL);
  sqlite3_prepare_v2(database_,
                     "SELECT sha1, size FROM cache_catalog WHERE "
                     "acseq=(SELECT min(acseq) "
                     "FROM cache_catalog WHERE pinned<>2);",
                     -1, &stmt_lru_, NULL);
  sqlite3_prepare_v2(database_,
                     ("SELECT path FROM cache_catalog WHERE type=" +
                      StringifyInt(kFileRegular) +
                      ";").c_str(), -1, &stmt_list_, NULL);
  sqlite3_prepare_v2(database_,
                     "SELECT path FROM cache_catalog WHERE pinned<>0;",
                     -1, &stmt_list_pinned_, NULL);
  sqlite3_prepare_v2(database_,
                     "SELECT path FROM cache_catalog WHERE acseq < 0;",
                     -1, &stmt_list_volatile_, NULL);
  sqlite3_prepare_v2(database_,
                     ("SELECT path FROM cache_catalog WHERE type=" +
                      StringifyInt(kFileCatalog) +
                      ";").c_str(), -1, &stmt_list_catalogs_, NULL);
  return true;

 init_database_fail:
  sqlite3_close(database_);
  database_ = NULL;
  UnlockFile(fd_lock_cachedb_);
  return false;
}
887
888
889
/**
890
 * Inserts a new file into cache catalog.  This file gets a new,
891
 * highest sequence number. Does cache cleanup if necessary.
892
 */
893
42
void PosixQuotaManager::Insert(
894
  const shash::Any &any_hash,
895
  const uint64_t size,
896
  const string &description)
897
{
898
42
  DoInsert(any_hash, size, description, kInsert);
899
42
}
900
901
902
/**
903
 * Inserts a new file into cache catalog.  This file is marked as volatile
904
 * and gets a new highest sequence number with the first bit set.  Cache cleanup
905
 * treats these files with priority.
906
 */
907
4
void PosixQuotaManager::InsertVolatile(
908
  const shash::Any &any_hash,
909
  const uint64_t size,
910
  const string &description)
911
{
912
4
  DoInsert(any_hash, size, description, kInsertVolatile);
913
4
}
914
915
916
/**
917
 * Lists all path names from the cache db.
918
 */
919
21
vector<string> PosixQuotaManager::List() {
920
21
  return DoList(kList);
921
}
922
923
924
/**
925
 * Lists all pinned files from the cache db.
926
 */
927
8
vector<string> PosixQuotaManager::ListPinned() {
928
8
  return DoList(kListPinned);
929
}
930
931
932
/**
933
 * Lists all sqlite catalog files from the cache db.
934
 */
935
3
vector<string> PosixQuotaManager::ListCatalogs() {
936
3
  return DoList(kListCatalogs);
937
}
938
939
940
/**
941
 * Lists only files flagged as volatile (priority removal)
942
 */
943
3
vector<string> PosixQuotaManager::ListVolatile() {
944
3
  return DoList(kListVolatile);
945
}
946
947
948
/**
949
 * Entry point for the shared cache manager process
950
 */
951
int PosixQuotaManager::MainCacheManager(int argc, char **argv) {
952
  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
953
  int retval;
954
955
  UniquePtr<Watchdog> watchdog(Watchdog::Create("./stacktrace.cachemgr"));
956
  assert(watchdog.IsValid());
957
  watchdog->Spawn();
958
959
  PosixQuotaManager shared_manager(0, 0, "");
960
  shared_manager.shared_ = true;
961
  shared_manager.spawned_ = true;
962
  shared_manager.pinned_ = 0;
963
964
  // Process command line arguments
965
  ParseDirectories(string(argv[2]),
966
                   &shared_manager.cache_dir_,
967
                   &shared_manager.workspace_dir_);
968
  int pipe_boot = String2Int64(argv[3]);
969
  int pipe_handshake = String2Int64(argv[4]);
970
  shared_manager.limit_ = String2Int64(argv[5]);
971
  shared_manager.cleanup_threshold_ = String2Int64(argv[6]);
972
  int foreground = String2Int64(argv[7]);
973
  int syslog_level = String2Int64(argv[8]);
974
  int syslog_facility = String2Int64(argv[9]);
975
  vector<string> logfiles = SplitString(argv[10], ':');
976
977
  SetLogSyslogLevel(syslog_level);
978
  SetLogSyslogFacility(syslog_facility);
979
  if ((logfiles.size() > 0) && (logfiles[0] != ""))
980
    SetLogDebugFile(logfiles[0] + ".cachemgr");
981
  if (logfiles.size() > 1)
982
    SetLogMicroSyslog(logfiles[1]);
983
984
  if (!foreground)
985
    Daemonize();
986
987
  // Initialize pipe, open non-blocking as cvmfs is not yet connected
988
  const int fd_lockfile_fifo =
989
    LockFile(shared_manager.workspace_dir_ + "/lock_cachemgr.fifo");
990
  if (fd_lockfile_fifo < 0) {
991
    LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr, "could not open lock file "
992
             "%s (%d)",
993
             (shared_manager.workspace_dir_ + "/lock_cachemgr.fifo").c_str(),
994
             errno);
995
    return 1;
996
  }
997
  const string crash_guard = shared_manager.cache_dir_ + "/cachemgr.running";
998
  const bool rebuild = FileExists(crash_guard);
999
  retval = open(crash_guard.c_str(), O_RDONLY | O_CREAT, 0600);
1000
  if (retval < 0) {
1001
    LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslogErr,
1002
             "failed to create shared cache manager crash guard");
1003
    UnlockFile(fd_lockfile_fifo);
1004
    return 1;
1005
  }
1006
  close(retval);
1007
1008
  // Redirect SQlite temp directory to cache (global variable)
1009
  const string tmp_dir = shared_manager.workspace_dir_;
1010
  sqlite3_temp_directory =
1011
    static_cast<char *>(sqlite3_malloc(tmp_dir.length() + 1));
1012
  snprintf(sqlite3_temp_directory, tmp_dir.length() + 1, "%s", tmp_dir.c_str());
1013
1014
  // Cleanup leftover named pipes
1015
  shared_manager.CleanupPipes();
1016
1017
  if (!shared_manager.InitDatabase(rebuild)) {
1018
    UnlockFile(fd_lockfile_fifo);
1019
    return 1;
1020
  }
1021
  shared_manager.CheckFreeSpace();
1022
1023
  // Save protocol revision to file.  If the file is not found, it indicates
1024
  // to the client that the cache manager is from times before the protocol
1025
  // was versioned.
1026
  const string protocol_revision_path =
1027
    shared_manager.workspace_dir_ + "/cachemgr.protocol";
1028
  retval = open(protocol_revision_path.c_str(), O_WRONLY | O_CREAT, 0600);
1029
  if (retval < 0) {
1030
    LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslogErr,
1031
             "failed to open protocol revision file (%d)", errno);
1032
    UnlockFile(fd_lockfile_fifo);
1033
    return 1;
1034
  }
1035
  const string revision = StringifyInt(kProtocolRevision);
1036
  int written = write(retval, revision.data(), revision.length());
1037
  close(retval);
1038
  if ((written < 0) || static_cast<unsigned>(written) != revision.length()) {
1039
    LogCvmfs(kLogCvmfs, kLogDebug | kLogSyslogErr,
1040
             "failed to write protocol revision (%d)", errno);
1041
    UnlockFile(fd_lockfile_fifo);
1042
    return 1;
1043
  }
1044
1045
  const string fifo_path = shared_manager.workspace_dir_ + "/cachemgr";
1046
  shared_manager.pipe_lru_[0] = open(fifo_path.c_str(), O_RDONLY | O_NONBLOCK);
1047
  if (shared_manager.pipe_lru_[0] < 0) {
1048
    LogCvmfs(kLogQuota, kLogDebug, "failed to listen on FIFO %s (%d)",
1049
             fifo_path.c_str(), errno);
1050
    UnlockFile(fd_lockfile_fifo);
1051
    return 1;
1052
  }
1053
  Nonblock2Block(shared_manager.pipe_lru_[0]);
1054
  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager listening");
1055
1056
  char buf = 'C';
1057
  WritePipe(pipe_boot, &buf, 1);
1058
  close(pipe_boot);
1059
1060
  ReadPipe(pipe_handshake, &buf, 1);
1061
  close(pipe_handshake);
1062
  LogCvmfs(kLogQuota, kLogDebug, "shared cache manager handshake done");
1063
1064
  // Ensure that broken pipes from clients do not kill the cache manager
1065
  signal(SIGPIPE, SIG_IGN);
1066
  // Don't let Ctrl-C ungracefully kill interactive session
1067
  signal(SIGINT, SIG_IGN);
1068
1069
  shared_manager.MainCommandServer(&shared_manager);
1070
  unlink(fifo_path.c_str());
1071
  unlink(protocol_revision_path.c_str());
1072
  shared_manager.CloseDatabase();
1073
  unlink(crash_guard.c_str());
1074
  UnlockFile(fd_lockfile_fifo);
1075
1076
  if (sqlite3_temp_directory) {
1077
    sqlite3_free(sqlite3_temp_directory);
1078
    sqlite3_temp_directory = NULL;
1079
  }
1080
1081
  return 0;
1082
}
1083
1084
1085
30
void *PosixQuotaManager::MainCommandServer(void *data) {
1086
30
  PosixQuotaManager *quota_mgr = static_cast<PosixQuotaManager *>(data);
1087
1088
30
  LogCvmfs(kLogQuota, kLogDebug, "starting quota manager");
1089
30
  sqlite3_soft_heap_limit(quota_mgr->kSqliteMemPerThread);
1090
1091
30
  LruCommand command_buffer[kCommandBufferSize];
1092
  char description_buffer[kCommandBufferSize*kMaxDescription];
1093
30
  unsigned num_commands = 0;
1094
1095
207
  while (read(quota_mgr->pipe_lru_[0], &command_buffer[num_commands],
1096
              sizeof(command_buffer[0])) == sizeof(command_buffer[0]))
1097
  {
1098
147
    const CommandType command_type = command_buffer[num_commands].command_type;
1099
147
    LogCvmfs(kLogQuota, kLogDebug, "received command %d", command_type);
1100
147
    const uint64_t size = command_buffer[num_commands].GetSize();
1101
1102
    // Inserts and pins come with a description (usually a path)
1103


147
    if ((command_type == kInsert) || (command_type == kInsertVolatile) ||
1104
        (command_type == kPin) || (command_type == kPinRegular))
1105
    {
1106
46
      const int desc_length = command_buffer[num_commands].desc_length;
1107
      ReadPipe(quota_mgr->pipe_lru_[0],
1108
46
               &description_buffer[kMaxDescription*num_commands], desc_length);
1109
    }
1110
1111
    // The protocol revision is returned immediately
1112
147
    if (command_type == kGetProtocolRevision) {
1113
      int return_pipe =
1114
1
        quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1115
1
      if (return_pipe < 0)
1116
        continue;
1117
      WritePipe(return_pipe, &quota_mgr->kProtocolRevision,
1118
1
                sizeof(quota_mgr->kProtocolRevision));
1119
1
      quota_mgr->UnbindReturnPipe(return_pipe);
1120
1
      continue;
1121
    }
1122
1123
    // The cleanup rate is returned immediately
1124
146
    if (command_type == kCleanupRate) {
1125
      int return_pipe =
1126
4
        quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1127
4
      if (return_pipe < 0)
1128
        continue;
1129
4
      uint64_t period_s = size;  // use the size field to transmit the period
1130
4
      uint64_t rate = quota_mgr->cleanup_recorder_.GetNoTicks(period_s);
1131
4
      WritePipe(return_pipe, &rate, sizeof(rate));
1132
4
      quota_mgr->UnbindReturnPipe(return_pipe);
1133
4
      continue;
1134
    }
1135
1136
    // Reservations are handled immediately and "out of band"
1137
142
    if (command_type == kReserve) {
1138
14
      bool success = true;
1139
      int return_pipe =
1140
14
        quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1141
14
      if (return_pipe < 0)
1142
        continue;
1143
1144
14
      const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1145
14
      const string hash_str(hash.ToString());
1146
      LogCvmfs(kLogQuota, kLogDebug, "reserve %d bytes for %s",
1147
14
               size, hash_str.c_str());
1148
1149
14
      if (quota_mgr->pinned_chunks_.find(hash) ==
1150
          quota_mgr->pinned_chunks_.end())
1151
      {
1152
12
        if ((quota_mgr->pinned_ + size) > quota_mgr->cleanup_threshold_) {
1153
          LogCvmfs(kLogQuota, kLogDebug,
1154
1
                   "failed to insert %s (pinned), no space", hash_str.c_str());
1155
1
          success = false;
1156
        } else {
1157
11
          quota_mgr->pinned_chunks_[hash] = size;
1158
11
          quota_mgr->pinned_ += size;
1159
11
          quota_mgr->CheckHighPinWatermark();
1160
        }
1161
      }
1162
1163
14
      WritePipe(return_pipe, &success, sizeof(success));
1164
14
      quota_mgr->UnbindReturnPipe(return_pipe);
1165
14
      continue;
1166
    }
1167
1168
    // Back channels are also handled out of band
1169
128
    if (command_type == kRegisterBackChannel) {
1170
      int return_pipe =
1171
4
        quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1172
4
      if (return_pipe < 0)
1173
        continue;
1174
1175
4
      quota_mgr->UnlinkReturnPipe(command_buffer[num_commands].return_pipe);
1176
4
      Block2Nonblock(return_pipe);  // back channels are opportunistic
1177
4
      shash::Md5 hash;
1178
      memcpy(hash.digest, command_buffer[num_commands].digest,
1179
4
             shash::kDigestSizes[shash::kMd5]);
1180
1181
4
      quota_mgr->LockBackChannels();
1182
      map<shash::Md5, int>::const_iterator iter =
1183
4
        quota_mgr->back_channels_.find(hash);
1184
4
      if (iter != quota_mgr->back_channels_.end()) {
1185
        LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn,
1186
                 "closing left-over back channel %s", hash.ToString().c_str());
1187
        close(iter->second);
1188
      }
1189
4
      quota_mgr->back_channels_[hash] = return_pipe;
1190
4
      quota_mgr->UnlockBackChannels();
1191
1192
4
      char success = 'S';
1193
4
      WritePipe(return_pipe, &success, sizeof(success));
1194
      LogCvmfs(kLogQuota, kLogDebug, "register back channel %s on fd %d",
1195
4
               hash.ToString().c_str(), return_pipe);
1196
1197
4
      continue;
1198
    }
1199
1200
124
    if (command_type == kUnregisterBackChannel) {
1201
2
      shash::Md5 hash;
1202
      memcpy(hash.digest, command_buffer[num_commands].digest,
1203
2
             shash::kDigestSizes[shash::kMd5]);
1204
1205
2
      quota_mgr->LockBackChannels();
1206
      map<shash::Md5, int>::iterator iter =
1207
2
        quota_mgr->back_channels_.find(hash);
1208
2
      if (iter != quota_mgr->back_channels_.end()) {
1209
        LogCvmfs(kLogQuota, kLogDebug,
1210
2
                 "closing back channel %s", hash.ToString().c_str());
1211
2
        close(iter->second);
1212
2
        quota_mgr->back_channels_.erase(iter);
1213
      } else {
1214
        LogCvmfs(kLogQuota, kLogDebug | kLogSyslogWarn,
1215
                 "did not find back channel %s", hash.ToString().c_str());
1216
      }
1217
2
      quota_mgr->UnlockBackChannels();
1218
1219
2
      continue;
1220
    }
1221
1222
    // Unpinnings are also handled immediately with respect to the pinned gauge
1223
122
    if (command_type == kUnpin) {
1224
2
      const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1225
2
      const string hash_str(hash.ToString());
1226
1227
      map<shash::Any, uint64_t>::iterator iter =
1228
2
        quota_mgr->pinned_chunks_.find(hash);
1229
2
      if (iter != quota_mgr->pinned_chunks_.end()) {
1230
2
        quota_mgr->pinned_ -= iter->second;
1231
2
        quota_mgr->pinned_chunks_.erase(iter);
1232
        // It can happen that files get pinned that were removed from the cache
1233
        // (see cache.cc).  We fix this at this point, where we remove such
1234
        // entries from the cache database.
1235
2
        if (!FileExists(quota_mgr->cache_dir_ + "/" +
1236
                        hash.MakePathWithoutSuffix()))
1237
        {
1238
          LogCvmfs(kLogQuota, kLogDebug,
1239
                   "remove orphaned pinned hash %s from cache database",
1240
1
                   hash_str.c_str());
1241
          sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1242
1
                            hash_str.length(), SQLITE_STATIC);
1243
          int retval;
1244
1
          if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1245
1
            uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1246
            sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1247
1
                              hash_str.length(), SQLITE_STATIC);
1248
1
            retval = sqlite3_step(quota_mgr->stmt_rm_);
1249

2
            if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1250
1
              quota_mgr->gauge_ -= size;
1251
            } else {
1252
              LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
1253
                       "failed to delete %s (%d)", hash_str.c_str(), retval);
1254
            }
1255
1
            sqlite3_reset(quota_mgr->stmt_rm_);
1256
          }
1257
1
          sqlite3_reset(quota_mgr->stmt_size_);
1258
        }
1259
      } else {
1260
        LogCvmfs(kLogQuota, kLogDebug, "this chunk was not pinned");
1261
      }
1262
    }
1263
1264
    // Immediate commands trigger flushing of the buffer
1265
    bool immediate_command = (command_type == kCleanup) ||
1266
      (command_type == kList) || (command_type == kListPinned) ||
1267
      (command_type == kListCatalogs) || (command_type == kListVolatile) ||
1268
      (command_type == kRemove) || (command_type == kStatus) ||
1269




122
      (command_type == kLimits) || (command_type == kPid);
1270
122
    if (!immediate_command) num_commands++;
1271
1272

122
    if ((num_commands == kCommandBufferSize) || immediate_command)
1273
    {
1274
      quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1275
65
                                     description_buffer);
1276
65
      if (!immediate_command) num_commands = 0;
1277
    }
1278
1279
122
    if (immediate_command) {
1280
      // Process cleanup, listings
1281
      int return_pipe =
1282
65
        quota_mgr->BindReturnPipe(command_buffer[num_commands].return_pipe);
1283
65
      if (return_pipe < 0) {
1284
        num_commands = 0;
1285
        continue;
1286
      }
1287
1288
      int retval;
1289
65
      sqlite3_stmt *this_stmt_list = NULL;
1290


65
      switch (command_type) {
1291
        case kRemove: {
1292
3
          const shash::Any hash = command_buffer[num_commands].RetrieveHash();
1293
3
          const string hash_str = hash.ToString();
1294
          LogCvmfs(kLogQuota, kLogDebug, "manually removing %s",
1295
3
                   hash_str.c_str());
1296
3
          bool success = false;
1297
1298
          sqlite3_bind_text(quota_mgr->stmt_size_, 1, &hash_str[0],
1299
3
                            hash_str.length(), SQLITE_STATIC);
1300
          int retval;
1301
3
          if ((retval = sqlite3_step(quota_mgr->stmt_size_)) == SQLITE_ROW) {
1302
2
            uint64_t size = sqlite3_column_int64(quota_mgr->stmt_size_, 0);
1303
2
            uint64_t is_pinned = sqlite3_column_int64(quota_mgr->stmt_size_, 1);
1304
1305
            sqlite3_bind_text(quota_mgr->stmt_rm_, 1, &(hash_str[0]),
1306
2
                              hash_str.length(), SQLITE_STATIC);
1307
2
            retval = sqlite3_step(quota_mgr->stmt_rm_);
1308

4
            if ((retval == SQLITE_DONE) || (retval == SQLITE_OK)) {
1309
2
              success = true;
1310
2
              quota_mgr->gauge_ -= size;
1311
2
              if (is_pinned) {
1312
1
                quota_mgr->pinned_chunks_.erase(hash);
1313
1
                quota_mgr->pinned_ -= size;
1314
              }
1315
            } else {
1316
              LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
1317
                       "failed to delete %s (%d)", hash_str.c_str(), retval);
1318
            }
1319
2
            sqlite3_reset(quota_mgr->stmt_rm_);
1320
          } else {
1321
            // File does not exist
1322
1
            success = true;
1323
          }
1324
3
          sqlite3_reset(quota_mgr->stmt_size_);
1325
1326
3
          WritePipe(return_pipe, &success, sizeof(success));
1327
          break; }
1328
        case kCleanup:
1329
9
          retval = quota_mgr->DoCleanup(size);
1330
9
          WritePipe(return_pipe, &retval, sizeof(retval));
1331
9
          break;
1332
        case kList:
1333
21
          if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_;
1334
        case kListPinned:
1335
29
          if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_pinned_;
1336
        case kListCatalogs:
1337
32
          if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_catalogs_;
1338
        case kListVolatile:
1339
35
          if (!this_stmt_list) this_stmt_list = quota_mgr->stmt_list_volatile_;
1340
1341
          // Pipe back the list, one by one
1342
          int length;
1343
35
          while (sqlite3_step(this_stmt_list) == SQLITE_ROW) {
1344
51
            string path = "(NULL)";
1345
51
            if (sqlite3_column_type(this_stmt_list, 0) != SQLITE_NULL) {
1346
              path = string(
1347
                reinterpret_cast<const char *>(
1348
51
                  sqlite3_column_text(this_stmt_list, 0)));
1349
            }
1350
51
            length = path.length();
1351
51
            WritePipe(return_pipe, &length, sizeof(length));
1352
51
            if (length > 0)
1353
51
              WritePipe(return_pipe, &path[0], length);
1354
          }
1355
35
          length = -1;
1356
35
          WritePipe(return_pipe, &length, sizeof(length));
1357
35
          sqlite3_reset(this_stmt_list);
1358
35
          break;
1359
        case kStatus:
1360
18
          WritePipe(return_pipe, &quota_mgr->gauge_, sizeof(quota_mgr->gauge_));
1361
          WritePipe(return_pipe, &quota_mgr->pinned_,
1362
18
                    sizeof(quota_mgr->pinned_));
1363
18
          break;
1364
        case kLimits:
1365
          WritePipe(return_pipe, &quota_mgr->limit_, sizeof(quota_mgr->limit_));
1366
          WritePipe(return_pipe, &quota_mgr->cleanup_threshold_,
1367
                    sizeof(quota_mgr->cleanup_threshold_));
1368
          break;
1369
        case kPid: {
1370
          pid_t pid = getpid();
1371
          WritePipe(return_pipe, &pid, sizeof(pid));
1372
          break;
1373
        }
1374
        default:
1375
          abort();  // other types are handled by the bunch processor
1376
      }
1377
65
      quota_mgr->UnbindReturnPipe(return_pipe);
1378
65
      num_commands = 0;
1379
    }
1380
  }
1381
1382
30
  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager (%d)", errno);
1383
30
  close(quota_mgr->pipe_lru_[0]);
1384
  quota_mgr->ProcessCommandBunch(num_commands, command_buffer,
1385
30
                                 description_buffer);
1386
1387
  // Unpin
1388
30
  command_buffer[0].command_type = kTouch;
1389
40
  for (map<shash::Any, uint64_t>::const_iterator i =
1390
30
       quota_mgr->pinned_chunks_.begin(),
1391
30
       iEnd = quota_mgr->pinned_chunks_.end(); i != iEnd; ++i)
1392
  {
1393
10
    command_buffer[0].StoreHash(i->first);
1394
10
    quota_mgr->ProcessCommandBunch(1, command_buffer, description_buffer);
1395
  }
1396
1397
30
  return NULL;
1398
}
1399
1400
1401
91
void PosixQuotaManager::MakeReturnPipe(int pipe[2]) {
1402
91
  if (!shared_) {
1403
88
    MakePipe(pipe);
1404
88
    return;
1405
  }
1406
1407
  // Create FIFO in cache directory, store path name (number) in pipe write end
1408
3
  int i = 0;
1409
  int retval;
1410

4
  do {
1411
4
    retval = mkfifo((workspace_dir_ + "/pipe" + StringifyInt(i)).c_str(), 0600);
1412
4
    pipe[1] = i;
1413
4
    i++;
1414
  } while ((retval == -1) && (errno == EEXIST));
1415
3
  assert(retval == 0);
1416
1417
  // Connect reader's end
1418
3
  pipe[0] = open((workspace_dir_ + "/pipe" + StringifyInt(pipe[1])).c_str(),
1419
3
                 O_RDONLY | O_NONBLOCK);
1420
3
  assert(pipe[0] >= 0);
1421
3
  Nonblock2Block(pipe[0]);
1422
}
1423
1424
1425
115
void PosixQuotaManager::ParseDirectories(
1426
  const std::string cache_workspace,
1427
  std::string *cache_dir,
1428
  std::string *workspace_dir)
1429
{
1430
115
  vector<string> dir_tokens(SplitString(cache_workspace, ':'));
1431
115
  switch (dir_tokens.size()) {
1432
    case 1:
1433
113
      *cache_dir = *workspace_dir = dir_tokens[0];
1434
113
      break;
1435
    case 2:
1436
2
      *cache_dir = dir_tokens[0];
1437
2
      *workspace_dir = dir_tokens[1];
1438
2
      break;
1439
    default:
1440
      abort();
1441
115
  }
1442
115
}
1443
1444
1445
/**
1446
 * Immediately inserts a new pinned catalog. Does cache cleanup if necessary.
1447
 *
1448
 * \return True on success, false otherwise
1449
 */
1450
54
bool PosixQuotaManager::Pin(
1451
  const shash::Any &hash,
1452
  const uint64_t size,
1453
  const string &description,
1454
  const bool is_catalog)
1455
{
1456

54
  assert((size > 0) || !is_catalog);
1457
1458
54
  const string hash_str = hash.ToString();
1459
  LogCvmfs(kLogQuota, kLogDebug, "pin into lru %s, path %s",
1460
54
           hash_str.c_str(), description.c_str());
1461
1462
  // Has to run when not yet spawned (cvmfs initialization)
1463
54
  if (!spawned_) {
1464
    // Code duplication here
1465
40
    if (pinned_chunks_.find(hash) == pinned_chunks_.end()) {
1466
24
      if (pinned_ + size > cleanup_threshold_) {
1467
        LogCvmfs(kLogQuota, kLogDebug, "failed to insert %s (pinned), no space",
1468
2
                 hash_str.c_str());
1469
2
        return false;
1470
      } else {
1471
22
        pinned_chunks_[hash] = size;
1472
22
        pinned_ += size;
1473
22
        CheckHighPinWatermark();
1474
      }
1475
    }
1476
38
    bool exists = Contains(hash_str);
1477

38
    if (!exists && (gauge_ + size > limit_)) {
1478
      LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu",
1479
1
               gauge_, size);
1480
1
      int retval = DoCleanup(cleanup_threshold_);
1481
1
      assert(retval != 0);
1482
    }
1483
    sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1484
38
                      SQLITE_STATIC);
1485
38
    sqlite3_bind_int64(stmt_new_, 2, size);
1486
38
    sqlite3_bind_int64(stmt_new_, 3, seq_++);
1487
    sqlite3_bind_text(stmt_new_, 4, &description[0], description.length(),
1488
38
                      SQLITE_STATIC);
1489
38
    sqlite3_bind_int64(stmt_new_, 5, is_catalog ? kFileCatalog : kFileRegular);
1490
38
    sqlite3_bind_int64(stmt_new_, 6, 1);
1491
38
    int retval = sqlite3_step(stmt_new_);
1492

38
    assert((retval == SQLITE_DONE) || (retval == SQLITE_OK));
1493
38
    sqlite3_reset(stmt_new_);
1494
38
    if (!exists) gauge_ += size;
1495
38
    return true;
1496
  }
1497
1498
  int pipe_reserve[2];
1499
14
  MakeReturnPipe(pipe_reserve);
1500
1501
14
  LruCommand cmd;
1502
14
  cmd.command_type = kReserve;
1503
14
  cmd.SetSize(size);
1504
14
  cmd.StoreHash(hash);
1505
14
  cmd.return_pipe = pipe_reserve[1];
1506
14
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1507
  bool result;
1508
14
  ReadHalfPipe(pipe_reserve[0], &result, sizeof(result));
1509
14
  CloseReturnPipe(pipe_reserve);
1510
1511
14
  if (!result) return false;
1512
13
  DoInsert(hash, size, description, is_catalog ? kPin : kPinRegular);
1513
1514
13
  return true;
1515
}
1516
1517
1518
111
/**
 * Creates a quota manager.  This constructor does not open the database and
 * does not create the command pipe; both descriptors of pipe_lru_ start out
 * as -1.
 *
 * \param limit cache size limit
 * \param cleanup_threshold target handed to DoCleanup() when the limit is
 *        exceeded
 * \param cache_workspace "cache_dir" or "cache_dir:workspace_dir"
 */
PosixQuotaManager::PosixQuotaManager(
  const uint64_t limit,
  const uint64_t cleanup_threshold,
  const string &cache_workspace)
  : shared_(false)
  , spawned_(false)
  , limit_(limit)
  , cleanup_threshold_(cleanup_threshold)
  , gauge_(0)
  , pinned_(0)
  , seq_(0)
  , cache_dir_()  // set from cache_workspace below
  , workspace_dir_()  // set from cache_workspace below
  , fd_lock_cachedb_(-1)
  , async_delete_(true)
  , database_(NULL)
  , stmt_touch_(NULL)
  , stmt_unpin_(NULL)
  , stmt_block_(NULL)
  , stmt_unblock_(NULL)
  , stmt_new_(NULL)
  , stmt_lru_(NULL)
  , stmt_size_(NULL)
  , stmt_rm_(NULL)
  , stmt_list_(NULL)
  , stmt_list_pinned_(NULL)
  , stmt_list_catalogs_(NULL)
  , stmt_list_volatile_(NULL)
  , initialized_(false)
{
  pipe_lru_[0] = -1;
  pipe_lru_[1] = -1;
  ParseDirectories(cache_workspace, &cache_dir_, &workspace_dir_);

  // Cleanup rate history, from fine to coarse grained:
  //   last 1.5 minutes with second resolution
  //   last 1.5 hours with minute resolution
  //   last 18 hours with 20 minute resolution
  //   last 4 days with hour resolution
  const int seconds_per_minute = 60;
  const int seconds_per_hour = 60 * seconds_per_minute;
  cleanup_recorder_.AddRecorder(1, 90);
  cleanup_recorder_.AddRecorder(seconds_per_minute, 90 * seconds_per_minute);
  cleanup_recorder_.AddRecorder(20 * seconds_per_minute,
                                18 * seconds_per_hour);
  cleanup_recorder_.AddRecorder(seconds_per_hour, 4 * 24 * seconds_per_hour);
}
1558
1559
1560
218
/**
 * Tears the manager down; nothing happens if initialization never completed.
 * With a shared cache manager, only the write end of the command pipe is
 * closed here; the shared cache manager process handles the rest.  Otherwise
 * a spawned command server thread is stopped and joined (or the unused pipe is
 * closed), and the database is closed.
 */
PosixQuotaManager::~PosixQuotaManager() {
  if (!initialized_)
    return;

  if (shared_) {
    close(pipe_lru_[1]);
    return;
  }

  if (!spawned_) {
    ClosePipe(pipe_lru_);
  } else {
    // A single byte is shorter than a command record, which ends the command
    // server's read loop; then wait for the thread to finish.
    char terminator = 0;
    WritePipe(pipe_lru_[1], &terminator, 1);
    close(pipe_lru_[1]);
    pthread_join(thread_lru_, NULL);
  }

  CloseDatabase();
}
1580
1581
1582
105
void PosixQuotaManager::ProcessCommandBunch(
1583
  const unsigned num,
1584
  const LruCommand *commands,
1585
  const char *descriptions)
1586
{
1587
105
  int retval = sqlite3_exec(database_, "BEGIN", NULL, NULL, NULL);
1588
105
  assert(retval == SQLITE_OK);
1589
1590
172
  for (unsigned i = 0; i < num; ++i) {
1591
67
    const shash::Any hash = commands[i].RetrieveHash();
1592
67
    const string hash_str = hash.ToString();
1593
67
    const unsigned size = commands[i].GetSize();
1594
    LogCvmfs(kLogQuota, kLogDebug, "processing %s (%d)",
1595
67
             hash_str.c_str(), commands[i].command_type);
1596
1597
    bool exists;
1598

67
    switch (commands[i].command_type) {
1599
      case kTouch:
1600
19
        sqlite3_bind_int64(stmt_touch_, 1, seq_++);
1601
        sqlite3_bind_text(stmt_touch_, 2, &hash_str[0], hash_str.length(),
1602
19
                          SQLITE_STATIC);
1603
19
        retval = sqlite3_step(stmt_touch_);
1604
        LogCvmfs(kLogQuota, kLogDebug, "touching %s (%ld): %d",
1605
19
                 hash_str.c_str(), seq_-1, retval);
1606

19
        if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1607
          LogCvmfs(kLogQuota, kLogSyslogErr,
1608
                   "failed to update %s in cachedb, error %d",
1609
                   hash_str.c_str(), retval);
1610
          abort();
1611
        }
1612
19
        sqlite3_reset(stmt_touch_);
1613
19
        break;
1614
      case kUnpin:
1615
        sqlite3_bind_text(stmt_unpin_, 1, &hash_str[0], hash_str.length(),
1616
2
                          SQLITE_STATIC);
1617
2
        retval = sqlite3_step(stmt_unpin_);
1618
        LogCvmfs(kLogQuota, kLogDebug, "unpinning %s: %d",
1619
2
                 hash_str.c_str(), retval);
1620

2
        if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1621
          LogCvmfs(kLogQuota, kLogSyslogErr,
1622
                   "failed to unpin %s in cachedb, error %d",
1623
                   hash_str.c_str(), retval);
1624
          abort();
1625
        }
1626
2
        sqlite3_reset(stmt_unpin_);
1627
2
        break;
1628
      case kPin:
1629
      case kPinRegular:
1630
      case kInsert:
1631
      case kInsertVolatile:
1632
        // It could already be in, check
1633
46
        exists = Contains(hash_str);
1634
1635
        // Cleanup, move to trash and unlink
1636

46
        if (!exists && (gauge_ + size > limit_)) {
1637
          LogCvmfs(kLogQuota, kLogDebug, "over limit, gauge %lu, file size %lu",
1638
                   gauge_, size);
1639
          retval = DoCleanup(cleanup_threshold_);
1640
          assert(retval != 0);
1641
        }
1642
1643
        // Insert or replace
1644
        sqlite3_bind_text(stmt_new_, 1, &hash_str[0], hash_str.length(),
1645
46
                          SQLITE_STATIC);
1646
46
        sqlite3_bind_int64(stmt_new_, 2, size);
1647
46
        if (commands[i].command_type == kInsertVolatile) {
1648
4
          sqlite3_bind_int64(stmt_new_, 3, (seq_++) | kVolatileFlag);
1649
        } else {
1650
42
          sqlite3_bind_int64(stmt_new_, 3, seq_++);
1651
        }
1652
        sqlite3_bind_text(stmt_new_, 4, &descriptions[i*kMaxDescription],
1653
46
                          commands[i].desc_length, SQLITE_STATIC);
1654
46
        sqlite3_bind_int64(stmt_new_, 5, (commands[i].command_type == kPin) ?
1655
46
                           kFileCatalog : kFileRegular);
1656
        sqlite3_bind_int64(stmt_new_, 6,
1657
46
          ((commands[i].command_type == kPin) ||
1658

46
           (commands[i].command_type == kPinRegular)) ? 1 : 0);
1659
46
        retval = sqlite3_step(stmt_new_);
1660
        LogCvmfs(kLogQuota, kLogDebug, "insert or replace %s, method %d: %d",
1661
46
                 hash_str.c_str(), commands[i].command_type, retval);
1662

46
        if ((retval != SQLITE_DONE) && (retval != SQLITE_OK)) {
1663
          LogCvmfs(kLogQuota, kLogSyslogErr,
1664
                   "failed to insert %s in cachedb, error %d",
1665
                   hash_str.c_str(), retval);
1666
          abort();
1667
        }
1668
46
        sqlite3_reset(stmt_new_);
1669
1670
46
        if (!exists) gauge_ += size;
1671
46
        break;
1672
      default:
1673
        abort();  // other types should have been taken care of by event loop
1674
    }
1675
  }
1676
1677
105
  retval = sqlite3_exec(database_, "COMMIT", NULL, NULL, NULL);
1678
105
  if (retval != SQLITE_OK) {
1679
    LogCvmfs(kLogQuota, kLogSyslogErr,
1680
             "failed to commit to cachedb, error %d", retval);
1681
    abort();
1682
  }
1683
105
}
1684
1685
1686
107
bool PosixQuotaManager::RebuildDatabase() {
1687
107
  bool result = false;
1688
107
  string sql;
1689
107
  sqlite3_stmt *stmt_select = NULL;
1690
107
  sqlite3_stmt *stmt_insert = NULL;
1691
  int sqlerr;
1692
107
  int seq = 0;
1693
  char hex[4];
1694
  struct stat info;
1695
  platform_dirent64 *d;
1696
107
  DIR *dirp = NULL;
1697
107
  string path;
1698
1699
107
  LogCvmfs(kLogQuota, kLogSyslog | kLogDebug, "re-building cache database");
1700
1701
  // Empty cache catalog and fscache
1702
107
  sql = "DELETE FROM cache_catalog; DELETE FROM fscache;";
1703
107
  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1704
107
  if (sqlerr != SQLITE_OK) {
1705
    LogCvmfs(kLogQuota, kLogDebug, "could not clear cache database");
1706
    goto build_return;
1707
  }
1708
1709
107
  gauge_ = 0;
1710
1711
  // Insert files from cache sub-directories 00 - ff
1712
  // TODO(jblomer): fs_traversal
1713
  sqlite3_prepare_v2(database_, "INSERT INTO fscache (sha1, size, actime) "
1714
107
                     "VALUES (:sha1, :s, :t);", -1, &stmt_insert, NULL);
1715
1716
26731
  for (int i = 0; i <= 0xff; i++) {
1717
26627
    snprintf(hex, sizeof(hex), "%02x", i);
1718
26627
    path = cache_dir_ + "/" + string(hex);
1719
26627
    if ((dirp = opendir(path.c_str())) == NULL) {
1720
      LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
1721
               "failed to open directory %s (tmpwatch interfering?)",
1722
3
               path.c_str());
1723
3
      goto build_return;
1724
    }
1725

26624
    while ((d = platform_readdir(dirp)) != NULL) {
1726
53250
      string file_path = path + "/" + string(d->d_name);
1727
53250
      if (stat(file_path.c_str(), &info) == 0) {
1728
53250
        if (!S_ISREG(info.st_mode))
1729
53248
          continue;
1730
2
        if (info.st_size == 0) {
1731
          LogCvmfs(kLogQuota, kLogSyslog | kLogDebug,
1732
                   "removing empty file %s during automatic cache db rebuild",
1733
1
                   file_path.c_str());
1734
1
          unlink(file_path.c_str());
1735
1
          continue;
1736
        }
1737
1738
1
        string hash = string(hex) + string(d->d_name);
1739
        sqlite3_bind_text(stmt_insert, 1, hash.data(), hash.length(),
1740
1
                          SQLITE_STATIC);
1741
1
        sqlite3_bind_int64(stmt_insert, 2, info.st_size);
1742
1
        sqlite3_bind_int64(stmt_insert, 3, info.st_atime);
1743
1
        if (sqlite3_step(stmt_insert) != SQLITE_DONE) {
1744
          LogCvmfs(kLogQuota, kLogDebug, "could not insert into temp table");
1745
          goto build_return;
1746
        }
1747
1
        sqlite3_reset(stmt_insert);
1748
1749
1
        gauge_ += info.st_size;
1750
      } else {
1751
        LogCvmfs(kLogQuota, kLogDebug, "could not stat %s", file_path.c_str());
1752
      }
1753
    }
1754
26624
    closedir(dirp);
1755
26624
    dirp = NULL;
1756
  }
1757
104
  sqlite3_finalize(stmt_insert);
1758
104
  stmt_insert = NULL;
1759
1760
  // Transfer from temp table in cache catalog
1761
  sqlite3_prepare_v2(database_,
1762
                     "SELECT sha1, size FROM fscache ORDER BY actime;",
1763
104
                     -1, &stmt_select, NULL);
1764
  sqlite3_prepare_v2(database_,
1765
    "INSERT INTO cache_catalog (sha1, size, acseq, path, type, pinned) "
1766
    "VALUES (:sha1, :s, :seq, 'unknown (automatic rebuild)', :t, 0);",
1767
104
    -1, &stmt_insert, NULL);
1768

104
  while (sqlite3_step(stmt_select) == SQLITE_ROW) {
1769
    const string hash = string(
1770
1
      reinterpret_cast<const char *>(sqlite3_column_text(stmt_select, 0)));
1771
1
    sqlite3_bind_text(stmt_insert, 1, &hash[0], hash.length(), SQLITE_STATIC);
1772
1
    sqlite3_bind_int64(stmt_insert, 2, sqlite3_column_int64(stmt_select, 1));
1773
1
    sqlite3_bind_int64(stmt_insert, 3, seq++);
1774
    // Might also be a catalog (information is lost)
1775
1
    sqlite3_bind_int64(stmt_insert, 4, kFileRegular);
1776
1777
1
    int retval = sqlite3_step(stmt_insert);
1778
1
    if (retval != SQLITE_DONE) {
1779
      // If the file system hosting the cache is full, we'll likely notice here
1780
      LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
1781
               "could not insert into cache catalog (%d - %s)",
1782
               retval, sqlite3_errstr(retval));
1783
      goto build_return;
1784
    }
1785
1
    sqlite3_reset(stmt_insert);
1786
  }
1787
1788
  // Delete temporary table
1789
104
  sql = "DELETE FROM fscache;";
1790
104
  sqlerr = sqlite3_exec(database_, sql.c_str(), NULL, NULL, NULL);
1791
104
  if (sqlerr != SQLITE_OK) {
1792
    LogCvmfs(kLogQuota, kLogDebug, "could not clear temporary table (%d)",
1793
             sqlerr);
1794
    goto build_return;
1795
  }
1796
1797
104
  seq_ = seq;
1798
104
  result = true;
1799
  LogCvmfs(kLogQuota, kLogDebug,
1800
           "rebuilding finished, seqence %" PRIu64 ", gauge %" PRIu64,
1801
104
           seq_, gauge_);
1802
1803
 build_return:
1804
107
  if (stmt_insert) sqlite3_finalize(stmt_insert);
1805
107
  if (stmt_select) sqlite3_finalize(stmt_select);
1806
107
  if (dirp) closedir(dirp);
1807
107
  return result;
1808
}
1809
1810
1811
/**
1812
 * Register a channel that allows the cache manager to trigger action to its
1813
 * clients.  Currently used for releasing pinned catalogs.
1814
 */
1815
4
void PosixQuotaManager::RegisterBackChannel(
1816
  int back_channel[2],
1817
  const string &channel_id)
1818
{
1819
4
  if (protocol_revision_ >= 1) {
1820
4
    shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1821
4
    MakeReturnPipe(back_channel);
1822
1823
4
    LruCommand cmd;
1824
4
    cmd.command_type = kRegisterBackChannel;
1825
4
    cmd.return_pipe = back_channel[1];
1826
    // Not StoreHash().  This is an MD5 hash.
1827
4
    memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1828
4
    WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1829
1830
    char success;
1831
4
    ReadHalfPipe(back_channel[0], &success, sizeof(success));
1832
    // At this point, the named FIFO is unlinked, so don't use CloseReturnPipe
1833
4
    if (success != 'S') {
1834
      LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
1835
               "failed to register quota back channel (%c)", success);
1836
      abort();
1837
    }
1838
  } else {
1839
    // Dummy pipe to return valid file descriptors
1840
    MakePipe(back_channel);
1841
  }
1842
4
}
1843
1844
1845
/**
1846
 * Removes a chunk from cache, if it exists.
1847
 */
1848
3
void PosixQuotaManager::Remove(const shash::Any &hash) {
1849
3
  string hash_str = hash.ToString();
1850
1851
  int pipe_remove[2];
1852
3
  MakeReturnPipe(pipe_remove);
1853
1854
3
  LruCommand cmd;
1855
3
  cmd.command_type = kRemove;
1856
3
  cmd.return_pipe = pipe_remove[1];
1857
3
  cmd.StoreHash(hash);
1858
3
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1859
1860
  bool success;
1861
3
  ReadHalfPipe(pipe_remove[0], &success, sizeof(success));
1862
3
  CloseReturnPipe(pipe_remove);
1863
1864
3
  unlink((cache_dir_ + "/" + hash.MakePathWithoutSuffix()).c_str());
1865
3
}
1866
1867
1868
32
void PosixQuotaManager::Spawn() {
1869
32
  if (spawned_)
1870
2
    return;
1871
1872
30
  if (pthread_create(&thread_lru_, NULL, MainCommandServer,
1873
      static_cast<void *>(this)) != 0)
1874
  {
1875
    LogCvmfs(kLogQuota, kLogDebug, "could not create lru thread");
1876
    abort();
1877
  }
1878
1879
30
  spawned_ = true;
1880
}
1881
1882
1883
/**
1884
 * Updates the sequence number of the file specified by the hash.
1885
 */
1886
30
void PosixQuotaManager::Touch(const shash::Any &hash) {
1887
30
  LruCommand cmd;
1888
30
  cmd.command_type = kTouch;
1889
30
  cmd.StoreHash(hash);
1890
30
  WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1891
30
}
1892
1893
1894
85
void PosixQuotaManager::UnbindReturnPipe(int pipe_wronly) {
1895
85
  if (shared_)
1896
1
    close(pipe_wronly);
1897
85
}
1898
1899
1900
7
void PosixQuotaManager::UnlinkReturnPipe(int pipe_wronly) {
1901
7
  if (shared_)
1902
3
    unlink((workspace_dir_ + "/pipe" + StringifyInt(pipe_wronly)).c_str());
1903
7
}
1904
1905
1906
37
/**
 * Asks the cache manager to unpin the file identified by `hash`.
 *
 * The request is logged at debug level and a kUnpin command is written to
 * the command pipe.  No reply is awaited.
 */
void PosixQuotaManager::Unpin(const shash::Any &hash) {
  LogCvmfs(kLogQuota, kLogDebug, "Unpin %s", hash.ToString().c_str());

  LruCommand unpin_request;
  unpin_request.command_type = kUnpin;
  unpin_request.StoreHash(hash);
  WritePipe(pipe_lru_[1], &unpin_request, sizeof(unpin_request));
}
1914
1915
1916
2
void PosixQuotaManager::UnregisterBackChannel(
1917
  int back_channel[2],
1918
  const string &channel_id)
1919
{
1920
2
  if (protocol_revision_ >= 1) {
1921
2
    shash::Md5 hash = shash::Md5(shash::AsciiPtr(channel_id));
1922
1923
2
    LruCommand cmd;
1924
2
    cmd.command_type = kUnregisterBackChannel;
1925
    // Not StoreHash().  This is an MD5 hash.
1926
2
    memcpy(cmd.digest, hash.digest, hash.GetDigestSize());
1927
2
    WritePipe(pipe_lru_[1], &cmd, sizeof(cmd));
1928
1929
    // Writer's end will be closed by cache manager, FIFO is already unlinked
1930
2
    close(back_channel[0]);
1931
  } else {
1932
    ClosePipe(back_channel);
1933
  }
1934
2
}