CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
quota_listener.cc
Go to the documentation of this file.
1 
6 #include "quota_listener.h"
7 
8 #include <poll.h>
9 #include <pthread.h>
10 
11 #include <cstdlib>
12 
13 #include "catalog_mgr.h"
14 #include "quota.h"
15 #include "util/exception.h"
16 #include "util/logging.h"
17 #include "util/posix.h"
18 #include "util/smalloc.h"
19 
20 using namespace std; // NOLINT
21 
22 namespace quota {
23 
25  int pipe_backchannel[2];
26  int pipe_terminate[2];
30  pthread_t thread_listener;
31 };
32 
33 
34 static void *MainUnpinListener(void *data) {
35  ListenerHandle *handle = static_cast<ListenerHandle *>(data);
36  LogCvmfs(kLogQuota, kLogDebug, "starting unpin listener for %s",
37  handle->repository_name.c_str());
38 
39  struct pollfd *watch_fds = static_cast<struct pollfd *>(
40  smalloc(2 * sizeof(struct pollfd)));
41  watch_fds[0].fd = handle->pipe_terminate[0];
42  watch_fds[0].events = POLLIN | POLLPRI;
43  watch_fds[0].revents = 0;
44  watch_fds[1].fd = handle->pipe_backchannel[0];
45  watch_fds[1].events = POLLIN | POLLPRI;
46  watch_fds[1].revents = 0;
47  while (true) {
48  const int retval = poll(watch_fds, 2, -1);
49  if (retval < 0) {
50  continue;
51  }
52 
53  // Terminate I/O thread
54  if (watch_fds[0].revents)
55  break;
56 
57  // Release pinned catalogs
58  if (watch_fds[1].revents) {
59  watch_fds[1].revents = 0;
60  char cmd;
61  ReadPipe(handle->pipe_backchannel[0], &cmd, sizeof(cmd));
62  if (cmd == 'R') {
63  handle->catalog_manager->DetachNested();
64  LogCvmfs(kLogQuota, kLogDebug | kLogSyslog, "released nested catalogs");
65  }
66  }
67  }
68  free(watch_fds);
69 
70  LogCvmfs(kLogQuota, kLogDebug, "stopping unpin listener for %s",
71  handle->repository_name.c_str());
72  return NULL;
73 }
74 
75 
76 static void *MainWatchdogListener(void *data) {
77  ListenerHandle *handle = static_cast<ListenerHandle *>(data);
78  LogCvmfs(kLogQuota, kLogDebug, "starting cache manager watchdog for %s",
79  handle->repository_name.c_str());
80 
81  struct pollfd *watch_fds = static_cast<struct pollfd *>(
82  smalloc(2 * sizeof(struct pollfd)));
83  watch_fds[0].fd = handle->pipe_terminate[0];
84  watch_fds[0].events = POLLIN | POLLPRI;
85  watch_fds[0].revents = 0;
86  watch_fds[1].fd = handle->pipe_backchannel[0];
87  watch_fds[1].events = POLLIN | POLLPRI;
88  watch_fds[1].revents = 0;
89  while (true) {
90  const int retval = poll(watch_fds, 2, -1);
91  if (retval < 0) {
92  continue;
93  }
94 
95  // Terminate I/O thread
96  if (watch_fds[0].revents)
97  break;
98 
99  // Release pinned catalogs
100  if (watch_fds[1].revents) {
101  if ((watch_fds[1].revents & POLLERR) || (watch_fds[1].revents & POLLHUP)
102  || (watch_fds[1].revents & POLLNVAL)) {
103  PANIC(kLogDebug | kLogSyslogErr, "cache manager disappeared, aborting");
104  }
105  // Clean the pipe
106  watch_fds[1].revents = 0;
107  char dummy;
108  ReadPipe(handle->pipe_backchannel[0], &dummy, sizeof(dummy));
109  }
110  }
111  free(watch_fds);
112 
113  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager watchdog for %s",
114  handle->repository_name.c_str());
115  return NULL;
116 }
117 
118 
123  CatalogManager *catalog_manager,
124  const string &repository_name) {
125  ListenerHandle *handle = new ListenerHandle();
126  quota_manager->RegisterBackChannel(handle->pipe_backchannel, repository_name);
127  MakePipe(handle->pipe_terminate);
128  handle->quota_manager = quota_manager;
129  handle->catalog_manager = catalog_manager;
130  handle->repository_name = repository_name;
131  const int retval =
132  pthread_create(&handle->thread_listener, NULL, MainUnpinListener,
133  static_cast<void *>(handle));
134  assert(retval == 0);
135  return handle;
136 }
137 
138 
144  const string &repository_name) {
145  ListenerHandle *handle = new ListenerHandle();
146  quota_manager->RegisterBackChannel(handle->pipe_backchannel, repository_name);
147  MakePipe(handle->pipe_terminate);
148  handle->quota_manager = quota_manager;
149  handle->catalog_manager = NULL;
150  handle->repository_name = repository_name;
151  const int retval =
152  pthread_create(&handle->thread_listener, NULL, MainWatchdogListener,
153  static_cast<void *>(handle));
154  assert(retval == 0);
155  return handle;
156 }
157 
158 
160  const char terminate = 'T';
161  WritePipe(handle->pipe_terminate[1], &terminate, sizeof(terminate));
162  pthread_join(handle->thread_listener, NULL);
163  ClosePipe(handle->pipe_terminate);
164 
166  handle->repository_name);
167 
168  delete handle;
169 }
170 
171 } // namespace quota
#define PANIC(...)
Definition: exception.h:29
static void * MainWatchdogListener(void *data)
assert((mem||(size==0))&&"Out Of Memory")
void MakePipe(int pipe_fd[2])
Definition: posix.cc:487
CatalogManager * catalog_manager
ListenerHandle * RegisterWatchdogListener(QuotaManager *quota_manager, const string &repository_name)
QuotaManager * quota_manager
ListenerHandle * RegisterUnpinListener(QuotaManager *quota_manager, CatalogManager *catalog_manager, const string &repository_name)
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)=0
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)=0
void UnregisterListener(ListenerHandle *handle)
static void * MainUnpinListener(void *data)
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:496
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:508
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:559
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545