GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/stratum_agent/stratum_agent.cc Lines: 0 319 0.0 %
Date: 2019-02-03 02:48:13 Branches: 0 193 0.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 *
4
 * The stratum agent runs on stratum 1 servers and provides a web service to
5
 * release manager machines.  Release manager machines can send signed messages
6
 * to the stratum agent to trigger a replication run ('cvmfs_server snapshot').
7
 *
8
 * The service offers the following API
9
 *   - /cvmfs/<repo name>/api/v1/replicate/new (POST):
10
 *     create new snapshot run.  Returns JSON with the job id
11
 *   - /cvmfs/<repo name>/api/v1/replicate/<job id>/
12
 *       {status,stdout,stderr,tail} (GET):
13
 *     Retrieve information about running and finished jobs.  The status is
14
 *     JSON encoded.  Finished jobs are cleaned up after a while.
15
 *
16
 * TODO(jblomer): add support for synchronized application of a new revision.
17
 * In this case, the snapshot would run but the new manifest would only be
18
 * applied on another signal.
19
 */
20
21
#include <arpa/inet.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <map>
#include <string>
#include <vector>
33
34
#include "download.h"
35
#include "fence.h"
36
#include "json.h"
37
#include "letter.h"
38
#include "logging.h"
39
#include "mongoose.h"
40
#include "options.h"
41
#include "platform.h"
42
#include "signature.h"
43
#include "stratum_agent/uri_map.h"
44
#include "util/pointer.h"
45
#include "util/posix.h"
46
#include "util/string.h"
47
#include "util_concurrency.h"
48
#include "uuid.h"
49
#include "whitelist.h"
50
51
using namespace std;  // NOLINT
52
53
// Agent version.  Upon successful testing, it might adopt the mainline cvmfs
// version.
const unsigned int kVersionMajor = 1;
const unsigned int kVersionMinor = 0;
const unsigned int kVersionPatch = 0;

// Default location of the pid file.
const char *kDefaultPidFile = "/var/run/cvmfs_stratum_agent.pid";
// Port and thread count are kept as strings because Mongoose takes them as
// strings in its options array.
const char *kDefaultPort = "7999";
const char *kDefaultNumThreads = "2";

// Finished jobs are kept for 3 hours (value in seconds).
const unsigned int kJobRetentionDuration = 3 * 60 * 60;
// The job table is scanned every minute (value in seconds).
const unsigned int kCleanupInterval = 60;
65
66
67
/**
68
 * Everything that's needed to verify requests for a single repository
69
 */
70
struct RepositoryConfig : SingleCopy {
71
  RepositoryConfig()
72
    : download_mgr(NULL)
73
    , signature_mgr(NULL)
74
    , options_mgr(NULL)
75
    , statistics(NULL)
76
  { }
77
  ~RepositoryConfig() {
78
    if (download_mgr) download_mgr->Fini();
79
    if (signature_mgr) signature_mgr->Fini();
80
    delete download_mgr;
81
    delete signature_mgr;
82
    delete options_mgr;
83
    delete statistics;
84
  }
85
  string alias;  // stratum 1s can have an alias different from the fqrn
86
  string fqrn;
87
  string stratum0_url;
88
  download::DownloadManager *download_mgr;
89
  signature::SignatureManager *signature_mgr;
90
  OptionsManager *options_mgr;
91
  perf::Statistics *statistics;
92
};
93
94
95
/**
96
 * Catures a run of 'cvmfs_server snapshot <reponame>'
97
 */
