CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cmd_sub.cc
Go to the documentation of this file.
1 
5 #ifndef __STDC_FORMAT_MACROS
6 // NOLINTNEXTLINE
7 #define __STDC_FORMAT_MACROS
8 #endif
9 
10 #include "cmd_sub.h"
11 
12 #include <inttypes.h>
13 
14 #include "crypto/signature.h"
15 #include "manifest.h"
16 #include "manifest_fetch.h"
17 #include "network/download.h"
18 #include "notify/messages.h"
19 #include "options.h"
20 #include "subscriber_sse.h"
21 #include "subscriber_supervisor.h"
22 #include "supervisor.h"
23 #include "util/logging.h"
24 #include "util/pointer.h"
25 #include "util/posix.h"
26 #include "util/string.h"
27 
28 namespace {
29 
32 
33 const int kMaxPoolHandles = 1;
34 
36  public:
37  SwissknifeSubscriber(const std::string& server_url,
38  const std::string& repository, uint64_t min_revision,
39  bool continuous, bool verbose)
40  : notify::SubscriberSSE(server_url),
41  repository_(repository),
42  stats_(),
43  dl_mgr_(new download::DownloadManager(kMaxPoolHandles,
44  perf::StatisticsTemplate("download", &stats_))),
45  sig_mgr_(new signature::SignatureManager()),
46  revision_(min_revision),
47  continuous_(continuous),
48  verbose_(verbose) {}
50  sig_mgr_->Fini();
51  }
52 
53  bool Init() {
54  const std::string config_file =
55  "/etc/cvmfs/repositories.d/" + repository_ + "/client.conf";
56  SimpleOptionsParser options;
57  if (!options.TryParsePath(config_file)) {
58  LogCvmfs(kLogCvmfs, kLogError,
59  "SwissknifeSubscriber - could not parse configuration file");
60  return false;
61  }
62 
63  std::string arg;
64  if (options.GetValue("CVMFS_SERVER_URL", &arg)) {
65  dl_mgr_->SetHostChain(arg);
66  }
67 
68  sig_mgr_->Init();
69 
70  std::string public_keys =
71  JoinStrings(FindFilesBySuffix("/etc/cvmfs/keys", ".pub"), ":");
72  if (!sig_mgr_->LoadPublicRsaKeys(public_keys)) {
73  LogCvmfs(kLogCvmfs, kLogError,
74  "SwissknifeSubscriber - could not load public keys");
75  return false;
76  }
77 
78  return true;
79  }
80 
81  private:
82  virtual notify::Subscriber::Status Consume(const std::string& repo,
83  const std::string& msg_text) {
85  if (!msg.FromJSONString(msg_text)) {
86  LogCvmfs(kLogCvmfs, kLogError,
87  "SwissknifeSubscriber - could not decode message.");
89  }
90 
93  reinterpret_cast<unsigned char*>(&(msg.manifest_[0])),
94  msg.manifest_.size(), "", repo, 0, NULL,
95  sig_mgr_.weak_ref(), dl_mgr_.weak_ref(), &ensemble);
96 
97  if (res != manifest::kFailOk) {
98  LogCvmfs(kLogCvmfs, kLogError,
99  "SwissknifeSubscriber - manifest has invalid signature: %d",
100  res);
102  }
103 
105  reinterpret_cast<const unsigned char*>(msg.manifest_.data()),
106  msg.manifest_.size()));
107 
108  if (!manifest.IsValid()) {
109  LogCvmfs(kLogCvmfs, kLogError,
110  "SwissknifeSubscriber - could not parse manifest.");
112  }
113 
114  uint64_t new_revision = manifest->revision();
115  bool triggered = false;
116  if (new_revision > revision_) {
117  LogCvmfs(kLogCvmfs, kLogInfo,
118  "SwissknifeSubscriber - repository %s is now at revision %"
119  PRIu64 ".", repo.c_str(), new_revision);
120  if (verbose_) {
121  LogCvmfs(kLogCvmfs, kLogInfo, "%s", msg_text.c_str());
122  }
123  revision_ = new_revision;
124  triggered = true;
125  }
126 
127  if (!continuous_ && triggered) {
129  }
130 
132  }
133 
134  std::string repository_;
135 
139 
140  uint64_t revision_;
142  bool verbose_;
143 };
144 
145 } // namespace
146 
147 namespace notify {
148 
149 int DoSubscribe(const std::string& server_url, const std::string& repo,
150  uint64_t min_revision, bool continuous, bool verbose) {
151  SwissknifeSubscriber subscriber(server_url, repo, min_revision, continuous,
152  verbose);
153 
154  if (!subscriber.Init()) {
155  LogCvmfs(kLogCvmfs, kLogError, "Could not initialize SwissknifeSubscriber");
156  return 1;
157  }
158 
159  // Retry settings: accept no more than 10 failures in the last minute
160  const int num_retries = 10;
161  const uint64_t interval = 60;
162  SubscriberSupervisor supervisor(&subscriber, repo, num_retries, interval);
163  supervisor.Run();
164 
165  return 0;
166 }
167 
168 } // namespace notify
std::string manifest_
Definition: messages.h:49
const manifest::Manifest * manifest() const
Definition: repository.h:125
static Manifest * LoadMem(const unsigned char *buffer, const unsigned length)
Definition: manifest.cc:82
Failures Verify(unsigned char *manifest_data, size_t manifest_size, const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:325
UniquePtr< download::DownloadManager > dl_mgr_
Definition: cmd_sub.cc:137
UniquePtr< signature::SignatureManager > sig_mgr_
Definition: cmd_sub.cc:138
bool Run()
Definition: supervisor.cc:15
const LogFacilities & kLogError
Definition: cmd_pub.cc:23
const LogFacilities & kLogInfo
Definition: cmd_pub.cc:22
virtual notify::Subscriber::Status Consume(const std::string &repo, const std::string &msg_text)
Definition: cmd_sub.cc:82
int DoSubscribe(const std::string &server_url, const std::string &repo, uint64_t min_revision, bool continuous, bool verbose)
Definition: cmd_sub.cc:149
bool TryParsePath(const std::string &config_file)
Definition: options.cc:110
bool IsValid() const
Definition: pointer.h:43
bool GetValue(const std::string &key, std::string *value) const
Definition: options.cc:376
SwissknifeSubscriber(const std::string &server_url, const std::string &repository, uint64_t min_revision, bool continuous, bool verbose)
Definition: cmd_sub.cc:37
virtual bool FromJSONString(const std::string &s)
Definition: messages.cc:45
LogFacilities
static LogFacilities info
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
Definition: posix.cc:1124
static LogFacilities error
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528