CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
subscriber_sse.cc
Go to the documentation of this file.
1 
5 #include "subscriber_sse.h"
6 
7 #include <vector>
8 
9 #include "url.h"
10 #include "util/logging.h"
11 #include "util/pointer.h"
12 #include "util/string.h"
13 
14 namespace {
15 
18 
19 } // namespace
20 
21 namespace notify {
22 
// Creates a subscriber for the notification server reachable at server_url.
// The endpoint actually contacted is server_url with the fixed path
// "/notifications/subscribe" appended. topic_ and buffer_ are
// value-initialized and the quit flag starts out cleared.
23 SubscriberSSE::SubscriberSSE(const std::string &server_url)
24  : Subscriber()
25  , server_url_(server_url + "/notifications/subscribe")
26  , topic_()
27  , buffer_()
28  , should_quit_(false) { }
29 
31 
// Subscribes to `topic` on the notification server and runs the curl transfer
// until it ends. Incoming data is handed to CurlRecvCB; CurlProgressCB is
// registered so the transfer can be aborted once ShouldQuit() becomes true.
//
// Returns false if the server URL is invalid, if no curl handle could be
// created, or if curl_easy_perform() reports an error other than
// CURLE_ABORTED_BY_CALLBACK. Returns true otherwise.
32 bool SubscriberSSE::Subscribe(const std::string &topic) {
// NOTE(review): listing line 33 is absent from this extract, so the
// declaration of `url` is not visible; presumably it is the result of parsing
// server_url_ -- TODO confirm against the real file.
34 
35  if (!url.IsValid()) {
// NOTE(review): listing line 36 (the opening of the logging call that takes
// the two arguments below) is absent from this extract.
37  "SubscriberSSE - could not parse notification server url: %s\n",
38  server_url_.c_str());
39  return false;
40  }
41 
// Remember the topic; CurlRecvCB passes it to Consume() with every message.
42  this->topic_ = topic;
43 
// JSON request body naming the repository (topic) to subscribe to.
44  std::string request = "{\"version\":1,\"repository\":\"" + topic + "\"}";
45 
46  const char *user_agent_string = "cvmfs/" CVMFS_VERSION;
47 
48  CURL *h_curl = curl_easy_init();
49  if (h_curl == NULL) {
50  LogCvmfs(kLogCvmfs, kLogError, "Could not create Curl handle\n");
51  return false;
52  }
53 
// NOTE(review): h_curl cannot be NULL here because of the early return above,
// so this condition is always true.
54  if (h_curl) {
// NOPROGRESS is set to 0 so that the progress callback registered further
// down is actually invoked during the transfer.
55  curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 0L);
56  curl_easy_setopt(h_curl, CURLOPT_USERAGENT, user_agent_string);
57  curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
// The request is sent with the GET method string although a body is attached
// via CURLOPT_POSTFIELDS below.
58  curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "GET");
59  }
60 
// NOTE(review): unreachable for the same reason as above. Listing line 62
// (the opening of the logging call) is absent from this extract.
61  if (!h_curl) {
63  "SubscriberSSE - error initializing CURL context\n");
64  return false;
65  }
66 
67  // Configure the subscription request sent to the notification server
68  curl_easy_setopt(h_curl, CURLOPT_URL, server_url_.c_str());
69  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
70  static_cast<curl_off_t>(request.length()));
71  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, request.c_str());
// Both callbacks receive `this` as their user pointer.
72  curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, CurlRecvCB);
73  curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, this);
74  curl_easy_setopt(h_curl, CURLOPT_XFERINFOFUNCTION, CurlProgressCB);
75  curl_easy_setopt(h_curl, CURLOPT_XFERINFODATA, this);
76 
77  bool success = true;
78  CURLcode ret = curl_easy_perform(h_curl);
// An abort requested by a callback (CurlProgressCB returns 1 once a quit was
// requested) is not treated as a failure.
79  if (ret && ret != CURLE_ABORTED_BY_CALLBACK) {
// NOTE(review): listing line 80 (the opening of the logging call) is absent
// from this extract.
81  "SubscriberSSE - event loop finished with error: %d. Reply: %s\n",
82  ret, this->buffer_.c_str());
83  success = false;
84  }
85 
86  curl_easy_cleanup(h_curl);
87  h_curl = NULL;
88 
89  return success;
90 }
91 
// Requests the end of the subscription by atomically setting should_quit_ to
// 1. The flag is read through ShouldQuit(), which CurlProgressCB polls.
92 void SubscriberSSE::Unsubscribe() { atomic_write32(&should_quit_, 1); }
93 
// Returns true once should_quit_ holds a non-zero value, i.e. after
// Unsubscribe() has been called. The flag is read atomically.
94 bool SubscriberSSE::ShouldQuit() const { return atomic_read32(&should_quit_); }
95 
// Appends s to buffer_. If s begins with the 6-character prefix "data: ",
// that prefix is skipped and only the remainder is appended; otherwise s is
// appended unchanged.
96 void SubscriberSSE::AppendToBuffer(const std::string &s) {
97  size_t start = 0;
// substr(0, 6) is safe for strings shorter than 6 characters: it simply
// yields the whole string, which then does not match the prefix.
98  if (s.substr(0, 6) == "data: ") {
99  start = 6;
100  }
101  buffer_ += s.substr(start);
102 }
103 
105 
// curl write callback: receives a chunk of size * nmemb bytes from the
// notification server. userp is the SubscriberSSE that was registered through
// CURLOPT_WRITEDATA.
//
// The chunk is split at '\n'. Without a newline, the chunk is only
// accumulated in the subscriber's buffer. With at least one newline, the text
// before the first newline completes the buffered message, which is passed to
// Consume() together with the topic; the buffer is then cleared and refilled
// from the remaining lines that start with "data: ".
//
// Returns size * nmemb when the chunk was accepted. Returns 0 for an empty
// chunk, and from the switch branch at listing line 134.
106 size_t SubscriberSSE::CurlRecvCB(void *buffer, size_t size, size_t nmemb,
107  void *userp) {
108  notify::SubscriberSSE *sub = static_cast<notify::SubscriberSSE *>(userp);
109 
110  if (size * nmemb < 1) {
111  return 0;
112  }
113 
// The data handed over by curl is NOT NUL-terminated, so the string must be
// built with the explicit byte count instead of relying on a terminator.
114  std::string buf(static_cast<char *>(buffer), size * nmemb);
115 
116  std::vector<std::string> lines = SplitString(buf, '\n');
117 
118  if (lines.size() == 1) {
119  sub->AppendToBuffer(lines[0]);
120  } else {
121  sub->AppendToBuffer(lines[0]);
122  notify::Subscriber::Status st = sub->Consume(sub->topic_, sub->buffer_);
123  sub->ClearBuffer();
124  for (size_t i = 1; i < lines.size(); ++i) {
// "data: " is 6 characters long; the prefix length must match it (as it does
// in AppendToBuffer), otherwise the comparison can never succeed and the
// remaining data lines of the chunk are dropped.
125  if (lines[i].substr(0, 6) == "data: ") {
126  sub->AppendToBuffer(lines[i]);
127  }
128  }
// NOTE(review): listing lines 130 and 133 (presumably the case labels for the
// two branches below) are absent from this extract -- TODO confirm against
// the real file.
129  switch (st) {
131  sub->Unsubscribe();
132  break;
134  return 0;
135  default:
136  break;
137  }
138  }
139 
140  return size * nmemb;
141 }
142 
// curl transfer-info callback. clientp is the SubscriberSSE that was
// registered through CURLOPT_XFERINFODATA; the four transfer counters are not
// used. Returns 1 once the subscriber's quit flag is set and 0 otherwise.
// Subscribe() treats the resulting CURLE_ABORTED_BY_CALLBACK as a regular
// termination.
143 int SubscriberSSE::CurlProgressCB(void *clientp, curl_off_t dltotal,
144  curl_off_t dlnow, curl_off_t ultotal,
145  curl_off_t ulnow) {
146  notify::SubscriberSSE *sub = static_cast<notify::SubscriberSSE *>(clientp);
147  if (sub->ShouldQuit()) {
// NOTE(review): listing line 148 (the opening of the logging call that takes
// the message below) is absent from this extract.
149  "SubscriberSSE - quit request received. Stopping\n");
150  return 1;
151  }
152 
153  return 0;
154 }
155 
156 } // namespace notify
virtual void Unsubscribe()
const LogFacilities & kLogError
Definition: cmd_pub.cc:23
const LogFacilities & kLogInfo
Definition: cmd_pub.cc:22
static int CurlProgressCB(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:306
void AppendToBuffer(const std::string &s)
atomic_int32 should_quit_
bool ShouldQuit() const
virtual Status Consume(const std::string &topic, const std::string &msg_text)=0
static size_t CurlRecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
virtual bool Subscribe(const std::string &topic)
SubscriberSSE(const std::string &server_url)
LogFacilities
static Url * Parse(const std::string &url, const std::string &default_protocol=kDefaultProtocol, int default_port=kDefaultPort)
Definition: url.cc:14
static LogFacilities info
static void size_t size
Definition: smalloc.h:54
static LogFacilities error
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545