CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cmd_sub.cc
Go to the documentation of this file.
1 
5 #ifndef __STDC_FORMAT_MACROS
6 // NOLINTNEXTLINE
7 #define __STDC_FORMAT_MACROS
8 #endif
9 
10 #include "cmd_sub.h"
11 
12 #include <inttypes.h>
13 
14 #include "crypto/signature.h"
15 #include "manifest.h"
16 #include "manifest_fetch.h"
17 #include "network/download.h"
18 #include "notify/messages.h"
19 #include "options.h"
20 #include "subscriber_sse.h"
21 #include "subscriber_supervisor.h"
22 #include "supervisor.h"
23 #include "util/logging.h"
24 #include "util/pointer.h"
25 #include "util/posix.h"
26 #include "util/string.h"
27 
28 namespace {
29 
32 
// Handed to the download::DownloadManager constructor as its first argument
// when the subscriber's download manager is created.
// NOTE(review): presumably the size of the manager's transfer-handle pool --
// confirm against download::DownloadManager.
33 const int kMaxPoolHandles = 1;
34 
36  public:
37  SwissknifeSubscriber(const std::string &server_url,
38  const std::string &repository, uint64_t min_revision,
39  bool continuous, bool verbose)
40  : notify::SubscriberSSE(server_url)
41  , repository_(repository)
42  , stats_()
43  , dl_mgr_(new download::DownloadManager(
44  kMaxPoolHandles, perf::StatisticsTemplate("download", &stats_)))
45  , sig_mgr_(new signature::SignatureManager())
46  , revision_(min_revision)
47  , continuous_(continuous)
48  , verbose_(verbose) { }
49  virtual ~SwissknifeSubscriber() { sig_mgr_->Fini(); }
50 
51  bool Init() {
52  const std::string config_file = "/etc/cvmfs/repositories.d/" + repository_
53  + "/client.conf";
54  SimpleOptionsParser options;
55  if (!options.TryParsePath(config_file)) {
56  LogCvmfs(kLogCvmfs, kLogError,
57  "SwissknifeSubscriber - could not parse configuration file");
58  return false;
59  }
60 
61  std::string arg;
62  if (options.GetValue("CVMFS_SERVER_URL", &arg)) {
63  dl_mgr_->SetHostChain(arg);
64  }
65 
66  sig_mgr_->Init();
67 
68  std::string public_keys = JoinStrings(
69  FindFilesBySuffix("/etc/cvmfs/keys", ".pub"), ":");
70  if (!sig_mgr_->LoadPublicRsaKeys(public_keys)) {
71  LogCvmfs(kLogCvmfs, kLogError,
72  "SwissknifeSubscriber - could not load public keys");
73  return false;
74  }
75 
76  return true;
77  }
78 
79  private:
80  virtual notify::Subscriber::Status Consume(const std::string &repo,
81  const std::string &msg_text) {
83  if (!msg.FromJSONString(msg_text)) {
84  LogCvmfs(kLogCvmfs, kLogError,
85  "SwissknifeSubscriber - could not decode message.");
87  }
88 
91  reinterpret_cast<unsigned char *>(&(msg.manifest_[0])),
92  msg.manifest_.size(), "", repo, 0, NULL, sig_mgr_.weak_ref(),
93  dl_mgr_.weak_ref(), &ensemble);
94 
95  if (res != manifest::kFailOk) {
96  LogCvmfs(kLogCvmfs, kLogError,
97  "SwissknifeSubscriber - manifest has invalid signature: %d",
98  res);
100  }
101 
103  reinterpret_cast<const unsigned char *>(msg.manifest_.data()),
104  msg.manifest_.size()));
105 
106  if (!manifest.IsValid()) {
107  LogCvmfs(kLogCvmfs, kLogError,
108  "SwissknifeSubscriber - could not parse manifest.");
110  }
111 
112  uint64_t new_revision = manifest->revision();
113  bool triggered = false;
114  if (new_revision > revision_) {
115  LogCvmfs(
116  kLogCvmfs, kLogInfo,
117  "SwissknifeSubscriber - repository %s is now at revision %" PRIu64
118  ".",
119  repo.c_str(), new_revision);
120  if (verbose_) {
121  LogCvmfs(kLogCvmfs, kLogInfo, "%s", msg_text.c_str());
122  }
123  revision_ = new_revision;
124  triggered = true;
125  }
126 
127  if (!continuous_ && triggered) {
129  }
130 
132  }
133 
134  std::string repository_;
135 
139 
140  uint64_t revision_;
142  bool verbose_;
143 };
144 
145 } // namespace
146 
147 namespace notify {
148 
149 int DoSubscribe(const std::string &server_url, const std::string &repo,
150  uint64_t min_revision, bool continuous, bool verbose) {
151  SwissknifeSubscriber subscriber(server_url, repo, min_revision, continuous,
152  verbose);
153 
154  if (!subscriber.Init()) {
155  LogCvmfs(kLogCvmfs, kLogError, "Could not initialize SwissknifeSubscriber");
156  return 1;
157  }
158 
159  // Retry settings: accept no more than 10 failures in the last minute
160  const int num_retries = 10;
161  const uint64_t interval = 60;
162  SubscriberSupervisor supervisor(&subscriber, repo, num_retries, interval);
163  supervisor.Run();
164 
165  return 0;
166 }
167 
168 } // namespace notify
std::string manifest_
Definition: messages.h:51
const manifest::Manifest * manifest() const
Definition: repository.h:125
static Manifest * LoadMem(const unsigned char *buffer, const unsigned length)
Definition: manifest.cc:82
Failures Verify(unsigned char *manifest_data, size_t manifest_size, const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:356
UniquePtr< download::DownloadManager > dl_mgr_
Definition: cmd_sub.cc:137
UniquePtr< signature::SignatureManager > sig_mgr_
Definition: cmd_sub.cc:138
bool Run()
Definition: supervisor.cc:15
const LogFacilities & kLogError
Definition: cmd_pub.cc:23
const LogFacilities & kLogInfo
Definition: cmd_pub.cc:22
virtual notify::Subscriber::Status Consume(const std::string &repo, const std::string &msg_text)
Definition: cmd_sub.cc:80
int DoSubscribe(const std::string &server_url, const std::string &repo, uint64_t min_revision, bool continuous, bool verbose)
Definition: cmd_sub.cc:149
bool TryParsePath(const std::string &config_file)
Definition: options.cc:109
bool IsValid() const
Definition: pointer.h:47
bool GetValue(const std::string &key, std::string *value) const
Definition: options.cc:368
SwissknifeSubscriber(const std::string &server_url, const std::string &repository, uint64_t min_revision, bool continuous, bool verbose)
Definition: cmd_sub.cc:37
virtual bool FromJSONString(const std::string &s)
Definition: messages.cc:44
LogFacilities
static LogFacilities info
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
Definition: posix.cc:1128
static LogFacilities error
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545