98
struct Job : SingleCopy {
99
  Job() : remote_ip(0), fd_stdin(-1), fd_stdout(-1), fd_stderr(-1),
100
          status(kStatusLimbo), exit_code(-1),
101
          birth(platform_monotonic_time()), death(0), finish_timestamp(0),
102
          pid(0)
103
  {
104
    memset(&thread_job, 0, sizeof(thread_job));
105
    int retval = pthread_mutex_init(&lock, NULL);
106
    assert(retval == 0);
107
  }
108
  ~Job() {
109
    pthread_mutex_destroy(&lock);
110
  }
111
  enum Status {
112
    kStatusLimbo,
113
    kStatusRunning,
114
    kStatusDone
115
  };
116
117
  string id;
118
  string alias;  // Usually the fqrn
119
  // long type imposed by Mongoose
120
  long remote_ip;  // NOLINT
121
122
  int fd_stdin;
123
  int fd_stdout;
124
  int fd_stderr;
125
  string stdout;
126
  string stderr;
127
  Status status;
128
  int exit_code;
129
  pthread_t thread_job;
130
  uint64_t birth;
131
  uint64_t death;
132
  time_t finish_timestamp;
133
  pid_t pid;
134
  pthread_mutex_t lock;
135
};
136
137
class UriHandlerReplicate;
138
class UriHandlerJob;
139
class UriHandlerInfo;
140
141
/**
142
 * Handler for requests to start a new snapshot run.
143
 */
144
UriHandlerReplicate *g_handler_replicate;
145
/**
146
 * Handler to query jobs from the job table.
147
 */
148
UriHandlerJob *g_handler_job;
149
UriHandlerInfo *g_handler_info;
150
/**
151
 * Routes URIs to handlers.
152
 */
153
UriMap g_uri_map;
154
/**
155
 * Maps fqrn to manager classes constructed from the configuration in /etc.
156
 */
157
map<string, RepositoryConfig *> g_configurations;
158
/**
159
 * Prevents handlers to run while the configuration is updated
160
 */
161
Fence g_fence_configurations;
162
/**
163
 * Table of jobs, maps the job id to job information.
164
 */
165
map<string, Job *> g_jobs;
166
pthread_mutex_t g_lock_jobs = PTHREAD_MUTEX_INITIALIZER;
167
/**
168
 * Used to control the main thread from signals.
169
 */
170
int g_pipe_ctrl[2];
171
172
173
/**
174
 * Parse /etc/cvmfs/repositories.d/<fqrn>/.conf and search for stratum 1s
175
 */
176
static void ReadConfigurations() {
177
  vector<string> repo_config_dirs =
178
    FindDirectories("/etc/cvmfs/repositories.d");
179
  for (unsigned i = 0; i < repo_config_dirs.size(); ++i) {
180
    string name = GetFileName(repo_config_dirs[i]);
181
    string optarg;
182
    UniquePtr<OptionsManager> options_mgr(
183
      new BashOptionsManager(new DefaultOptionsTemplateManager(
184
        repo_config_dirs[i])));
185
    options_mgr->set_taint_environment(false);
186
    options_mgr->ParsePath(repo_config_dirs[i] + "/server.conf",
187
      false);
188
    if (!options_mgr->GetValue("CVMFS_REPOSITORY_TYPE", &optarg) ||
189
        (optarg != "stratum1"))
190
    {
191
      continue;
192
    }
193
    options_mgr->ParsePath(repo_config_dirs[i] + "/replica.conf",
194
      false);
195
    if (options_mgr->GetValue("CVMFS_STRATUM_AGENT", &optarg) &&
196
        !options_mgr->IsOn(optarg))
197
    {
198
      continue;
199
    }
200
201
    UniquePtr<signature::SignatureManager>
202
      signature_mgr(new signature::SignatureManager());
203
    signature_mgr->Init();
204
    options_mgr->GetValue("CVMFS_PUBLIC_KEY", &optarg);
205
    if (DirectoryExists(optarg))
206
      optarg = JoinStrings(FindFilesBySuffix(optarg, ".pub"), ":");
207
    if (!signature_mgr->LoadPublicRsaKeys(optarg)) {
208
      LogCvmfs(kLogCvmfs, kLogStderr | kLogSyslogErr,
209
               "(%s) could not load public key %s",
210
               name.c_str(), optarg.c_str());
211
      continue;
212
    }
213
214
    UniquePtr<perf::Statistics> statistics(new perf::Statistics());
215
    UniquePtr<download::DownloadManager>
216
      download_mgr(new download::DownloadManager());
217
    download_mgr->Init(4, false, perf::StatisticsTemplate("agent", statistics));
218
    if (options_mgr->GetValue("CVMFS_HTTP_TIMEOUT", &optarg))
219
      download_mgr->SetTimeout(String2Uint64(optarg), String2Uint64(optarg));
220
    if (options_mgr->GetValue("CVMFS_HTTP_RETRIES", &optarg))
221
      download_mgr->SetRetryParameters(String2Uint64(optarg), 1000, 2000);
222
    RepositoryConfig *config = new RepositoryConfig();
223
    config->alias = name;
224
    options_mgr->GetValue("CVMFS_REPOSITORY_NAME", &(config->fqrn));
225
    options_mgr->GetValue("CVMFS_STRATUM0", &(config->stratum0_url));
226
    config->signature_mgr = signature_mgr.Release();
227
    config->download_mgr = download_mgr.Release();
228
    config->options_mgr = options_mgr.Release();
229
    config->statistics = statistics.Release();
230
    g_configurations[config->fqrn] = config;
231
    LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, "watching %s",
232
             config->fqrn.c_str());
233
  }
