1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
* |
4 |
|
|
* The stratum agent runs on stratum 1 servers and provides a web service to |
5 |
|
|
* release manager machines. Release manager machines can send signed messages |
6 |
|
|
* to the stratum agent to trigger a replication run ('cvmfs_server snapshot'). |
7 |
|
|
* |
8 |
|
|
* The service offers the following API: |
9 |
|
|
* - /cvmfs/<repo name>/api/v1/replicate/new (POST): |
10 |
|
|
* create a new snapshot run. Returns JSON with the job id. |
11 |
|
|
* - /cvmfs/<repo name>/api/v1/replicate/<job id>/ |
12 |
|
|
* {status,stdout,stderr,tail} (GET): |
13 |
|
|
* Retrieve information about running and finished jobs. The status is |
14 |
|
|
* JSON encoded. Finished jobs are cleaned up after a while. |
15 |
|
|
* |
16 |
|
|
* TODO(jblomer): add support for synchronized application of a new revision. |
17 |
|
|
* In this case, the snapshot would run but the new manifest would only be |
18 |
|
|
* applied on another signal. |
19 |
|
|
*/ |
20 |
|
|
|
21 |
|
|
#include <arpa/inet.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <map>
#include <string>
#include <vector>
33 |
|
|
|
34 |
|
|
#include "download.h" |
35 |
|
|
#include "fence.h" |
36 |
|
|
#include "json.h" |
37 |
|
|
#include "letter.h" |
38 |
|
|
#include "logging.h" |
39 |
|
|
#include "mongoose.h" |
40 |
|
|
#include "options.h" |
41 |
|
|
#include "platform.h" |
42 |
|
|
#include "signature.h" |
43 |
|
|
#include "stratum_agent/uri_map.h" |
44 |
|
|
#include "util/pointer.h" |
45 |
|
|
#include "util/posix.h" |
46 |
|
|
#include "util/string.h" |
47 |
|
|
#include "util_concurrency.h" |
48 |
|
|
#include "uuid.h" |
49 |
|
|
#include "whitelist.h" |
50 |
|
|
|
51 |
|
|
using namespace std; // NOLINT |
52 |
|
|
|
53 |
|
|
// Version of the stratum agent itself.  Once it has been tested successfully,
// it might adopt the mainline cvmfs version.
const unsigned kVersionMajor = 1;
const unsigned kVersionMinor = 0;
const unsigned kVersionPatch = 0;

