GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/quota_listener.cc Lines: 0 80 0.0 %
Date: 2019-02-03 02:48:13 Branches: 0 26 0.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "cvmfs_config.h"
6
#include "quota_listener.h"
7
8
#include <poll.h>
9
#include <pthread.h>
10
11
#include <cstdlib>
12
13
#include "catalog_mgr.h"
14
#include "logging.h"
15
#include "quota.h"
16
#include "smalloc.h"
17
#include "util/posix.h"
18
19
using namespace std;  // NOLINT
20
21
namespace quota {
22
23
struct ListenerHandle {
24
  int pipe_backchannel[2];
25
  int pipe_terminate[2];
26
  QuotaManager *quota_manager;
27
  CatalogManager *catalog_manager;
28
  string repository_name;
29
  pthread_t thread_listener;
30
};
31
32
33
static void *MainUnpinListener(void *data) {
34
  ListenerHandle *handle = static_cast<ListenerHandle *>(data);
35
  LogCvmfs(kLogQuota, kLogDebug, "starting unpin listener for %s",
36
           handle->repository_name.c_str());
37
38
  struct pollfd *watch_fds =
39
    static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
40
  watch_fds[0].fd = handle->pipe_terminate[0];
41
  watch_fds[0].events = POLLIN | POLLPRI;
42
  watch_fds[0].revents = 0;
43
  watch_fds[1].fd = handle->pipe_backchannel[0];
44
  watch_fds[1].events = POLLIN | POLLPRI;
45
  watch_fds[1].revents = 0;
46
  while (true) {
47
    int retval = poll(watch_fds, 2, -1);
48
    if (retval < 0) {
49
      continue;
50
    }
51
52
    // Terminate I/O thread
53
    if (watch_fds[0].revents)
54
      break;
55
56
    // Release pinned catalogs
57
    if (watch_fds[1].revents) {
58
      watch_fds[1].revents = 0;
59
      char cmd;
60
      ReadPipe(handle->pipe_backchannel[0], &cmd, sizeof(cmd));
61
      if (cmd == 'R') {
62
        handle->catalog_manager->DetachNested();
63
        LogCvmfs(kLogQuota, kLogDebug | kLogSyslog, "released nested catalogs");
64
      }
65
    }
66
  }
67
  free(watch_fds);
68
69
  LogCvmfs(kLogQuota, kLogDebug, "stopping unpin listener for %s",
70
           handle->repository_name.c_str());
71
  return NULL;
72
}
73
74
75
static void *MainWatchdogListener(void *data) {
76
  ListenerHandle *handle = static_cast<ListenerHandle *>(data);
77
  LogCvmfs(kLogQuota, kLogDebug, "starting cache manager watchdog for %s",
78
           handle->repository_name.c_str());
79
80
  struct pollfd *watch_fds =
81
  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
82
  watch_fds[0].fd = handle->pipe_terminate[0];
83
  watch_fds[0].events = POLLIN | POLLPRI;
84
  watch_fds[0].revents = 0;
85
  watch_fds[1].fd = handle->pipe_backchannel[0];
86
  watch_fds[1].events = POLLIN | POLLPRI;
87
  watch_fds[1].revents = 0;
88
  while (true) {
89
    int retval = poll(watch_fds, 2, -1);
90
    if (retval < 0) {
91
      continue;
92
    }
93
94
    // Terminate I/O thread
95
    if (watch_fds[0].revents)
96
      break;
97
98
    // Release pinned catalogs
99
    if (watch_fds[1].revents) {
100
      if ((watch_fds[1].revents & POLLERR) ||
101
          (watch_fds[1].revents & POLLHUP) ||
102
          (watch_fds[1].revents & POLLNVAL))
103
      {
104
        LogCvmfs(kLogQuota, kLogDebug | kLogSyslogErr,
105
                 "cache manager disappeared, aborting");
106
        abort();
107
      }
108
      // Clean the pipe
109
      watch_fds[1].revents = 0;
110
      char dummy;
111
      ReadPipe(handle->pipe_backchannel[0], &dummy, sizeof(dummy));
112
    }
113
  }
114
  free(watch_fds);
115
116
  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager watchdog for %s",
117
           handle->repository_name.c_str());
118
  return NULL;
119
}
120
121
122
/**
123
 * Registers a back channel that reacts on high watermark of pinned chunks
124
 */
125
ListenerHandle *
126
RegisterUnpinListener(QuotaManager *quota_manager,
127
                      CatalogManager *catalog_manager,
128
                      const string &repository_name)
129
{
130
  ListenerHandle *handle = new ListenerHandle();
131
  quota_manager->RegisterBackChannel(handle->pipe_backchannel, repository_name);
132
  MakePipe(handle->pipe_terminate);
133
  handle->quota_manager = quota_manager;
134
  handle->catalog_manager = catalog_manager;
135
  handle->repository_name = repository_name;
136
  int retval = pthread_create(&handle->thread_listener, NULL, MainUnpinListener,
137
                              static_cast<void *>(handle));
138
  assert(retval == 0);
139
  return handle;
140
}
141
142
143
/**
144
 * Registers a back channel that kills the fuse module if the cache manager
145
 * disappears
146
 */
147
ListenerHandle *
148
RegisterWatchdogListener(
149
  QuotaManager *quota_manager,
150
  const string &repository_name)
151
{
152
  ListenerHandle *handle = new ListenerHandle();
153
  quota_manager->RegisterBackChannel(handle->pipe_backchannel, repository_name);
154
  MakePipe(handle->pipe_terminate);
155
  handle->quota_manager = quota_manager;
156
  handle->catalog_manager = NULL;
157
  handle->repository_name = repository_name;
158
  int retval = pthread_create(&handle->thread_listener, NULL,
159
                              MainWatchdogListener,
160
                              static_cast<void *>(handle));
161
  assert(retval == 0);
162
  return handle;
163
}
164
165
166
/**
 * Stops a listener thread and releases everything tied to its handle.
 *
 * Writes one byte to the terminate pipe, joins the listener thread, closes
 * the terminate pipe, unregisters the back channel from the quota manager
 * and finally deletes the handle.
 *
 * @param handle  handle of the listener to shut down; it is deleted here and
 *                must not be used afterwards
 */
void UnregisterListener(ListenerHandle *handle) {
  // Any event on the terminate pipe makes the listener loop exit
  const char stop_signal = 'T';
  WritePipe(handle->pipe_terminate[1], &stop_signal, sizeof(stop_signal));
  pthread_join(handle->thread_listener, NULL);
  ClosePipe(handle->pipe_terminate);

  QuotaManager *manager = handle->quota_manager;
  manager->UnregisterBackChannel(handle->pipe_backchannel,
                                 handle->repository_name);

  delete handle;
}
177
178
}  // namespace quota