CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
notification_client.cc
Go to the documentation of this file.
1 
5 #ifndef __STDC_FORMAT_MACROS
6 // NOLINTNEXTLINE
7 #define __STDC_FORMAT_MACROS
8 #endif
9 
10 #include "notification_client.h"
11 
12 #include <inttypes.h>
13 
14 #include <string>
15 #include <vector>
16 
17 #include "crypto/signature.h"
18 #include "manifest.h"
19 #include "manifest_fetch.h"
20 #include "notify/messages.h"
21 #include "notify/subscriber_sse.h"
23 #include "supervisor.h"
24 #include "util/logging.h"
25 #include "util/posix.h"
26 
27 namespace {
28 
// File-local subscriber that reacts to repository activity messages.
// NOTE(review): this listing skips a number of original source lines (the
// embedded numbering jumps, e.g. 28 -> 30 and 31 -> 34), including the class
// header, some declarations, the return statements and most case labels.
// The comments below describe only the statements that are visible.
30  public:
// Forwards server_url to the SubscriberSSE base and stores the remounter,
// download manager and signature manager pointers exactly as passed in.
31  ActivitySubscriber(const std::string& server_url, FuseRemounter* remounter,
34  : SubscriberSSE(server_url),
35  remounter_(remounter),
36  dl_mgr_(dl_mgr),
37  sig_mgr_(sig_mgr) {}
38 
// Empty destructor body: nothing is released here.
39  virtual ~ActivitySubscriber() {}
40 
// Handles one incoming message for repo_name. Visible steps, in order:
//   1. decode msg_text from JSON,
//   2. verify the manifest bytes carried in the message,
//   3. parse those bytes into a manifest object,
//   4. report the new revision and catalog hash,
//   5. ask the remounter to check synchronously and log the outcome.
// The statement following each error log and the final statement of the
// function are not part of this listing, so the returned Status values are
// not shown here.
41  virtual notify::Subscriber::Status Consume(const std::string& repo_name,
42  const std::string& msg_text) {
// Step 1: a message that cannot be decoded is logged.
44  if (!msg.FromJSONString(msg_text)) {
46  "NotificationClient - could not decode message.");
48  }
49 
// Step 2: Verify() takes a non-const unsigned char*, hence the cast from the
// address of the string's first character. It is called with an empty base
// URL, a minimum timestamp of 0 and a NULL base catalog.
51  manifest::Failures res =
52  manifest::Verify(reinterpret_cast<unsigned char*>(&(msg.manifest_[0])),
53  msg.manifest_.size(), "",
54  repo_name, 0, NULL, sig_mgr_, dl_mgr_, &ensemble);
55 
// Any result other than kFailOk is logged as an invalid signature.
56  if (res != manifest::kFailOk) {
58  "NotificationClient - manifest has invalid signature.");
60  }
61 
// Step 3: the same manifest bytes are handed over as a const buffer plus
// length; the call that receives them is on a line missing from the listing.
63  reinterpret_cast<const unsigned char*>(msg.manifest_.data()),
64  msg.manifest_.size()));
65 
66  if (!manifest.IsValid()) {
68  "NotificationClient - could not parse manifest.");
70  }
71 
// Step 4: the format arguments carry the repository name, the manifest
// revision and the catalog hash (labelled "root hash" in the message).
72  uint64_t new_revision = manifest->revision();
74  "NotificationClient - repository %s is now at revision %" PRIu64
75  ", root hash: %s", repo_name.c_str(), new_revision,
76  manifest->catalog_hash().ToString().c_str());
77 
// Step 5: each branch only writes a log line describing the remounter status.
// All case labels except `default` are missing from the listing.
78  FuseRemounter::Status status = remounter_->CheckSynchronously();
79  switch (status) {
81  LogCvmfs(kLogCvmfs, kLogSyslog, "NotificationClient - remount failed");
82  break;
85  "NotificationClient - remount failed (no space)");
86  break;
89  "NotificationClient - catalog up to date");
90  break;
93  "NotificationClient - in maintenance mode");
94  break;
// Any status without its own case is reported as an internal error.
95  default:
96  LogCvmfs(kLogCvmfs, kLogSyslog, "NotificationClient - internal error");
97  }
99  }
100 
// The member declarations are not part of this listing; the constructor
// above initializes remounter_, dl_mgr_ and sig_mgr_.
101  private:
105 };
106 
107 } // namespace
108 
// Stores the configuration string, the repository name and the collaborator
// pointers as given. subscriber_ and thread_ are value-initialized and
// spawned_ starts out false; the empty body means no thread is created here.
// NOTE(review): the parameter lines that declare dl_mgr and sig_mgr are
// missing from this listing.
109 NotificationClient::NotificationClient(const std::string& config,
110  const std::string& repo_name,
111  FuseRemounter* remounter,
114  : config_(config),
115  repo_name_(repo_name),
116  remounter_(remounter),
117  dl_mgr_(dl_mgr),
118  sig_mgr_(sig_mgr),
119  subscriber_(),
120  thread_(),
121  spawned_(false) {}
122 
// Teardown body (the signature line is missing from this listing):
// unsubscribe if a subscriber exists, then join the background thread if one
// was recorded as spawned.
// NOTE(review): subscriber_ is assigned on the background thread (inside
// Run) and no synchronization is visible here. If this code runs before that
// assignment, Unsubscribe() is skipped while pthread_join() still waits for
// the thread. Confirm that callers cannot hit that window.
124  if (subscriber_.IsValid()) {
125  subscriber_->Unsubscribe();
126  }
// Joining is guarded by spawned_, which is cleared afterwards so that the
// join is not repeated.
127  if (spawned_) {
128  pthread_join(thread_, NULL);
129  spawned_ = false;
130  }
131 }
132 
// Starts the background thread running NotificationClient::Run with `this`
// as its argument (the signature line and the start of the log call are
// missing from this listing). The spawned_ guard makes repeated calls a
// no-op once the flag is set.
134  if (!spawned_) {
// pthread_create returns non-zero on failure, which is only logged.
135  if (pthread_create(&thread_, NULL, NotificationClient::Run, this)) {
137  "NotificationClient - Could not start background thread");
138  }
// NOTE(review): spawned_ becomes true even when pthread_create failed above.
// Teardown code that does `if (spawned_) pthread_join(thread_, NULL)` would
// then join a thread that was never created. Consider returning early, or
// setting the flag only on success.
139  spawned_ = true;
140  }
141 }
142 
// Thread entry point passed to pthread_create. `data` is cast back to the
// NotificationClient that started the thread. Always returns NULL.
143 void* NotificationClient::Run(void* data) {
144  NotificationClient* cl = static_cast<NotificationClient*>(data);
145 
// The subscriber is created on this thread; cl->config_ is passed in the
// position of the subscriber's server_url parameter.
146  cl->subscriber_ = new ActivitySubscriber(cl->config_, cl->remounter_,
147  cl->dl_mgr_, cl->sig_mgr_);
148 
// NOTE(review): the line carrying the first LogCvmfs arguments is missing
// from this listing.
149  LogCvmfs(
151  "NotificationClient - Entering subscription loop for repository: %s.",
152  cl->repo_name_.c_str());
153 
154  // Retry settings: accept no more than 10 failures in the last minute
155  const int num_retries = 10;
156  const uint64_t interval = 60;
// The supervisor receives a non-owning pointer (weak_ref) to the subscriber;
// subscriber_ itself stays owned by the client object.
157  notify::SubscriberSupervisor supervisor(
158  cl->subscriber_.weak_ref(), cl->repo_name_, num_retries, interval);
// The thread function returns only after supervisor.Run() returns; its bool
// result is not inspected.
159  supervisor.Run();
160 
161  return NULL;
162 }
std::string manifest_
Definition: messages.h:49
const manifest::Manifest * manifest() const
Definition: repository.h:125
static Manifest * LoadMem(const unsigned char *buffer, const unsigned length)
Definition: manifest.cc:82
Failures Verify(unsigned char *manifest_data, size_t manifest_size, const std::string &base_url, const std::string &repository_name, const uint64_t minimum_timestamp, const shash::Any *base_catalog, signature::SignatureManager *signature_manager, download::DownloadManager *download_manager, ManifestEnsemble *ensemble)
T * weak_ref() const
Definition: pointer.h:42
NotificationClient(const std::string &config, const std::string &repo_name, FuseRemounter *remounter, download::DownloadManager *dl_mgr, signature::SignatureManager *sig_mgr)
bool Run()
Definition: supervisor.cc:15
static void * Run(void *instance)
signature::SignatureManager * sig_mgr_
virtual notify::Subscriber::Status Consume(const std::string &repo_name, const std::string &msg_text)
FuseRemounter * remounter_
bool IsValid() const
Definition: pointer.h:43
download::DownloadManager * dl_mgr_
virtual bool FromJSONString(const std::string &s)
Definition: messages.cc:45
UniquePtr< notify::Subscriber > subscriber_
ActivitySubscriber(const std::string &server_url, FuseRemounter *remounter, download::DownloadManager *dl_mgr, signature::SignatureManager *sig_mgr)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528