// Default pid file location (can be overridden with -P).
const char *kDefaultPidFile = "/var/run/cvmfs_stratum_agent.pid";
// Mongoose receives all of its options, the listening port included, as
// strings; hence port and thread count are string constants.
const char *kDefaultPort = "7999";
const char *kDefaultNumThreads = "2";
// Finished jobs are kept in the job table for three hours (in seconds).
const unsigned kJobRetentionDuration = 3 * 60 * 60;
// The job table is scanned for expired jobs once a minute (in seconds).
const unsigned kCleanupInterval = 60;
65 |
|
|
|
66 |
|
|
|
67 |
|
|
/** |
68 |
|
|
* Everything that's needed to verify requests for a single repository |
69 |
|
|
*/ |
70 |
|
|
struct RepositoryConfig : SingleCopy { |
71 |
|
|
RepositoryConfig() |
72 |
|
|
: download_mgr(NULL) |
73 |
|
|
, signature_mgr(NULL) |
74 |
|
|
, options_mgr(NULL) |
75 |
|
|
, statistics(NULL) |
76 |
|
|
{ } |
77 |
|
|
~RepositoryConfig() { |
78 |
|
|
if (download_mgr) download_mgr->Fini(); |
79 |
|
|
if (signature_mgr) signature_mgr->Fini(); |
80 |
|
|
delete download_mgr; |
81 |
|
|
delete signature_mgr; |
82 |
|
|
delete options_mgr; |
83 |
|
|
delete statistics; |
84 |
|
|
} |
85 |
|
|
string alias; // stratum 1s can have an alias different from the fqrn |
86 |
|
|
string fqrn; |
87 |
|
|
string stratum0_url; |
88 |
|
|
download::DownloadManager *download_mgr; |
89 |
|
|
signature::SignatureManager *signature_mgr; |
90 |
|
|
OptionsManager *options_mgr; |
91 |
|
|
perf::Statistics *statistics; |
92 |
|
|
}; |
93 |
|
|
|
94 |
|
|
|
95 |
|
|
/** |
96 |
|
|
* Catures a run of 'cvmfs_server snapshot <reponame>' |
97 |
|
|
*/ |
98 |
|
|
struct Job : SingleCopy { |
99 |
|
|
Job() : remote_ip(0), fd_stdin(-1), fd_stdout(-1), fd_stderr(-1), |
100 |
|
|
status(kStatusLimbo), exit_code(-1), |
101 |
|
|
birth(platform_monotonic_time()), death(0), finish_timestamp(0), |
102 |
|
|
pid(0) |
103 |
|
|
{ |
104 |
|
|
memset(&thread_job, 0, sizeof(thread_job)); |
105 |
|
|
int retval = pthread_mutex_init(&lock, NULL); |
106 |
|
|
assert(retval == 0); |
107 |
|
|
} |
108 |
|
|
~Job() { |
109 |
|
|
pthread_mutex_destroy(&lock); |
110 |
|
|
} |
111 |
|
|
enum Status { |
112 |
|
|
kStatusLimbo, |
113 |
|
|
kStatusRunning, |
114 |
|
|
kStatusDone |
115 |
|
|
}; |
116 |
|
|
|
117 |
|
|
string id; |
118 |
|
|
string alias; // Usually the fqrn |
119 |
|
|
// long type imposed by Mongoose |
120 |
|
|
long remote_ip; // NOLINT |
121 |
|
|
|
122 |
|
|
int fd_stdin; |
123 |
|
|
int fd_stdout; |
124 |
|
|
int fd_stderr; |
125 |
|
|
string stdout; |
126 |
|
|
string stderr; |
127 |
|
|
Status status; |
128 |
|
|
int exit_code; |
129 |
|
|
pthread_t thread_job; |
130 |
|
|
uint64_t birth; |
131 |
|
|
uint64_t death; |
132 |
|
|
time_t finish_timestamp; |
133 |
|
|
pid_t pid; |
134 |
|
|
pthread_mutex_t lock; |
135 |
|
|
}; |
136 |
|
|
|
137 |
|
|
class UriHandlerReplicate; |
138 |
|
|
class UriHandlerJob; |
139 |
|
|
class UriHandlerInfo; |
140 |
|
|
|
141 |
|
|
/** |
142 |
|
|
* Handler for requests to start a new snapshot run. |
143 |
|
|
*/ |
144 |
|
|
UriHandlerReplicate *g_handler_replicate; |
145 |
|
|
/** |
146 |
|
|
* Handler to query jobs from the job table. |
147 |
|
|
*/ |
148 |
|
|
UriHandlerJob *g_handler_job; |
149 |
|
|
UriHandlerInfo *g_handler_info; |
150 |
|
|
/** |
151 |
|
|
* Routes URIs to handlers. |
152 |
|
|
*/ |
153 |
|
|
UriMap g_uri_map; |
154 |
|
|
/** |
155 |
|
|
* Maps fqrn to manager classes constructed from the configuration in /etc. |
156 |
|
|
*/ |
157 |
|
|
map<string, RepositoryConfig *> g_configurations; |
158 |
|
|
/** |
159 |
|
|
* Prevents handlers to run while the configuration is updated |
160 |
|
|
*/ |
161 |
|
|
Fence g_fence_configurations; |
162 |
|
|
/** |
163 |
|
|
* Table of jobs, maps the job id to job information. |
164 |
|
|
*/ |
165 |
|
|
map<string, Job *> g_jobs; |
166 |
|
|
pthread_mutex_t g_lock_jobs = PTHREAD_MUTEX_INITIALIZER; |
167 |
|
|
/** |
168 |
|
|
* Used to control the main thread from signals. |
169 |
|
|
*/ |
170 |
|
|
int g_pipe_ctrl[2]; |
171 |
|
|
|
172 |
|
|
|
173 |
|
|
/** |
174 |
|
|
* Parse /etc/cvmfs/repositories.d/<fqrn>/.conf and search for stratum 1s |
175 |
|
|
*/ |
176 |
|
|
static void ReadConfigurations() { |
177 |
|
|
vector<string> repo_config_dirs = |
178 |
|
|
FindDirectories("/etc/cvmfs/repositories.d"); |
179 |
|
|
for (unsigned i = 0; i < repo_config_dirs.size(); ++i) { |
180 |
|
|
string name = GetFileName(repo_config_dirs[i]); |
181 |
|
|
string optarg; |
182 |
|
|
UniquePtr<OptionsManager> options_mgr( |
183 |
|
|
new BashOptionsManager(new DefaultOptionsTemplateManager( |
184 |
|
|
repo_config_dirs[i]))); |
185 |
|
|
options_mgr->set_taint_environment(false); |
186 |
|
|
options_mgr->ParsePath(repo_config_dirs[i] + "/server.conf", |
187 |
|
|
false); |
188 |
|
|
if (!options_mgr->GetValue("CVMFS_REPOSITORY_TYPE", &optarg) || |
189 |
|
|
(optarg != "stratum1")) |
190 |
|
|
{ |
191 |
|
|
continue; |
192 |
|
|
} |
193 |
|
|
options_mgr->ParsePath(repo_config_dirs[i] + "/replica.conf", |
194 |
|
|
false); |
195 |
|
|
if (options_mgr->GetValue("CVMFS_STRATUM_AGENT", &optarg) && |
196 |
|
|
!options_mgr->IsOn(optarg)) |
197 |
|
|
{ |
198 |
|
|
continue; |
199 |
|
|
} |
200 |
|
|
|
201 |
|
|
UniquePtr<signature::SignatureManager> |
202 |
|
|
signature_mgr(new signature::SignatureManager()); |
203 |
|
|
signature_mgr->Init(); |
204 |
|
|
options_mgr->GetValue("CVMFS_PUBLIC_KEY", &optarg); |
205 |
|
|
if (DirectoryExists(optarg)) |
206 |
|
|
optarg = JoinStrings(FindFilesBySuffix(optarg, ".pub"), ":"); |
207 |
|
|
if (!signature_mgr->LoadPublicRsaKeys(optarg)) { |
208 |
|
|
LogCvmfs(kLogCvmfs, kLogStderr | kLogSyslogErr, |
209 |
|
|
"(%s) could not load public key %s", |
210 |
|
|
name.c_str(), optarg.c_str()); |
211 |
|
|
continue; |
212 |
|
|
} |
213 |
|
|
|
214 |
|
|
UniquePtr<perf::Statistics> statistics(new perf::Statistics()); |
215 |
|
|
UniquePtr<download::DownloadManager> |
216 |
|
|
download_mgr(new download::DownloadManager()); |
217 |
|
|
download_mgr->Init(4, false, perf::StatisticsTemplate("agent", statistics)); |
218 |
|
|
if (options_mgr->GetValue("CVMFS_HTTP_TIMEOUT", &optarg)) |
219 |
|
|
download_mgr->SetTimeout(String2Uint64(optarg), String2Uint64(optarg)); |
220 |
|
|
if (options_mgr->GetValue("CVMFS_HTTP_RETRIES", &optarg)) |
221 |
|
|
download_mgr->SetRetryParameters(String2Uint64(optarg), 1000, 2000); |
222 |
|
|
RepositoryConfig *config = new RepositoryConfig(); |
223 |
|
|
config->alias = name; |
224 |
|
|
options_mgr->GetValue("CVMFS_REPOSITORY_NAME", &(config->fqrn)); |
225 |
|
|
options_mgr->GetValue("CVMFS_STRATUM0", &(config->stratum0_url)); |
226 |
|
|
config->signature_mgr = signature_mgr.Release(); |
227 |
|
|
config->download_mgr = download_mgr.Release(); |
228 |
|
|
config->options_mgr = options_mgr.Release(); |
229 |
|
|
config->statistics = statistics.Release(); |
230 |
|
|
g_configurations[config->fqrn] = config; |
231 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, "watching %s", |
232 |
|
|
config->fqrn.c_str()); |
233 |
|
|
} |
234 |
|
|
if (g_configurations.empty()) { |
235 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslogWarn, |
236 |
|
|
"Warning: no stratum 1 repositories found"); |
237 |
|
|
} |
238 |
|
|
} |
239 |
|
|
|
240 |
|
|
|
241 |
|
|
/** |
242 |
|
|
* New replication job. Responds on /cvmfs/<repo>/api/v1/replicate/new |
243 |
|
|
*/ |
244 |
|
|
class UriHandlerReplicate : public UriHandler { |
245 |
|
|
public: |
246 |
|
|
virtual ~UriHandlerReplicate() { } |
247 |
|
|
|
248 |
|
|
virtual void OnRequest(const struct mg_request_info *req_info, |
249 |
|
|
struct mg_connection *conn) |
250 |
|
|
{ |
251 |
|
|
vector<string> uri_tokens = SplitString(req_info->uri, '/'); |
252 |
|
|
// strip api/v1/replicate/new |
253 |
|
|
string fqrn = uri_tokens[uri_tokens.size() - 5]; |
254 |
|
|
char post_data[kMaxPostData]; post_data[0] = '\0'; |
255 |
|
|
unsigned post_data_len = mg_read(conn, post_data, sizeof(post_data) - 1); |
256 |
|
|
post_data[post_data_len] = '\0'; |
257 |
|
|
// Trim trailing newline |
258 |
|
|
if (post_data_len && (post_data[post_data_len - 1] == '\n')) |
259 |
|
|
post_data[post_data_len - 1] = '\0'; |
260 |
|
|
|
261 |
|
|
FenceGuard guard_configurations(&g_fence_configurations); |
262 |
|
|
RepositoryConfig *config = g_configurations[fqrn]; |
263 |
|
|
assert(fqrn == config->fqrn); |
264 |
|
|
|
265 |
|
|
// Verify letter |
266 |
|
|
string message; |
267 |
|
|
string cert; |
268 |
|
|
letter::Letter letter(config->fqrn, post_data, config->signature_mgr); |
269 |
|
|
letter::Failures retval_lt = letter.Verify(kTimeoutLetter, &message, &cert); |
270 |
|
|
if (retval_lt != letter::kFailOk) { |
271 |
|
|
WebReply::Send(WebReply::k400, MkJsonError(letter::Code2Ascii(retval_lt)), |
272 |
|
|
conn); |
273 |
|
|
return; |
274 |
|
|
} |
275 |
|
|
whitelist::Whitelist whitelist(config->fqrn, config->download_mgr, |
276 |
|
|
config->signature_mgr); |
277 |
|
|
whitelist::Failures retval_wl = whitelist.Load(config->stratum0_url); |
278 |
|
|
if (retval_wl == whitelist::kFailOk) |
279 |
|
|
retval_wl = whitelist.VerifyLoadedCertificate(); |
280 |
|
|
if (retval_wl != whitelist::kFailOk) { |
281 |
|
|
WebReply::Send(WebReply::k400, |
282 |
|
|
MkJsonError(whitelist::Code2Ascii(retval_wl)), conn); |
283 |
|
|
return; |
284 |
|
|
} |
285 |
|
|
|
286 |
|
|
UniquePtr<Job> job(new Job()); |
287 |
|
|
string exe = "/usr/bin/cvmfs_server"; |
288 |
|
|
vector<string> argv; |
289 |
|
|
argv.push_back("snapshot"); |
290 |
|
|
argv.push_back(config->alias); |
291 |
|
|
bool retval_b = ExecuteBinary( |
292 |
|
|
&job->fd_stdin, &job->fd_stdout, &job->fd_stderr, |
293 |
|
|
exe, argv, false, &job->pid); |
294 |
|
|
if (!retval_b) { |
295 |
|
|
WebReply::Send(WebReply::k500, |
296 |
|
|
MkJsonError("could not spawn snapshot process"), conn); |
297 |
|
|
return; |
298 |
|
|
} |
299 |
|
|
job->status = Job::kStatusRunning; |
300 |
|
|
string uuid = cvmfs::Uuid::CreateOneTime(); |
301 |
|
|
job->id = uuid; |
302 |
|
|
job->alias = config->alias; |
303 |
|
|
job->remote_ip = ntohl(req_info->remote_ip); |
304 |
|
|
|
305 |
|
|
int retval_i = pthread_create(&job->thread_job, NULL, MainJobMgr, job); |
306 |
|
|
assert(retval_i == 0); |
307 |
|
|
retval_i = pthread_detach(job->thread_job); |
308 |
|
|
assert(retval_i == 0); |
309 |
|
|
{ |
310 |
|
|
MutexLockGuard guard_jobs(&g_lock_jobs); |
311 |
|
|
g_jobs[uuid] = job.Release(); |
312 |
|
|
} |
313 |
|
|
WebReply::Send(WebReply::k200, "{\"job_id\":\"" + uuid + "\"}", conn); |
314 |
|
|
} |
315 |
|
|
|
316 |
|
|
private: |
317 |
|
|
static const unsigned kMaxPostData = 32 * 1024; // 32kB |
318 |
|
|
static const unsigned kTimeoutLetter = 180; // 3 minutes |
319 |
|
|
|
320 |
|
|
// Polls stdout/stderr of running jobs |
321 |
|
|
static void *MainJobMgr(void *data) { |
322 |
|
|
Job *job = reinterpret_cast<Job *>(data); |
323 |
|
|
string job_id = job->id; |
324 |
|
|
string alias = job->alias; |
325 |
|
|
char ipv4_buf[INET_ADDRSTRLEN]; |
326 |
|
|
|
327 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, |
328 |
|
|
"(%s) starting replication job %s from %s", |
329 |
|
|
alias.c_str(), job_id.c_str(), |
330 |
|
|
inet_ntop(AF_INET, &job->remote_ip, ipv4_buf, INET_ADDRSTRLEN)); |
331 |
|
|
Block2Nonblock(job->fd_stdout); |
332 |
|
|
Block2Nonblock(job->fd_stderr); |
333 |
|
|
char buf_stdout[kPageSize]; |
334 |
|
|
char buf_stderr[kPageSize]; |
335 |
|
|
struct pollfd watch_fds[2]; |
336 |
|
|
watch_fds[0].fd = job->fd_stdout; |
337 |
|
|
watch_fds[1].fd = job->fd_stderr; |
338 |
|
|
watch_fds[0].events = watch_fds[1].events = POLLIN | POLLPRI | POLLHUP; |
339 |
|
|
watch_fds[0].revents = watch_fds[1].revents = 0; |
340 |
|
|
bool terminate = false; |
341 |
|
|
while (!terminate) { |
342 |
|
|
int retval = poll(watch_fds, 2, -1); |
343 |
|
|
if (retval < 0) |
344 |
|
|
continue; |
345 |
|
|
if (watch_fds[0].revents) { |
346 |
|
|
watch_fds[0].revents = 0; |
347 |
|
|
int nbytes = read(watch_fds[0].fd, buf_stdout, kPageSize); |
348 |
|
|
if ((nbytes <= 0) && (errno != EINTR)) |
349 |
|
|
terminate = true; |
350 |
|
|
if (nbytes > 0) { |
351 |
|
|
MutexLockGuard guard_job(&job->lock); |
352 |
|
|
job->stdout += string(buf_stdout, nbytes); |
353 |
|
|
} |
354 |
|
|
} |
355 |
|
|
if (watch_fds[1].revents) { |
356 |
|
|
watch_fds[1].revents = 0; |
357 |
|
|
int nbytes = read(watch_fds[1].fd, buf_stderr, kPageSize); |
358 |
|
|
if ((nbytes <= 0) && (errno != EINTR)) |
359 |
|
|
terminate = true; |
360 |
|
|
if (nbytes > 0) { |
361 |
|
|
MutexLockGuard guard_job(&job->lock); |
362 |
|
|
job->stderr += string(buf_stderr, nbytes); |
363 |
|
|
} |
364 |
|
|
} |
365 |
|
|
} |
366 |
|
|
|
367 |
|
|
{ |
368 |
|
|
MutexLockGuard guard_job(&job->lock); |
369 |
|
|
close(job->fd_stdin); job->fd_stdin = -1; |
370 |
|
|
close(job->fd_stdout); job->fd_stdout = -1; |
371 |
|
|
close(job->fd_stderr); job->fd_stderr = -1; |
372 |
|
|
job->exit_code = WaitForChild(job->pid); |
373 |
|
|
job->death = platform_monotonic_time(); |
374 |
|
|
job->finish_timestamp = time(NULL); |
375 |
|
|
job->status = Job::kStatusDone; |
376 |
|
|
job_id = job->id; |
377 |
|
|
} |
378 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, |
379 |
|
|
"(%s) finished replication job %s", alias.c_str(), job_id.c_str()); |
380 |
|
|
return NULL; |
381 |
|
|
} |
382 |
|
|
|
383 |
|
|
string MkJsonError(const string &msg) { |
384 |
|
|
return "{\"error\": \"" + msg + "\"}"; |
385 |
|
|
} |
386 |
|
|
}; |
387 |
|
|
|
388 |
|
|
|
389 |
|
|
/** |
390 |
|
|
* Handler to query job information under |
391 |
|
|
* /cvmfs/<repo>/api/v1/replicate/<jobid>/{stdout,stderr,tail,status} |
392 |
|
|
*/ |
393 |
|
|
class UriHandlerJob : public UriHandler { |
394 |
|
|
public: |
395 |
|
|
virtual ~UriHandlerJob() { } |
396 |
|
|
|
397 |
|
|
virtual void OnRequest(const struct mg_request_info *req_info, |
398 |
|
|
struct mg_connection *conn) |
399 |
|
|
{ |
400 |
|
|
string what = GetFileName(req_info->uri); |
401 |
|
|
string job_id = GetFileName(GetParentPath(req_info->uri)); |
402 |
|
|
|
403 |
|
|
MutexLockGuard guard_jobs(&g_lock_jobs); |
404 |
|
|
map<string, Job *>::const_iterator iter = g_jobs.find(job_id); |
405 |
|
|
if (iter == g_jobs.end()) { |
406 |
|
|
WebReply::Send(WebReply::k404, "{\"error\":\"no such job\"}", conn); |
407 |
|
|
return; |
408 |
|
|
} |
409 |
|
|
Job *job = iter->second; |
410 |
|
|
MutexLockGuard guard_this_job(&job->lock); |
411 |
|
|
if (what == "stdout") { |
412 |
|
|
WebReply::Send(WebReply::k200, job->stdout, conn); |
413 |
|
|
} else if (what == "stderr") { |
414 |
|
|
WebReply::Send(WebReply::k200, job->stderr, conn); |
415 |
|
|
} else if (what == "tail") { |
416 |
|
|
WebReply::Send(WebReply::k200, Tail(job->stdout, 4), conn); |
417 |
|
|
} else if (what == "status") { |
418 |
|
|
string reply = "{\"status\":"; |
419 |
|
|
switch (job->status) { |
420 |
|
|
case Job::kStatusLimbo: reply += "\"limbo\""; break; |
421 |
|
|
case Job::kStatusRunning: reply += "\"running\""; break; |
422 |
|
|
case Job::kStatusDone: reply += "\"done\""; break; |
423 |
|
|
default: assert(false); |
424 |
|
|
} |
425 |
|
|
if (job->status == Job::kStatusDone) { |
426 |
|
|
reply += ",\"exit_code\":" + StringifyInt(job->exit_code); |
427 |
|
|
reply += ",\"duration\":" + StringifyInt(job->death - job->birth); |
428 |
|
|
reply += ",\"finish_timestamp\":\"" + |
429 |
|
|
StringifyTime(job->finish_timestamp, false) + " UTC\""; |
430 |
|
|
} |
431 |
|
|
reply += "}"; |
432 |
|
|
WebReply::Send(WebReply::k200, reply, conn); |
433 |
|
|
} else { |
434 |
|
|
WebReply::Send(WebReply::k404, "{\"error\":\"internal error\"}", conn); |
435 |
|
|
} |
436 |
|
|
} |
437 |
|
|
}; |
438 |
|
|
|
439 |
|
|
|
440 |
|
|
/** |
441 |
|
|
* Handler to query service information at /cvmfs/<repo>/api/v1/replicate/info |
442 |
|
|
*/ |
443 |
|
|
class UriHandlerInfo : public UriHandler { |
444 |
|
|
public: |
445 |
|
|
virtual ~UriHandlerInfo() { } |
446 |
|
|
|
447 |
|
|
virtual void OnRequest(const struct mg_request_info *req_info, |
448 |
|
|
struct mg_connection *conn) |
449 |
|
|
{ |
450 |
|
|
string version = StringifyInt(kVersionMajor) + "." + |
451 |
|
|
StringifyInt(kVersionMinor) + "." + |
452 |
|
|
StringifyInt(kVersionPatch); |
453 |
|
|
WebReply::Send(WebReply::k200, "{\"version\":\"" + version + "\"}", conn); |
454 |
|
|
} |
455 |
|
|
}; |
456 |
|
|
|
457 |
|
|
|
458 |
|
|
/** |
459 |
|
|
* Create the REST API from configuration |
460 |
|
|
*/ |
461 |
|
|
static void GenerateUriMap() { |
462 |
|
|
for (map<string, RepositoryConfig *>::const_iterator i = |
463 |
|
|
g_configurations.begin(), i_end = g_configurations.end(); |
464 |
|
|
i != i_end; ++i) |
465 |
|
|
{ |
466 |
|
|
string fqrn = i->second->fqrn; |
467 |
|
|
g_uri_map.Register(WebRequest( |
468 |
|
|
"/cvmfs/" + fqrn + "/api/v1/replicate/*/stdout", WebRequest::kGet), |
469 |
|
|
g_handler_job); |
470 |
|
|
g_uri_map.Register(WebRequest( |
471 |
|
|
"/cvmfs/" + fqrn + "/api/v1/replicate/*/stderr", WebRequest::kGet), |
472 |
|
|
g_handler_job); |
473 |
|
|
g_uri_map.Register(WebRequest( |
474 |
|
|
"/cvmfs/" + fqrn + "/api/v1/replicate/*/tail", WebRequest::kGet), |
475 |
|
|
g_handler_job); |
476 |
|
|
g_uri_map.Register(WebRequest( |
477 |
|
|
"/cvmfs/" + fqrn + "/api/v1/replicate/*/status", WebRequest::kGet), |
478 |
|
|
g_handler_job); |
479 |
|
|
g_uri_map.Register(WebRequest("/cvmfs/" + fqrn + "/api/v1/replicate/new", |
480 |
|
|
WebRequest::kPost), |
481 |
|
|
g_handler_replicate); |
482 |
|
|
g_uri_map.Register(WebRequest( |
483 |
|
|
"/cvmfs/" + fqrn + "/api/v1/replicate/info", WebRequest::kGet), |
484 |
|
|
g_handler_info); |
485 |
|
|
} |
486 |
|
|
} |
487 |
|
|
|
488 |
|
|
|
489 |
|
|
static void ClearConfigurations() { |
490 |
|
|
for (map<string, RepositoryConfig *>::const_iterator i = |
491 |
|
|
g_configurations.begin(), i_end = g_configurations.end(); |
492 |
|
|
i != i_end; ++i) |
493 |
|
|
{ |
494 |
|
|
delete i->second; |
495 |
|
|
} |
496 |
|
|
g_configurations.clear(); |
497 |
|
|
} |
498 |
|
|
|
499 |
|
|
|
500 |
|
|
/** |
501 |
|
|
* Embedded web server's request callbacks |
502 |
|
|
*/ |
503 |
|
|
static int MongooseOnRequest(struct mg_connection *conn) { |
504 |
|
|
const struct mg_request_info *request_info = mg_get_request_info(conn); |
505 |
|
|
WebRequest request(request_info); |
506 |
|
|
UriHandler *handler = g_uri_map.Route(request); |
507 |
|
|
if (handler == NULL) { |
508 |
|
|
if (g_uri_map.IsKnownUri(request.uri())) |
509 |
|
|
WebReply::Send(WebReply::k405, "", conn); |
510 |
|
|
else |
511 |
|
|
WebReply::Send(WebReply::k404, "", conn); |
512 |
|
|
return 1; |
513 |
|
|
} |
514 |
|
|
|
515 |
|
|
handler->OnRequest(request_info, conn); |
516 |
|
|
return 1; |
517 |
|
|
} |
518 |
|
|
|
519 |
|
|
|
520 |
|
|
/** |
521 |
|
|
* Redirect Mongoose's logging to LogCvmfs |
522 |
|
|
*/ |
523 |
|
|
static int MongooseOnLog(const struct mg_connection *, const char *message) { |
524 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, "(web server) %s", message); |
525 |
|
|
return 1; |
526 |
|
|
} |
527 |
|
|
|
528 |
|
|
|
529 |
|
|
// Signals write to a pipe on which the main thread is listening |
530 |
|
|
void SignalCleanup(int signal) { |
531 |
|
|
char c = 'C'; |
532 |
|
|
WritePipe(g_pipe_ctrl[1], &c, 1); |
533 |
|
|
} |
534 |
|
|
|
535 |
|
|
// Requests termination of the main loop by writing 'T' to the control pipe.
// errno is preserved so that the interrupted code does not see a clobbered
// value.
void SignalExit(int signal) {
  int saved_errno = errno;
  char c = 'T';
  WritePipe(g_pipe_ctrl[1], &c, 1);
  errno = saved_errno;
}
539 |
|
|
|
540 |
|
|
// Requests a configuration reload by writing 'R' to the control pipe.
// errno is preserved so that the interrupted code does not see a clobbered
// value.
void SignalReload(int signal) {
  int saved_errno = errno;
  char c = 'R';
  WritePipe(g_pipe_ctrl[1], &c, 1);
  errno = saved_errno;
}
544 |
|
|
|
545 |
|
|
|
546 |
|
|
|
547 |
|
|
/**
 * Prints the program version and the command line synopsis (with the default
 * port and pid file) to stdout.
 */
