CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
notification_client.cc
Go to the documentation of this file.
1 
5 #ifndef __STDC_FORMAT_MACROS
6 // NOLINTNEXTLINE
7 #define __STDC_FORMAT_MACROS
8 #endif
9 
10 #include "notification_client.h"
11 
12 #include <inttypes.h>
13 
14 #include <string>
15 #include <vector>
16 
17 #include "crypto/signature.h"
18 #include "manifest.h"
19 #include "manifest_fetch.h"
20 #include "notify/messages.h"
21 #include "notify/subscriber_sse.h"
23 #include "supervisor.h"
24 #include "util/logging.h"
25 #include "util/posix.h"
26 
27 namespace {
28 
// Subscriber that reacts to repository activity messages by checking the
// embedded manifest and then asking the FuseRemounter to check for a new
// catalog.
// NOTE(review): this listing is incomplete -- the class declaration line,
// part of the constructor parameter list, the openings of several calls,
// the return statements, the case labels and the member declarations are
// not visible. The comments below describe only what can be seen.
30  public:
 // Forwards server_url to the SubscriberSSE base and stores the remounter,
 // download manager and signature manager pointers as given.
31  ActivitySubscriber(const std::string &server_url, FuseRemounter *remounter,
34  : SubscriberSSE(server_url)
35  , remounter_(remounter)
36  , dl_mgr_(dl_mgr)
37  , sig_mgr_(sig_mgr) { }
38 
39  virtual ~ActivitySubscriber() { }
40 
 // Handles one message (msg_text) received for repository repo_name.
 // NOTE(review): the returned Status values are on lines missing from this
 // listing -- confirm against the real source.
41  virtual notify::Subscriber::Status Consume(const std::string &repo_name,
42  const std::string &msg_text) {
 // Step 1: decode the JSON message text into msg.
44  if (!msg.FromJSONString(msg_text)) {
46  "NotificationClient - could not decode message.");
48  }
49 
 // Step 2: check the manifest bytes carried in msg.manifest_ using the
 // signature and download managers (presumably manifest::Verify -- the
 // callee name is on a missing line). An empty base URL, a minimum
 // timestamp of 0 and no base catalog are passed.
52  reinterpret_cast<unsigned char *>(&(msg.manifest_[0])),
53  msg.manifest_.size(), "", repo_name, 0, NULL, sig_mgr_, dl_mgr_,
54  &ensemble);
55 
 // Anything other than kFailOk is reported as an invalid signature.
56  if (res != manifest::kFailOk) {
58  "NotificationClient - manifest has invalid signature.");
60  }
61 
 // Step 3: build a manifest object from the same bytes (presumably
 // manifest::Manifest::LoadMem -- the callee name is on a missing line).
63  reinterpret_cast<const unsigned char *>(msg.manifest_.data()),
64  msg.manifest_.size()));
65 
66  if (!manifest.IsValid()) {
68  "NotificationClient - could not parse manifest.");
70  }
71 
 // Step 4: log the revision and root catalog hash announced by the message.
72  uint64_t new_revision = manifest->revision();
74  "NotificationClient - repository %s is now at revision %" PRIu64
75  ", root hash: %s",
76  repo_name.c_str(), new_revision,
77  manifest->catalog_hash().ToString().c_str());
78 
 // Step 5: ask the remounter to check for a new catalog and log the outcome.
 // The case labels selecting each message are missing from this listing.
79  FuseRemounter::Status status = remounter_->CheckSynchronously();
80  switch (status) {
82  LogCvmfs(kLogCvmfs, kLogSyslog, "NotificationClient - remount failed");
83  break;
86  "NotificationClient - remount failed (no space)");
87  break;
90  "NotificationClient - catalog up to date");
91  break;
94  "NotificationClient - in maintenance mode");
95  break;
96  default:
97  LogCvmfs(kLogCvmfs, kLogSyslog, "NotificationClient - internal error");
98  }
100  }
101 
 // Member declarations are missing from this listing; the constructor above
 // initializes remounter_, dl_mgr_ and sig_mgr_.