234
  if (g_configurations.empty()) {
235
    LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslogWarn,
236
             "Warning: no stratum 1 repositories found");
237
  }
238
}
239
240
241
/**
242
 * New replication job.  Responds on /cvmfs/<repo>/api/v1/replicate/new
243
 */
244
class UriHandlerReplicate : public UriHandler {
245
 public:
246
  virtual ~UriHandlerReplicate() { }
247
248
  virtual void OnRequest(const struct mg_request_info *req_info,
249
                         struct mg_connection *conn)
250
  {
251
    vector<string> uri_tokens = SplitString(req_info->uri, '/');
252
    // strip api/v1/replicate/new
253
    string fqrn = uri_tokens[uri_tokens.size() - 5];
254
    char post_data[kMaxPostData];  post_data[0] = '\0';
255
    unsigned post_data_len = mg_read(conn, post_data, sizeof(post_data) - 1);
256
    post_data[post_data_len] = '\0';
257
    // Trim trailing newline
258
    if (post_data_len && (post_data[post_data_len - 1] == '\n'))
259
      post_data[post_data_len - 1] = '\0';
260
261
    FenceGuard guard_configurations(&g_fence_configurations);
262
    RepositoryConfig *config = g_configurations[fqrn];
263
    assert(fqrn == config->fqrn);
264
265
    // Verify letter
266
    string message;
267
    string cert;
268
    letter::Letter letter(config->fqrn, post_data, config->signature_mgr);
269
    letter::Failures retval_lt = letter.Verify(kTimeoutLetter, &message, &cert);
270
    if (retval_lt != letter::kFailOk) {
271
      WebReply::Send(WebReply::k400, MkJsonError(letter::Code2Ascii(retval_lt)),
272
                     conn);
273
      return;
274
    }
275
    whitelist::Whitelist whitelist(config->fqrn, config->download_mgr,
276
                                   config->signature_mgr);
277
    whitelist::Failures retval_wl = whitelist.Load(config->stratum0_url);
278
    if (retval_wl == whitelist::kFailOk)
279
      retval_wl = whitelist.VerifyLoadedCertificate();
280
    if (retval_wl != whitelist::kFailOk) {
281
      WebReply::Send(WebReply::k400,
282
                     MkJsonError(whitelist::Code2Ascii(retval_wl)), conn);
283
      return;
284
    }
285
286
    UniquePtr<Job> job(new Job());
287
    string exe = "/usr/bin/cvmfs_server";
288
    vector<string> argv;
289
    argv.push_back("snapshot");
290
    argv.push_back(config->alias);
291
    bool retval_b = ExecuteBinary(
292
      &job->fd_stdin, &job->fd_stdout, &job->fd_stderr,
293
      exe, argv, false, &job->pid);
294
    if (!retval_b) {
295
      WebReply::Send(WebReply::k500,
296
                     MkJsonError("could not spawn snapshot process"), conn);
297
      return;
298
    }
299
    job->status = Job::kStatusRunning;
300
    string uuid = cvmfs::Uuid::CreateOneTime();
301
    job->id = uuid;
302
    job->alias = config->alias;
303
    job->remote_ip = ntohl(req_info->remote_ip);
304
305
    int retval_i = pthread_create(&job->thread_job, NULL, MainJobMgr, job);
306
    assert(retval_i == 0);
307
    retval_i = pthread_detach(job->thread_job);
308
    assert(retval_i == 0);
309
    {
310
      MutexLockGuard guard_jobs(&g_lock_jobs);
311
      g_jobs[uuid] = job.Release();
312
    }
313
    WebReply::Send(WebReply::k200, "{\"job_id\":\"" + uuid + "\"}", conn);
314
  }
315
316
 private:
317
  static const unsigned kMaxPostData = 32 * 1024;  // 32kB
318
  static const unsigned kTimeoutLetter = 180;  // 3 minutes
319
320
  // Polls stdout/stderr of running jobs
321
  static void *MainJobMgr(void *data) {
322
    Job *job = reinterpret_cast<Job *>(data);
323
    string job_id = job->id;
324
    string alias = job->alias;
325
    char ipv4_buf[INET_ADDRSTRLEN];
326
327
    LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog,
328
             "(%s) starting replication job %s from %s",
329
             alias.c_str(), job_id.c_str(),
330
             inet_ntop(AF_INET, &job->remote_ip, ipv4_buf, INET_ADDRSTRLEN));
331
    Block2Nonblock(job->fd_stdout);
332
    Block2Nonblock(job->fd_stderr);
333
    char buf_stdout[kPageSize];
334
    char buf_stderr[kPageSize];
335
    struct pollfd watch_fds[2];
336
    watch_fds[0].fd = job->fd_stdout;
337
    watch_fds[1].fd = job->fd_stderr;
338
    watch_fds[0].events = watch_fds[1].events = POLLIN | POLLPRI | POLLHUP;
339
    watch_fds[0].revents = watch_fds[1].revents = 0;
340
    bool terminate = false;
341
    while (!terminate) {
342
      int retval = poll(watch_fds, 2, -1);
343
      if (retval < 0)
344
        continue;
345
      if (watch_fds[0].revents) {
346
        watch_fds[0].revents = 0;
347
        int nbytes = read(watch_fds[0].fd, buf_stdout, kPageSize);
348
        if ((nbytes <= 0) && (errno != EINTR))
349
          terminate = true;
350
        if (nbytes > 0) {
351
          MutexLockGuard guard_job(&job->lock);
352
          job->stdout += string(buf_stdout, nbytes);
353
        }
354
      }
355
      if (watch_fds[1].revents) {
356
        watch_fds[1].revents = 0;
357
        int nbytes = read(watch_fds[1].fd, buf_stderr, kPageSize);
358
        if ((nbytes <= 0) && (errno != EINTR))
359
          terminate = true;
360
        if (nbytes > 0) {
361
          MutexLockGuard guard_job(&job->lock);
362
          job->stderr += string(buf_stderr, nbytes);
363
        }
364
      }
365
    }
366
367
    {
368
      MutexLockGuard guard_job(&job->lock);
369
      close(job->fd_stdin);  job->fd_stdin = -1;
370
      close(job->fd_stdout);  job->fd_stdout = -1;
371
      close(job->fd_stderr);  job->fd_stderr = -1;
372
      job->exit_code = WaitForChild(job->pid);
373
      job->death = platform_monotonic_time();
374
      job->finish_timestamp = time(NULL);
375
      job->status = Job::kStatusDone;
376
      job_id = job->id;
377
    }
378
    LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog,
379
             "(%s) finished replication job %s", alias.c_str(), job_id.c_str());
