CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
quota_listener.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "quota_listener.h"
7 
8 #include <poll.h>
9 #include <pthread.h>
10 
11 #include <cstdlib>
12 
13 #include "catalog_mgr.h"
14 #include "quota.h"
15 #include "util/exception.h"
16 #include "util/logging.h"
17 #include "util/posix.h"
18 #include "util/smalloc.h"
19 
20 using namespace std; // NOLINT
21 
22 namespace quota {
23 
25  int pipe_backchannel[2];
26  int pipe_terminate[2];
30  pthread_t thread_listener;
31 };
32 
33 
34 static void *MainUnpinListener(void *data) {
35  ListenerHandle *handle = static_cast<ListenerHandle *>(data);
36  LogCvmfs(kLogQuota, kLogDebug, "starting unpin listener for %s",
37  handle->repository_name.c_str());
38 
39  struct pollfd *watch_fds =
40  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
41  watch_fds[0].fd = handle->pipe_terminate[0];
42  watch_fds[0].events = POLLIN | POLLPRI;
43  watch_fds[0].revents = 0;
44  watch_fds[1].fd = handle->pipe_backchannel[0];
45  watch_fds[1].events = POLLIN | POLLPRI;
46  watch_fds[1].revents = 0;
47  while (true) {
48  int retval = poll(watch_fds, 2, -1);
49  if (retval < 0) {
50  continue;
51  }
52 
53  // Terminate I/O thread
54  if (watch_fds[0].revents)
55  break;
56 
57  // Release pinned catalogs
58  if (watch_fds[1].revents) {
59  watch_fds[1].revents = 0;
60  char cmd;
61  ReadPipe(handle->pipe_backchannel[0], &cmd, sizeof(cmd));
62  if (cmd == 'R') {
63  handle->catalog_manager->DetachNested();
64  LogCvmfs(kLogQuota, kLogDebug | kLogSyslog, "released nested catalogs");
65  }
66  }
67  }
68  free(watch_fds);
69 
70  LogCvmfs(kLogQuota, kLogDebug, "stopping unpin listener for %s",
71  handle->repository_name.c_str());
72  return NULL;
73 }
74 
75 
76 static void *MainWatchdogListener(void *data) {
77  ListenerHandle *handle = static_cast<ListenerHandle *>(data);
78  LogCvmfs(kLogQuota, kLogDebug, "starting cache manager watchdog for %s",
79  handle->repository_name.c_str());
80 
81  struct pollfd *watch_fds =
82  static_cast<struct pollfd *>(smalloc(2 * sizeof(struct pollfd)));
83  watch_fds[0].fd = handle->pipe_terminate[0];
84  watch_fds[0].events = POLLIN | POLLPRI;
85  watch_fds[0].revents = 0;
86  watch_fds[1].fd = handle->pipe_backchannel[0];
87  watch_fds[1].events = POLLIN | POLLPRI;
88  watch_fds[1].revents = 0;
89  while (true) {
90  int retval = poll(watch_fds, 2, -1);
91  if (retval < 0) {
92  continue;
93  }
94 
95  // Terminate I/O thread
96  if (watch_fds[0].revents)
97  break;
98 
99  // Release pinned catalogs
100  if (watch_fds[1].revents) {
101  if ((watch_fds[1].revents & POLLERR) ||
102  (watch_fds[1].revents & POLLHUP) ||
103  (watch_fds[1].revents & POLLNVAL))
104  {
105  PANIC(kLogDebug | kLogSyslogErr, "cache manager disappeared, aborting");
106  }
107  // Clean the pipe
108  watch_fds[1].revents = 0;
109  char dummy;
110  ReadPipe(handle->pipe_backchannel[0], &dummy, sizeof(dummy));
111  }
112  }
113  free(watch_fds);
114 
115  LogCvmfs(kLogQuota, kLogDebug, "stopping cache manager watchdog for %s",
116  handle->repository_name.c_str());
117  return NULL;
118 }
119 
120 
124 ListenerHandle *
126  CatalogManager *catalog_manager,
127  const string &repository_name)
128 {
129  ListenerHandle *handle = new ListenerHandle();
130  quota_manager->RegisterBackChannel(handle->pipe_backchannel, repository_name);
131  MakePipe(handle->pipe_terminate);
132  handle->quota_manager = quota_manager;
133  handle->catalog_manager = catalog_manager;
134  handle->repository_name = repository_name;
135  int retval = pthread_create(&handle->thread_listener, NULL, MainUnpinListener,
136  static_cast<void *>(handle));
137  assert(retval == 0);
138  return handle;
139 }
140 
141 
146 ListenerHandle *
148  QuotaManager *quota_manager,
149  const string &repository_name)
150 {
151  ListenerHandle *handle = new ListenerHandle();
152  quota_manager->RegisterBackChannel(handle->pipe_backchannel, repository_name);
153  MakePipe(handle->pipe_terminate);
154  handle->quota_manager = quota_manager;
155  handle->catalog_manager = NULL;
156  handle->repository_name = repository_name;
157  int retval = pthread_create(&handle->thread_listener, NULL,
159  static_cast<void *>(handle));
160  assert(retval == 0);
161  return handle;
162 }
163 
164 
166  const char terminate = 'T';
167  WritePipe(handle->pipe_terminate[1], &terminate, sizeof(terminate));
168  pthread_join(handle->thread_listener, NULL);
169  ClosePipe(handle->pipe_terminate);
170 
172  handle->pipe_backchannel, handle->repository_name);
173 
174  delete handle;
175 }
176 
177 } // namespace quota
#define PANIC(...)
Definition: exception.h:29
static void * MainWatchdogListener(void *data)
assert((mem||(size==0))&&"Out Of Memory")
void MakePipe(int pipe_fd[2])
Definition: posix.cc:492
CatalogManager * catalog_manager
ListenerHandle * RegisterWatchdogListener(QuotaManager *quota_manager, const string &repository_name)
QuotaManager * quota_manager
ListenerHandle * RegisterUnpinListener(QuotaManager *quota_manager, CatalogManager *catalog_manager, const string &repository_name)
virtual void RegisterBackChannel(int back_channel[2], const std::string &channel_id)=0
virtual void UnregisterBackChannel(int back_channel[2], const std::string &channel_id)=0
void UnregisterListener(ListenerHandle *handle)
static void * MainUnpinListener(void *data)
void WritePipe(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:501
void ReadPipe(int fd, void *buf, size_t nbyte)
Definition: posix.cc:513
void ClosePipe(int pipe_fd[2])
Definition: posix.cc:551
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528