102  private:
106 };
107 
108 } // namespace
109 
// Stores the configuration string, repository name and the remounter,
// download manager and signature manager pointers. No thread is created
// here: subscriber_ and thread_ are value-initialized and spawned_ starts
// out false.
// NOTE(review): the dl_mgr and sig_mgr parameter lines are missing from
// this listing.
110 NotificationClient::NotificationClient(const std::string &config,
111  const std::string &repo_name,
112  FuseRemounter *remounter,
115  : config_(config)
116  , repo_name_(repo_name)
117  , remounter_(remounter)
118  , dl_mgr_(dl_mgr)
119  , sig_mgr_(sig_mgr)
120  , subscriber_()
121  , thread_()
122  , spawned_(false) { }
123 
// Teardown (presumably the NotificationClient destructor -- the signature
// line is missing from this listing): unsubscribes the subscriber if one
// exists, then joins the background thread if spawned_ is set.
// NOTE(review): subscriber_ is assigned inside Run(), which is the function
// handed to pthread_create, and it is read here without any synchronization
// visible in this listing -- confirm this cannot race with thread start-up.
125  if (subscriber_.IsValid()) {
126  subscriber_->Unsubscribe();
127  }
128  if (spawned_) {
129  pthread_join(thread_, NULL);
130  spawned_ = false;
131  }
132 }
133 
// Starts the background thread running NotificationClient::Run with this
// object as its argument, unless spawned_ is already set. (The signature
// line is missing from this listing; presumably this is the Spawn method.)
135  if (!spawned_) {
 // pthread_create returns non-zero on failure; that case is only logged.
136  if (pthread_create(&thread_, NULL, NotificationClient::Run, this)) {
138  "NotificationClient - Could not start background thread");
139  }
 // NOTE(review): spawned_ is set even when pthread_create failed above.
 // The teardown code joins thread_ whenever spawned_ is true, so after a
 // failed start it would call pthread_join on a thread that was never
 // created. Consider setting the flag only on success.
140  spawned_ = true;
141  }
142 }
143 
// Thread entry point passed to pthread_create. `data` is the
// NotificationClient instance. Creates the ActivitySubscriber, then hands it
// to a SubscriberSupervisor and runs that supervisor. Always returns NULL.
144 void *NotificationClient::Run(void *data) {
145  NotificationClient *cl = static_cast<NotificationClient *>(data);
146 
 // The subscriber is created on this thread and stored in cl->subscriber_.
147  cl->subscriber_ = new ActivitySubscriber(cl->config_, cl->remounter_,
148  cl->dl_mgr_, cl->sig_mgr_);
149 
 // NOTE(review): the first argument line of this LogCvmfs call (log source
 // and mask) is missing from this listing.
150  LogCvmfs(
152  "NotificationClient - Entering subscription loop for repository: %s.",
153  cl->repo_name_.c_str());
154 
155  // Retry settings: accept no more than 10 failures in the last minute
156  const int num_retries = 10;
157  const uint64_t interval = 60;
 // The supervisor receives a non-owning pointer (weak_ref) to the subscriber.
158  notify::SubscriberSupervisor supervisor(
159  cl->subscriber_.weak_ref(), cl->repo_name_, num_retries, interval);
 // The bool result of supervisor.Run() is discarded.
160  supervisor.Run();
161 
162  return NULL;
163 }
std::string manifest_
Definition: messages.h:51
const manifest::Manifest * manifest() const
Definition: repository.h:125
static Manifest * LoadMem(const unsigned char *buffer, const unsigned length)
Definition: manifest.cc:82
Failures Verify(unsigned char *manifest_data, size_t manifest_size, const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
T * weak_ref() const
Definition: pointer.h:46
NotificationClient(const std::string &config, const std::string &repo_name, FuseRemounter *remounter, download::DownloadManager *dl_mgr, signature::SignatureManager *sig_mgr)
bool Run()
Definition: supervisor.cc:15
static void * Run(void *instance)
signature::SignatureManager * sig_mgr_
virtual notify::Subscriber::Status Consume(const std::string &repo_name, const std::string &msg_text)
FuseRemounter * remounter_
bool IsValid() const
Definition: pointer.h:47
download::DownloadManager * dl_mgr_
virtual bool FromJSONString(const std::string &s)
Definition: messages.cc:44
UniquePtr< notify::Subscriber > subscriber_
ActivitySubscriber(const std::string &server_url, FuseRemounter *remounter, download::DownloadManager *dl_mgr, signature::SignatureManager *sig_mgr)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545