380
    return NULL;
381
  }
382
383
  string MkJsonError(const string &msg) {
384
    return "{\"error\": \"" + msg + "\"}";
385
  }
386
};
387
388
389
/**
390
 * Handler to query job information under
391
 * /cvmfs/<repo>/api/v1/replicate/<jobid>/{stdout,stderr,tail,status}
392
 */
393
class UriHandlerJob : public UriHandler {
394
 public:
395
  virtual ~UriHandlerJob() { }
396
397
  virtual void OnRequest(const struct mg_request_info *req_info,
398
                         struct mg_connection *conn)
399
  {
400
    string what = GetFileName(req_info->uri);
401
    string job_id = GetFileName(GetParentPath(req_info->uri));
402
403
    MutexLockGuard guard_jobs(&g_lock_jobs);
404
    map<string, Job *>::const_iterator iter = g_jobs.find(job_id);
405
    if (iter == g_jobs.end()) {
406
      WebReply::Send(WebReply::k404, "{\"error\":\"no such job\"}", conn);
407
      return;
408
    }
409
    Job *job = iter->second;
410
    MutexLockGuard guard_this_job(&job->lock);
411
    if (what == "stdout") {
412
      WebReply::Send(WebReply::k200, job->stdout, conn);
413
    } else if (what == "stderr") {
414
      WebReply::Send(WebReply::k200, job->stderr, conn);
415
    } else if (what == "tail") {
416
      WebReply::Send(WebReply::k200, Tail(job->stdout, 4), conn);
417
    } else if (what == "status") {
418
      string reply = "{\"status\":";
419
      switch (job->status) {
420
        case Job::kStatusLimbo: reply += "\"limbo\""; break;
421
        case Job::kStatusRunning: reply += "\"running\""; break;
422
        case Job::kStatusDone: reply += "\"done\""; break;
423
        default: assert(false);
424
      }
425
      if (job->status == Job::kStatusDone) {
426
        reply += ",\"exit_code\":" + StringifyInt(job->exit_code);
427
        reply += ",\"duration\":" + StringifyInt(job->death - job->birth);
428
        reply += ",\"finish_timestamp\":\"" +
429
                  StringifyTime(job->finish_timestamp, false) + " UTC\"";
430
      }
431
      reply += "}";
432
      WebReply::Send(WebReply::k200, reply, conn);
433
    } else {
434
      WebReply::Send(WebReply::k404, "{\"error\":\"internal error\"}", conn);
435
    }
436
  }
437
};
438
439
440
/**
441
 * Handler to query service information at /cvmfs/<repo>/api/v1/replicate/info
442
 */
