| Line | Branch | Exec | Source | 
    
      | 1 |  |  | /** | 
    
      | 2 |  |  | * This file is part of the CernVM File System. | 
    
      | 3 |  |  | */ | 
    
      | 4 |  |  |  | 
    
      | 5 |  |  | #ifndef __STDC_FORMAT_MACROS | 
    
      | 6 |  |  | // NOLINTNEXTLINE | 
    
      | 7 |  |  | #define __STDC_FORMAT_MACROS | 
    
      | 8 |  |  | #endif | 
    
      | 9 |  |  |  | 
    
      | 10 |  |  | #include "notification_client.h" | 
    
      | 11 |  |  |  | 
    
      | 12 |  |  | #include <inttypes.h> | 
    
      | 13 |  |  |  | 
    
      | 14 |  |  | #include <string> | 
    
      | 15 |  |  | #include <vector> | 
    
      | 16 |  |  |  | 
    
      | 17 |  |  | #include "crypto/signature.h" | 
    
      | 18 |  |  | #include "manifest.h" | 
    
      | 19 |  |  | #include "manifest_fetch.h" | 
    
      | 20 |  |  | #include "notify/messages.h" | 
    
      | 21 |  |  | #include "notify/subscriber_sse.h" | 
    
      | 22 |  |  | #include "notify/subscriber_supervisor.h" | 
    
      | 23 |  |  | #include "supervisor.h" | 
    
      | 24 |  |  | #include "util/logging.h" | 
    
      | 25 |  |  | #include "util/posix.h" | 
    
      | 26 |  |  |  | 
    
      | 27 |  |  | namespace { | 
    
      | 28 |  |  |  | 
    
      | 29 |  |  | class ActivitySubscriber : public notify::SubscriberSSE { | 
    
      | 30 |  |  | public: | 
    
      | 31 |  | ✗ | ActivitySubscriber(const std::string &server_url, FuseRemounter *remounter, | 
    
      | 32 |  |  | download::DownloadManager *dl_mgr, | 
    
      | 33 |  |  | signature::SignatureManager *sig_mgr) | 
    
      | 34 |  | ✗ | : SubscriberSSE(server_url) | 
    
      | 35 |  | ✗ | , remounter_(remounter) | 
    
      | 36 |  | ✗ | , dl_mgr_(dl_mgr) | 
    
      | 37 |  | ✗ | , sig_mgr_(sig_mgr) { } | 
    
      | 38 |  |  |  | 
    
      | 39 |  | ✗ | virtual ~ActivitySubscriber() { } | 
    
      | 40 |  |  |  | 
    
      | 41 |  | ✗ | virtual notify::Subscriber::Status Consume(const std::string &repo_name, | 
    
      | 42 |  |  | const std::string &msg_text) { | 
    
      | 43 |  | ✗ | notify::msg::Activity msg; | 
    
      | 44 |  | ✗ | if (!msg.FromJSONString(msg_text)) { | 
    
      | 45 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslogErr, | 
    
      | 46 |  |  | "NotificationClient - could not decode message."); | 
    
      | 47 |  | ✗ | return notify::Subscriber::kError; | 
    
      | 48 |  |  | } | 
    
      | 49 |  |  |  | 
    
      | 50 |  | ✗ | manifest::ManifestEnsemble ensemble; | 
    
      | 51 |  | ✗ | const manifest::Failures res = manifest::Verify( | 
    
      | 52 |  | ✗ | reinterpret_cast<unsigned char *>(&(msg.manifest_[0])), | 
    
      | 53 |  |  | msg.manifest_.size(), "", repo_name, 0, NULL, sig_mgr_, dl_mgr_, | 
    
      | 54 |  |  | &ensemble); | 
    
      | 55 |  |  |  | 
    
      | 56 |  | ✗ | if (res != manifest::kFailOk) { | 
    
      | 57 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslogErr, | 
    
      | 58 |  |  | "NotificationClient - manifest has invalid signature."); | 
    
      | 59 |  | ✗ | return notify::Subscriber::kError; | 
    
      | 60 |  |  | } | 
    
      | 61 |  |  |  | 
    
      | 62 |  |  | const UniquePtr<manifest::Manifest> manifest(manifest::Manifest::LoadMem( | 
    
      | 63 |  | ✗ | reinterpret_cast<const unsigned char *>(msg.manifest_.data()), | 
    
      | 64 |  | ✗ | msg.manifest_.size())); | 
    
      | 65 |  |  |  | 
    
      | 66 |  | ✗ | if (!manifest.IsValid()) { | 
    
      | 67 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslogErr, | 
    
      | 68 |  |  | "NotificationClient - could not parse manifest."); | 
    
      | 69 |  | ✗ | return notify::Subscriber::kError; | 
    
      | 70 |  |  | } | 
    
      | 71 |  |  |  | 
    
      | 72 |  | ✗ | const uint64_t new_revision = manifest->revision(); | 
    
      | 73 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslog, | 
    
      | 74 |  |  | "NotificationClient - repository %s is now at revision %" PRIu64 | 
    
      | 75 |  |  | ", root hash: %s", | 
    
      | 76 |  |  | repo_name.c_str(), new_revision, | 
    
      | 77 |  | ✗ | manifest->catalog_hash().ToString().c_str()); | 
    
      | 78 |  |  |  | 
    
      | 79 |  | ✗ | const FuseRemounter::Status status = remounter_->CheckSynchronously(); | 
    
      | 80 |  | ✗ | switch (status) { | 
    
      | 81 |  | ✗ | case FuseRemounter::kStatusFailGeneral: | 
    
      | 82 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslog, "NotificationClient - remount failed"); | 
    
      | 83 |  | ✗ | break; | 
    
      | 84 |  | ✗ | case FuseRemounter::kStatusFailNoSpace: | 
    
      | 85 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslog, | 
    
      | 86 |  |  | "NotificationClient - remount failed (no space)"); | 
    
      | 87 |  | ✗ | break; | 
    
      | 88 |  | ✗ | case FuseRemounter::kStatusUp2Date: | 
    
      | 89 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslog, | 
    
      | 90 |  |  | "NotificationClient - catalog up to date"); | 
    
      | 91 |  | ✗ | break; | 
    
      | 92 |  | ✗ | case FuseRemounter::kStatusMaintenance: | 
    
      | 93 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslog, | 
    
      | 94 |  |  | "NotificationClient - in maintenance mode"); | 
    
      | 95 |  | ✗ | break; | 
    
      | 96 |  | ✗ | default: | 
    
      | 97 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslog, "NotificationClient - internal error"); | 
    
      | 98 |  |  | } | 
    
      | 99 |  | ✗ | return notify::Subscriber::kContinue; | 
    
      | 100 |  |  | } | 
    
      | 101 |  |  |  | 
    
      | 102 |  |  | private: | 
    
      | 103 |  |  | FuseRemounter *remounter_; | 
    
      | 104 |  |  | download::DownloadManager *dl_mgr_; | 
    
      | 105 |  |  | signature::SignatureManager *sig_mgr_; | 
    
      | 106 |  |  | }; | 
    
      | 107 |  |  |  | 
    
      | 108 |  |  | }  // namespace | 
    
      | 109 |  |  |  | 
    
      | 110 |  | ✗ | NotificationClient::NotificationClient(const std::string &config, | 
    
      | 111 |  |  | const std::string &repo_name, | 
    
      | 112 |  |  | FuseRemounter *remounter, | 
    
      | 113 |  |  | download::DownloadManager *dl_mgr, | 
    
      | 114 |  | ✗ | signature::SignatureManager *sig_mgr) | 
    
      | 115 |  | ✗ | : config_(config) | 
    
      | 116 |  | ✗ | , repo_name_(repo_name) | 
    
      | 117 |  | ✗ | , remounter_(remounter) | 
    
      | 118 |  | ✗ | , dl_mgr_(dl_mgr) | 
    
      | 119 |  | ✗ | , sig_mgr_(sig_mgr) | 
    
      | 120 |  | ✗ | , subscriber_() | 
    
      | 121 |  | ✗ | , thread_() | 
    
      | 122 |  | ✗ | , spawned_(false) { } | 
    
      | 123 |  |  |  | 
    
      | 124 |  | ✗ | NotificationClient::~NotificationClient() { | 
    
      | 125 |  | ✗ | if (subscriber_.IsValid()) { | 
    
      | 126 |  | ✗ | subscriber_->Unsubscribe(); | 
    
      | 127 |  |  | } | 
    
      | 128 |  | ✗ | if (spawned_) { | 
    
      | 129 |  | ✗ | pthread_join(thread_, NULL); | 
    
      | 130 |  | ✗ | spawned_ = false; | 
    
      | 131 |  |  | } | 
    
      | 132 |  |  | } | 
    
      | 133 |  |  |  | 
    
      | 134 |  | ✗ | void NotificationClient::Spawn() { | 
    
      | 135 |  | ✗ | if (!spawned_) { | 
    
      | 136 |  | ✗ | if (pthread_create(&thread_, NULL, NotificationClient::Run, this)) { | 
    
      | 137 |  | ✗ | LogCvmfs(kLogCvmfs, kLogSyslogErr, | 
    
      | 138 |  |  | "NotificationClient - Could not start background thread"); | 
    
      | 139 |  |  | } | 
    
      | 140 |  | ✗ | spawned_ = true; | 
    
      | 141 |  |  | } | 
    
      | 142 |  |  | } | 
    
      | 143 |  |  |  | 
    
      | 144 |  | ✗ | void *NotificationClient::Run(void *data) { | 
    
      | 145 |  | ✗ | NotificationClient *cl = static_cast<NotificationClient *>(data); | 
    
      | 146 |  |  |  | 
    
      | 147 |  | ✗ | cl->subscriber_ = new ActivitySubscriber(cl->config_, cl->remounter_, | 
    
      | 148 |  | ✗ | cl->dl_mgr_, cl->sig_mgr_); | 
    
      | 149 |  |  |  | 
    
      | 150 |  | ✗ | LogCvmfs( | 
    
      | 151 |  |  | kLogCvmfs, kLogSyslog, | 
    
      | 152 |  |  | "NotificationClient - Entering subscription loop for repository: %s.", | 
    
      | 153 |  |  | cl->repo_name_.c_str()); | 
    
      | 154 |  |  |  | 
    
      | 155 |  |  | // Retry settings: accept no more than 10 failures in the last minute | 
    
      | 156 |  | ✗ | const int num_retries = 10; | 
    
      | 157 |  | ✗ | const uint64_t interval = 60; | 
    
      | 158 |  |  | notify::SubscriberSupervisor supervisor( | 
    
      | 159 |  | ✗ | cl->subscriber_.weak_ref(), cl->repo_name_, num_retries, interval); | 
    
      | 160 |  | ✗ | supervisor.Run(); | 
    
      | 161 |  |  |  | 
    
      | 162 |  | ✗ | return NULL; | 
    
      | 163 |  |  | } | 
    
      | 164 |  |  |  |