CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
subscriber_sse.cc
Go to the documentation of this file.
1 
5 #include "subscriber_sse.h"
6 
7 #include <vector>
8 
9 #include "url.h"
10 #include "util/logging.h"
11 #include "util/pointer.h"
12 #include "util/string.h"
13 
14 namespace {
15 
18 
19 } // namespace
20 
21 namespace notify {
22 
23 SubscriberSSE::SubscriberSSE(const std::string &server_url)
24  : Subscriber()
25  , server_url_(server_url + "/notifications/subscribe")
26  , topic_()
27  , buffer_()
28  , should_quit_(false) { }
29 
31 
32 bool SubscriberSSE::Subscribe(const std::string &topic) {
34 
35  if (!url.IsValid()) {
37  "SubscriberSSE - could not parse notification server url: %s\n",
38  server_url_.c_str());
39  return false;
40  }
41 
42  this->topic_ = topic;
43 
44  const std::string request =
45  "{\"version\":1,\"repository\":\"" + topic + "\"}";
46 
47  const char *user_agent_string = "cvmfs/" CVMFS_VERSION;
48 
49  CURL *h_curl = curl_easy_init();
50  if (h_curl == NULL) {
51  LogCvmfs(kLogCvmfs, kLogError, "Could not create Curl handle\n");
52  return false;
53  }
54 
55  if (h_curl) {
56  curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 0L);
57  curl_easy_setopt(h_curl, CURLOPT_USERAGENT, user_agent_string);
58  curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
59  curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "GET");
60  }
61 
62  if (!h_curl) {
64  "SubscriberSSE - error initializing CURL context\n");
65  return false;
66  }
67 
68  // Make request to acquire lease from repo services
69  curl_easy_setopt(h_curl, CURLOPT_URL, server_url_.c_str());
70  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
71  static_cast<curl_off_t>(request.length()));
72  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, request.c_str());
73  curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, CurlRecvCB);
74  curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, this);
75  curl_easy_setopt(h_curl, CURLOPT_XFERINFOFUNCTION, CurlProgressCB);
76  curl_easy_setopt(h_curl, CURLOPT_XFERINFODATA, this);
77 
78  bool success = true;
79  const CURLcode ret = curl_easy_perform(h_curl);
80  if (ret && ret != CURLE_ABORTED_BY_CALLBACK) {
82  "SubscriberSSE - event loop finished with error: %d. Reply: %s\n",
83  ret, this->buffer_.c_str());
84  success = false;
85  }
86 
87  curl_easy_cleanup(h_curl);
88  h_curl = NULL;
89 
90  return success;
91 }
92 
93 void SubscriberSSE::Unsubscribe() { atomic_write32(&should_quit_, 1); }
94 
95 bool SubscriberSSE::ShouldQuit() const { return atomic_read32(&should_quit_); }
96 
97 void SubscriberSSE::AppendToBuffer(const std::string &s) {
98  size_t start = 0;
99  if (s.substr(0, 6) == "data: ") {
100  start = 6;
101  }
102  buffer_ += s.substr(start);
103 }
104 
106 
107 size_t SubscriberSSE::CurlRecvCB(void *buffer, size_t size, size_t nmemb,
108  void *userp) {
109  notify::SubscriberSSE *sub = static_cast<notify::SubscriberSSE *>(userp);
110 
111  if (size * nmemb < 1) {
112  return 0;
113  }
114 
115  const std::string buf(static_cast<char *>(buffer));
116 
117  std::vector<std::string> lines = SplitString(buf, '\n');
118 
119  if (lines.size() == 1) {
120  sub->AppendToBuffer(lines[0]);
121  } else {
122  sub->AppendToBuffer(lines[0]);
123  const notify::Subscriber::Status st =
124  sub->Consume(sub->topic_, sub->buffer_);
125  sub->ClearBuffer();
126  for (size_t i = 1; i < lines.size(); ++i) {
127  if (lines[i].substr(0, 5) == "data: ") {
128  sub->AppendToBuffer(lines[i]);
129  }
130  }
131  switch (st) {
133  sub->Unsubscribe();
134  break;
136  return 0;
137  default:
138  break;
139  }
140  }
141 
142  return size * nmemb;
143 }
144 
145 int SubscriberSSE::CurlProgressCB(void *clientp, curl_off_t dltotal,
146  curl_off_t dlnow, curl_off_t ultotal,
147  curl_off_t ulnow) {
148  notify::SubscriberSSE *sub = static_cast<notify::SubscriberSSE *>(clientp);
149  if (sub->ShouldQuit()) {
151  "SubscriberSSE - quit request received. Stopping\n");
152  return 1;
153  }
154 
155  return 0;
156 }
157 
158 } // namespace notify
virtual void Unsubscribe()
const LogFacilities & kLogError
Definition: cmd_pub.cc:23
const LogFacilities & kLogInfo
Definition: cmd_pub.cc:22
static int CurlProgressCB(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
void AppendToBuffer(const std::string &s)
atomic_int32 should_quit_
bool ShouldQuit() const
virtual Status Consume(const std::string &topic, const std::string &msg_text)=0
static size_t CurlRecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
virtual bool Subscribe(const std::string &topic)
SubscriberSSE(const std::string &server_url)
LogFacilities
static Url * Parse(const std::string &url, const std::string &default_protocol=kDefaultProtocol, int default_port=kDefaultPort)
Definition: url.cc:14
static LogFacilities info
static void size_t size
Definition: smalloc.h:54
static LogFacilities error
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545