CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
notification_client.cc
Go to the documentation of this file.
1 
5 #include "notification_client.h"
6 
7 #include <string>
8 #include <vector>
9 
10 #include "logging.h"
11 #include "manifest.h"
12 #include "manifest_fetch.h"
13 #include "notify/messages.h"
14 #include "notify/subscriber_sse.h"
16 #include "signature.h"
17 #include "supervisor.h"
18 #include "util/posix.h"
19 
20 namespace {
21 
23  public:
24  ActivitySubscriber(const std::string& server_url, FuseRemounter* remounter,
27  : SubscriberSSE(server_url),
28  remounter_(remounter),
29  dl_mgr_(dl_mgr),
30  sig_mgr_(sig_mgr) {}
31 
32  virtual ~ActivitySubscriber() {}
33 
34  virtual notify::Subscriber::Status Consume(const std::string& repo_name,
35  const std::string& msg_text) {
37  if (!msg.FromJSONString(msg_text)) {
39  "NotificationClient - could not decode message.");
41  }
42 
44  manifest::Failures res =
45  manifest::Verify(&(msg.manifest_[0]), msg.manifest_.size(), "",
46  repo_name, 0, NULL, sig_mgr_, dl_mgr_, &ensemble);
47 
48  if (res != manifest::kFailOk) {
50  "NotificationClient - manifest has invalid signature.");
52  }
53 
55  reinterpret_cast<const unsigned char*>(msg.manifest_.data()),
56  msg.manifest_.size()));
57 
58  if (!manifest.IsValid()) {
60  "NotificationClient - could not parse manifest.");
62  }
63 
64  uint64_t new_revision = manifest->revision();
66  "NotificationClient - repository %s is now at revision %lu, root "
67  "hash: %s",
68  repo_name.c_str(), new_revision,
69  manifest->catalog_hash().ToString().c_str());
70 
71  FuseRemounter::Status status = remounter_->CheckSynchronously();
72  switch (status) {
74  LogCvmfs(kLogCvmfs, kLogSyslog, "NotificationClient - remount failed");
75  break;
78  "NotificationClient - remount failed (no space)");
79  break;
82  "NotificationClient - catalog up to date");
83  break;
86  "NotificationClient - in maintenance mode");
87  break;
88  default:
89  LogCvmfs(kLogCvmfs, kLogSyslog, "NotificationClient - internal error");
90  }
92  }
93 
94  private:
98 };
99 
100 } // namespace
101 
102 NotificationClient::NotificationClient(const std::string& config,
103  const std::string& repo_name,
104  FuseRemounter* remounter,
107  : config_(config),
108  repo_name_(repo_name),
109  remounter_(remounter),
110  dl_mgr_(dl_mgr),
111  sig_mgr_(sig_mgr),
112  subscriber_(),
113  thread_(),
114  spawned_(false) {}
115 
117  if (subscriber_.IsValid()) {
118  subscriber_->Unsubscribe();
119  }
120  if (spawned_) {
121  pthread_join(thread_, NULL);
122  spawned_ = false;
123  }
124 }
125 
127  if (!spawned_) {
128  if (pthread_create(&thread_, NULL, NotificationClient::Run, this)) {
130  "NotificationClient - Could not start background thread");
131  }
132  spawned_ = true;
133  }
134 }
135 
136 void* NotificationClient::Run(void* data) {
137  NotificationClient* cl = static_cast<NotificationClient*>(data);
138 
139  cl->subscriber_ = new ActivitySubscriber(cl->config_, cl->remounter_,
140  cl->dl_mgr_, cl->sig_mgr_);
141 
142  LogCvmfs(
144  "NotificationClient - Entering subscription loop for repository: %s.",
145  cl->repo_name_.c_str());
146 
147  // Retry settings: accept no more than 10 failures in the last minute
148  const int num_retries = 10;
149  const uint64_t interval = 60;
150  notify::SubscriberSupervisor supervisor(
151  cl->subscriber_.weak_ref(), cl->repo_name_, num_retries, interval);
152  supervisor.Run();
153 
154  return NULL;
155 }
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
std::string manifest_
Definition: messages.h:49
const manifest::Manifest * manifest() const
Definition: repository.h:123
static Manifest * LoadMem(const unsigned char *buffer, const unsigned length)
Definition: manifest.cc:68
T * weak_ref() const
Definition: pointer.h:42
NotificationClient(const std::string &config, const std::string &repo_name, FuseRemounter *remounter, download::DownloadManager *dl_mgr, signature::SignatureManager *sig_mgr)
bool Run()
Definition: supervisor.cc:15
static void * Run(void *instance)
Failures Verify(char *manifest_data, size_t manifest_size, const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
signature::SignatureManager * sig_mgr_
virtual notify::Subscriber::Status Consume(const std::string &repo_name, const std::string &msg_text)
FuseRemounter * remounter_
bool IsValid() const
Definition: pointer.h:43
download::DownloadManager * dl_mgr_
virtual bool FromJSONString(const std::string &s)
Definition: messages.cc:45
UniquePtr< notify::Subscriber > subscriber_
ActivitySubscriber(const std::string &server_url, FuseRemounter *remounter, download::DownloadManager *dl_mgr, signature::SignatureManager *sig_mgr)