443
class UriHandlerInfo : public UriHandler {
444
 public:
445
  virtual ~UriHandlerInfo() { }
446
447
  virtual void OnRequest(const struct mg_request_info *req_info,
448
                         struct mg_connection *conn)
449
  {
450
    string version = StringifyInt(kVersionMajor) + "." +
451
                     StringifyInt(kVersionMinor) + "." +
452
                     StringifyInt(kVersionPatch);
453
    WebReply::Send(WebReply::k200, "{\"version\":\"" + version + "\"}", conn);
454
  }
455
};
456
457
458
/**
459
 * Create the REST API from configuration
460
 */
461
static void GenerateUriMap() {
462
  for (map<string, RepositoryConfig *>::const_iterator i =
463
       g_configurations.begin(), i_end = g_configurations.end();
464
       i != i_end; ++i)
465
  {
466
    string fqrn = i->second->fqrn;
467
    g_uri_map.Register(WebRequest(
468
      "/cvmfs/" + fqrn + "/api/v1/replicate/*/stdout", WebRequest::kGet),
469
      g_handler_job);
470
    g_uri_map.Register(WebRequest(
471
      "/cvmfs/" + fqrn + "/api/v1/replicate/*/stderr", WebRequest::kGet),
472
      g_handler_job);
473
    g_uri_map.Register(WebRequest(
474
      "/cvmfs/" + fqrn + "/api/v1/replicate/*/tail", WebRequest::kGet),
475
      g_handler_job);
476
    g_uri_map.Register(WebRequest(
477
      "/cvmfs/" + fqrn + "/api/v1/replicate/*/status", WebRequest::kGet),
478
      g_handler_job);
479
    g_uri_map.Register(WebRequest("/cvmfs/" + fqrn + "/api/v1/replicate/new",
480
                                  WebRequest::kPost),
481
                       g_handler_replicate);
482
    g_uri_map.Register(WebRequest(
483
      "/cvmfs/" + fqrn + "/api/v1/replicate/info", WebRequest::kGet),
484
      g_handler_info);
485
  }
486
}
487
488
489
static void ClearConfigurations() {
490
  for (map<string, RepositoryConfig *>::const_iterator i =
491
       g_configurations.begin(), i_end = g_configurations.end();
492
       i != i_end; ++i)
493
  {
494
    delete i->second;
495
  }
496
  g_configurations.clear();
497
}
498
499
500
/**
501
 * Embedded web server's request callbacks
502
 */