void Usage(const char *progname) {
  LogCvmfs(kLogCvmfs, kLogStdout,
           "%s version %u.%u.%u\n"
           "Provides a REST service to release manager machines "
           "to let them trigger repository replication\n\n"
           "Usage: %s [-f(oreground)] "
           "[-p port (default: %s)]\n"
           " [-P pid file (default: %s)] [-u username]",
           progname, kVersionMajor, kVersionMinor, kVersionPatch,
           progname, kDefaultPort, kDefaultPidFile);
}
557 |
|
|
|
558 |
|
|
|
559 |
|
|
int main(int argc, char **argv) { |
560 |
|
|
const char *port = kDefaultPort; |
561 |
|
|
const char *pid_file = kDefaultPidFile; |
562 |
|
|
bool foreground = false; |
563 |
|
|
string persona; |
564 |
|
|
uid_t original_uid = 0, drop_to_uid = 0; |
565 |
|
|
gid_t original_gid = 0, drop_to_gid = 0; |
566 |
|
|
|
567 |
|
|
int c; |
568 |
|
|
while ((c = getopt(argc, argv, "hvfp:P:u:")) != -1) { |
569 |
|
|
switch (c) { |
570 |
|
|
case 'f': |
571 |
|
|
foreground = true; |
572 |
|
|
break; |
573 |
|
|
case 'p': |
574 |
|
|
port = optarg; |
575 |
|
|
break; |
576 |
|
|
case 'P': |
577 |
|
|
pid_file = optarg; |
578 |
|
|
break; |
579 |
|
|
case 'u': { |
580 |
|
|
persona = optarg; |
581 |
|
|
bool retval = GetUidOf(persona, &drop_to_uid, &drop_to_gid); |
582 |
|
|
if (!retval) { |
583 |
|
|
LogCvmfs(kLogCvmfs, kLogStderr | kLogSyslogErr, |
584 |
|
|
"cannot find user %s", persona.c_str()); |
585 |
|
|
return 1; |
586 |
|
|
} |
587 |
|
|
break; |
588 |
|
|
} |
589 |
|
|
case 'v': |
590 |
|
|
break; |
591 |
|
|
case 'h': |
592 |
|
|
Usage(argv[0]); |
593 |
|
|
return 0; |
594 |
|
|
default: |
595 |
|
|
Usage(argv[0]); |
596 |
|
|
return 1; |
597 |
|
|
} |
598 |
|
|
} |
599 |
|
|
|
600 |
|
|
if (!foreground) |
601 |
|
|
Daemonize(); |
602 |
|
|
int fd_pid_file = WritePidFile(pid_file); |
603 |
|
|
if (fd_pid_file < 0) { |
604 |
|
|
string reason; |
605 |
|
|
if (fd_pid_file == -2) |
606 |
|
|
reason = " (another daemon is already running)"; |
607 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslogErr, |
608 |
|
|
"failed to write pid file %s%s", pid_file, reason.c_str()); |
609 |
|
|
return 1; |
610 |
|
|
} |
611 |
|
|
if (!persona.empty()) { |
612 |
|
|
original_uid = geteuid(); |
613 |
|
|
original_gid = getegid(); |
614 |
|
|
bool retval = SwitchCredentials(drop_to_uid, drop_to_gid, true); |
615 |
|
|
if (!retval) { |
616 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslogErr, |
617 |
|
|
"failed to drop credentials to %u:%u", drop_to_uid, drop_to_gid); |
618 |
|
|
return 1; |
619 |
|
|
} |
620 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, |
621 |
|
|
"using credentials %u:%u", drop_to_uid, drop_to_gid); |
622 |
|
|
} |
623 |
|
|
|
624 |
|
|
g_handler_job = new UriHandlerJob(); |
625 |
|
|
g_handler_replicate = new UriHandlerReplicate(); |
626 |
|
|
g_handler_info = new UriHandlerInfo(); |
627 |
|
|
ReadConfigurations(); |
628 |
|
|
GenerateUriMap(); |
629 |
|
|
|
630 |
|
|
// Start the embedded web server |
631 |
|
|
struct mg_context *ctx; |
632 |
|
|
struct mg_callbacks callbacks; |
633 |
|
|
memset(&callbacks, 0, sizeof(callbacks)); |
634 |
|
|
callbacks.begin_request = MongooseOnRequest; |
635 |
|
|
callbacks.log_message = MongooseOnLog; |
636 |
|
|
|
637 |
|
|
// List of Mongoose options. Last element must be NULL. |
638 |
|
|
const char *mg_options[] = |
639 |
|
|
{"num_threads", kDefaultNumThreads, "listening_ports", port, NULL}; |
640 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, |
641 |
|
|
"starting CernVM-FS stratum agent on port %s", port); |
642 |
|
|
ctx = mg_start(&callbacks, NULL, mg_options); |
643 |
|
|
if (ctx == NULL) { |
644 |
|
|
LogCvmfs(kLogCvmfs, kLogStderr | kLogSyslogErr, |
645 |
|
|
"failed to start web server"); |
646 |
|
|
return 1; |
647 |
|
|
} |
648 |
|
|
|
649 |
|
|
MakePipe(g_pipe_ctrl); |
650 |
|
|
signal(SIGHUP, SignalReload); |
651 |
|
|
signal(SIGTERM, SignalExit); |
652 |
|
|
signal(SIGINT, SignalExit); |
653 |
|
|
signal(SIGALRM, SignalCleanup); |
654 |
|
|
alarm(kCleanupInterval); |
655 |
|
|
char ctrl; |
656 |
|
|
do { |
657 |
|
|
ReadPipe(g_pipe_ctrl[0], &ctrl, 1); |
658 |
|
|
if (ctrl == 'C') { |
659 |
|
|
MutexLockGuard guard_jobs(&g_lock_jobs); |
660 |
|
|
// Cleanup job table |
661 |
|
|
for (map<string, Job *>::iterator i = g_jobs.begin(), |
662 |
|
|
i_end = g_jobs.end(); i != i_end; ) |
663 |
|
|
{ |
664 |
|
|
if ( (i->second->status == Job::kStatusDone) && |
665 |
|
|
((platform_monotonic_time() - i->second->death) > |
666 |
|
|
kJobRetentionDuration) ) |
667 |
|
|
{ |
668 |
|
|
map<string, Job *>::iterator delete_me = i++; |
669 |
|
|
delete delete_me->second; |
670 |
|
|
g_jobs.erase(delete_me); |
671 |
|
|
} else { |
672 |
|
|
++i; |
673 |
|
|
} |
674 |
|
|
} |
675 |
|
|
alarm(kCleanupInterval); |
676 |
|
|
} |
677 |
|
|
if (ctrl == 'R') { |
678 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, "reloading configuration"); |
679 |
|
|
g_fence_configurations.Drain(); |
680 |
|
|
ClearConfigurations(); |
681 |
|
|
ReadConfigurations(); |
682 |
|
|
GenerateUriMap(); |
683 |
|
|
g_fence_configurations.Open(); |
684 |
|
|
} |
685 |
|
|
} while (ctrl != 'T'); |
686 |
|
|
LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, |
687 |
|
|
"stopping CernVM-FS stratum agent"); |
688 |
|
|
|
689 |
|
|
mg_stop(ctx); |
690 |
|
|
ClearConfigurations(); |
691 |
|
|
delete g_handler_job; |
692 |
|
|
delete g_handler_replicate; |
693 |
|
|
delete g_handler_info; |
694 |
|
|
|
695 |
|
|
SwitchCredentials(original_uid, original_gid, true); |
696 |
|
|
UnlockFile(fd_pid_file); |
697 |
|
|
unlink(pid_file); |
698 |
|
|
|
699 |
|
|
return 0; |
700 |
|
|
} |