CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cmd_sub.cc
Go to the documentation of this file.
1 
5 #include "cmd_sub.h"
6 
7 #include "download.h"
8 #include "logging.h"
9 #include "manifest.h"
10 #include "manifest_fetch.h"
11 #include "notify/messages.h"
12 #include "options.h"
13 #include "signature.h"
14 #include "subscriber_sse.h"
15 #include "subscriber_supervisor.h"
16 #include "supervisor.h"
17 #include "util/pointer.h"
18 #include "util/posix.h"
19 #include "util/string.h"
20 
21 namespace {
22 
25 
26 const int kMaxPoolHandles = 1;
27 const bool kUseSystemProxy = true;
28 
30  public:
31  SwissknifeSubscriber(const std::string& server_url,
32  const std::string& repository, uint64_t min_revision,
33  bool continuous, bool verbose)
34  : notify::SubscriberSSE(server_url),
35  repository_(repository),
36  stats_(),
37  dl_mgr_(new download::DownloadManager()),
38  sig_mgr_(new signature::SignatureManager()),
39  revision_(min_revision),
40  continuous_(continuous),
41  verbose_(verbose) {}
43  sig_mgr_->Fini();
44  dl_mgr_->Fini();
45  }
46 
47  bool Init() {
48  const std::string config_file =
49  "/etc/cvmfs/repositories.d/" + repository_ + "/client.conf";
50  SimpleOptionsParser options;
51  if (!options.TryParsePath(config_file)) {
52  LogCvmfs(kLogCvmfs, kLogError,
53  "SwissknifeSubscriber - could not parse configuration file");
54  return false;
55  }
56 
57  dl_mgr_->Init(kMaxPoolHandles, kUseSystemProxy,
58  perf::StatisticsTemplate("download", &stats_));
59 
60  std::string arg;
61  if (options.GetValue("CVMFS_SERVER_URL", &arg)) {
62  dl_mgr_->SetHostChain(arg);
63  }
64 
65  sig_mgr_->Init();
66 
67  std::string public_keys =
68  JoinStrings(FindFilesBySuffix("/etc/cvmfs/keys", ".pub"), ":");
69  if (!sig_mgr_->LoadPublicRsaKeys(public_keys)) {
70  LogCvmfs(kLogCvmfs, kLogError,
71  "SwissknifeSubscriber - could not load public keys");
72  return false;
73  }
74 
75  return true;
76  }
77 
78  private:
79  virtual notify::Subscriber::Status Consume(const std::string& repo,
80  const std::string& msg_text) {
82  if (!msg.FromJSONString(msg_text)) {
83  LogCvmfs(kLogCvmfs, kLogError,
84  "SwissknifeSubscriber - could not decode message.");
86  }
87 
90  &(msg.manifest_[0]), msg.manifest_.size(), "", repo, 0, NULL,
91  sig_mgr_.weak_ref(), dl_mgr_.weak_ref(), &ensemble);
92 
93  if (res != manifest::kFailOk) {
94  LogCvmfs(kLogCvmfs, kLogError,
95  "SwissknifeSubscriber - manifest has invalid signature: %d",
96  res);
98  }
99 
101  reinterpret_cast<const unsigned char*>(msg.manifest_.data()),
102  msg.manifest_.size()));
103 
104  if (!manifest.IsValid()) {
105  LogCvmfs(kLogCvmfs, kLogError,
106  "SwissknifeSubscriber - could not parse manifest.");
108  }
109 
110  uint64_t new_revision = manifest->revision();
111  bool triggered = false;
112  if (new_revision > revision_) {
113  LogCvmfs(kLogCvmfs, kLogInfo,
114  "SwissknifeSubscriber - repository %s is now at revision %lu.",
115  repo.c_str(), new_revision, revision_);
116  if (verbose_) {
117  LogCvmfs(kLogCvmfs, kLogInfo, "%s", msg_text.c_str());
118  }
119  revision_ = new_revision;
120  triggered = true;
121  }
122 
123  if (!continuous_ && triggered) {
125  }
126 
128  }
129 
130  std::string repository_;
131 
135 
136  uint64_t revision_;
138  bool verbose_;
139 };
140 
141 } // namespace
142 
143 namespace notify {
144 
145 int DoSubscribe(const std::string& server_url, const std::string& repo,
146  uint64_t min_revision, bool continuous, bool verbose) {
147  SwissknifeSubscriber subscriber(server_url, repo, min_revision, continuous,
148  verbose);
149 
150  if (!subscriber.Init()) {
151  LogCvmfs(kLogCvmfs, kLogError, "Could not initialize SwissknifeSubscriber");
152  return 1;
153  }
154 
155  // Retry settings: accept no more than 10 failures in the last minute
156  const int num_retries = 10;
157  const uint64_t interval = 60;
158  SubscriberSupervisor supervisor(&subscriber, repo, num_retries, interval);
159  supervisor.Run();
160 
161  return 0;
162 }
163 
164 } // namespace notify
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
std::string manifest_
Definition: messages.h:49
const manifest::Manifest * manifest() const
Definition: repository.h:123
static Manifest * LoadMem(const unsigned char *buffer, const unsigned length)
Definition: manifest.cc:68
bool GetValue(const std::string &key, std::string *value)
Definition: options.cc:376
string JoinStrings(const vector< string > &strings, const string &joint)
Definition: string.cc:318
UniquePtr< download::DownloadManager > dl_mgr_
Definition: cmd_sub.cc:133
UniquePtr< signature::SignatureManager > sig_mgr_
Definition: cmd_sub.cc:134
bool Run()
Definition: supervisor.cc:15
const LogFacilities & kLogError
Definition: cmd_pub.cc:23
const LogFacilities & kLogInfo
Definition: cmd_pub.cc:22
Failures Verify(char *manifest_data, size_t manifest_size, const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
virtual notify::Subscriber::Status Consume(const std::string &repo, const std::string &msg_text)
Definition: cmd_sub.cc:79
int DoSubscribe(const std::string &server_url, const std::string &repo, uint64_t min_revision, bool continuous, bool verbose)
Definition: cmd_sub.cc:145
bool TryParsePath(const std::string &config_file)
Definition: options.cc:110
bool IsValid() const
Definition: pointer.h:43
SwissknifeSubscriber(const std::string &server_url, const std::string &repository, uint64_t min_revision, bool continuous, bool verbose)
Definition: cmd_sub.cc:31
virtual bool FromJSONString(const std::string &s)
Definition: messages.cc:45
LogFacilities
static LogFacilities info
std::vector< std::string > FindFilesBySuffix(const std::string &dir, const std::string &suffix)
Definition: posix.cc:1149
static LogFacilities error