503
static int MongooseOnRequest(struct mg_connection *conn) {
504
  const struct mg_request_info *request_info = mg_get_request_info(conn);
505
  WebRequest request(request_info);
506
  UriHandler *handler = g_uri_map.Route(request);
507
  if (handler == NULL) {
508
    if (g_uri_map.IsKnownUri(request.uri()))
509
      WebReply::Send(WebReply::k405, "", conn);
510
    else
511
      WebReply::Send(WebReply::k404, "", conn);
512
    return 1;
513
  }
514
515
  handler->OnRequest(request_info, conn);
516
  return 1;
517
}
518
519
520
/**
521
 * Redirect Mongoose's logging to LogCvmfs
522
 */
523
static int MongooseOnLog(const struct mg_connection *, const char *message) {
524
  LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, "(web server) %s", message);
525
  return 1;
526
}
527
528
529
// Signals write to a pipe on which the main thread is listening
530
void SignalCleanup(int signal) {
531
  char c = 'C';
532
  WritePipe(g_pipe_ctrl[1], &c, 1);
533
}
534
535
void SignalExit(int signal) {
536
  char c = 'T';
537
  WritePipe(g_pipe_ctrl[1], &c, 1);
538
}
539
540
void SignalReload(int signal) {
541
  char c = 'R';
542
  WritePipe(g_pipe_ctrl[1], &c, 1);
543
}
544
545
546
547
void Usage(const char *progname) {
548
  LogCvmfs(kLogCvmfs, kLogStdout, "%s version %u.%u.%u\n"
549
           "Provides a REST service to release manager machines to let them "
550
           "trigger repository replication\n"
551
           "\n"
552
           "Usage: %s [-f(oreground)] [-p port (default: %s)]\n"
553
           "          [-P pid file (default: %s)] [-u username]",
554
           progname, kVersionMajor, kVersionMinor, kVersionPatch,
555
           progname, kDefaultPort, kDefaultPidFile);
556
}
557
558
559
int main(int argc, char **argv) {
560
  const char *port = kDefaultPort;
561
  const char *pid_file = kDefaultPidFile;
562
  bool foreground = false;
563
  string persona;
564
  uid_t original_uid = 0, drop_to_uid = 0;
565
  gid_t original_gid = 0, drop_to_gid = 0;
566
567
  int c;
568
  while ((c = getopt(argc, argv, "hvfp:P:u:")) != -1) {
569
    switch (c) {
570
      case 'f':
571
        foreground = true;
572
        break;
573
      case 'p':
574
        port = optarg;
575
        break;
576
      case 'P':
577
        pid_file = optarg;
578
        break;
579
      case 'u': {
580
        persona = optarg;
581
        bool retval = GetUidOf(persona, &drop_to_uid, &drop_to_gid);
582
        if (!retval) {
583
          LogCvmfs(kLogCvmfs, kLogStderr | kLogSyslogErr,
584
                   "cannot find user %s", persona.c_str());
585
          return 1;
586
        }
587
        break;
588
      }
589
      case 'v':
590
        break;
591
      case 'h':
592
        Usage(argv[0]);
593
        return 0;
594
      default:
595
        Usage(argv[0]);
596
        return 1;
597
    }
598
  }
599
600
  if (!foreground)
601
    Daemonize();
602
  int fd_pid_file = WritePidFile(pid_file);
603
  if (fd_pid_file < 0) {
604
    string reason;
605
    if (fd_pid_file == -2)
606
      reason = " (another daemon is already running)";
607
    LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslogErr,
608
             "failed to write pid file %s%s", pid_file, reason.c_str());
609
    return 1;
610
  }
611
  if (!persona.empty()) {
612
    original_uid = geteuid();
613
    original_gid = getegid();
614
    bool retval = SwitchCredentials(drop_to_uid, drop_to_gid, true);
615
    if (!retval) {
616
      LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslogErr,
617
               "failed to drop credentials to %u:%u", drop_to_uid, drop_to_gid);
618
      return 1;
619
    }
620
    LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog,
621
             "using credentials %u:%u", drop_to_uid, drop_to_gid);
622
  }
623
624
  g_handler_job = new UriHandlerJob();
625
  g_handler_replicate = new UriHandlerReplicate();
626
  g_handler_info = new UriHandlerInfo();
627
  ReadConfigurations();
628
  GenerateUriMap();
629
630
  // Start the embedded web server
631
  struct mg_context *ctx;
632
  struct mg_callbacks callbacks;
633
  memset(&callbacks, 0, sizeof(callbacks));
634
  callbacks.begin_request = MongooseOnRequest;
635
  callbacks.log_message = MongooseOnLog;
636
637
  // List of Mongoose options. Last element must be NULL.
638
  const char *mg_options[] =
639
    {"num_threads", kDefaultNumThreads, "listening_ports", port, NULL};
640
  LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog,
641
           "starting CernVM-FS stratum agent on port %s", port);
642
  ctx = mg_start(&callbacks, NULL, mg_options);
643
  if (ctx == NULL) {
644
    LogCvmfs(kLogCvmfs, kLogStderr | kLogSyslogErr,
645
             "failed to start web server");
646
    return 1;
647
  }
648
649
  MakePipe(g_pipe_ctrl);
650
  signal(SIGHUP, SignalReload);
651
  signal(SIGTERM, SignalExit);
652
  signal(SIGINT, SignalExit);
653
  signal(SIGALRM, SignalCleanup);
654
  alarm(kCleanupInterval);
655
  char ctrl;
656
  do {
657
    ReadPipe(g_pipe_ctrl[0], &ctrl, 1);
658
    if (ctrl == 'C') {
659
      MutexLockGuard guard_jobs(&g_lock_jobs);
660
      // Cleanup job table
661
      for (map<string, Job *>::iterator i = g_jobs.begin(),
662
           i_end = g_jobs.end(); i != i_end; )
663
      {
664
        if ( (i->second->status == Job::kStatusDone) &&
665
             ((platform_monotonic_time() - i->second->death) >
666
               kJobRetentionDuration) )
667
        {
668
          map<string, Job *>::iterator delete_me = i++;
669
          delete delete_me->second;
670
          g_jobs.erase(delete_me);
671
        } else {
672
          ++i;
673
        }
674
      }
675
      alarm(kCleanupInterval);
676
    }
677
    if (ctrl == 'R') {
678
      LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog, "reloading configuration");
679
      g_fence_configurations.Drain();
680
      ClearConfigurations();
681
      ReadConfigurations();
682
      GenerateUriMap();
683
      g_fence_configurations.Open();
684
    }
685
  } while (ctrl != 'T');
686
  LogCvmfs(kLogCvmfs, kLogStdout | kLogSyslog,
687
           "stopping CernVM-FS stratum agent");
688
689
  mg_stop(ctx);
690
  ClearConfigurations();
691
  delete g_handler_job;
692
  delete g_handler_replicate;
693
  delete g_handler_info;
694
695
  SwitchCredentials(original_uid, original_gid, true);
696
  UnlockFile(fd_pid_file);
697
  unlink(pid_file);
698
699
  return 0;
700
}