CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
subscriber_sse.cc
Go to the documentation of this file.
1 
5 #include "subscriber_sse.h"
6 
7 #include "cvmfs_config.h"
8 
9 #include <vector>
10 
11 #include "url.h"
12 #include "util/logging.h"
13 #include "util/pointer.h"
14 #include "util/string.h"
15 
16 namespace {
17 
20 
21 } // namespace
22 
23 namespace notify {
24 
25 SubscriberSSE::SubscriberSSE(const std::string& server_url)
26  : Subscriber(),
27  server_url_(server_url + "/notifications/subscribe"),
28  topic_(),
29  buffer_(),
30  should_quit_(false) {}
31 
33 
34 bool SubscriberSSE::Subscribe(const std::string& topic) {
36 
37  if (!url.IsValid()) {
39  "SubscriberSSE - could not parse notification server url: %s\n",
40  server_url_.c_str());
41  return false;
42  }
43 
44  this->topic_ = topic;
45 
46  std::string request = "{\"version\":1,\"repository\":\"" + topic + "\"}";
47 
48  const char* user_agent_string = "cvmfs/" VERSION;
49 
50  CURL* h_curl = curl_easy_init();
51  if (h_curl == NULL) {
52  LogCvmfs(kLogCvmfs, kLogError, "Could not create Curl handle\n");
53  return false;
54  }
55 
56  if (h_curl) {
57  curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 0L);
58  curl_easy_setopt(h_curl, CURLOPT_USERAGENT, user_agent_string);
59  curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
60  curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "GET");
61  }
62 
63  if (!h_curl) {
65  "SubscriberSSE - error initializing CURL context\n");
66  return false;
67  }
68 
69  // Make request to acquire lease from repo services
70  curl_easy_setopt(h_curl, CURLOPT_URL, server_url_.c_str());
71  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
72  static_cast<curl_off_t>(request.length()));
73  curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, request.c_str());
74  curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, CurlRecvCB);
75  curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, this);
76  curl_easy_setopt(h_curl, CURLOPT_XFERINFOFUNCTION, CurlProgressCB);
77  curl_easy_setopt(h_curl, CURLOPT_XFERINFODATA, this);
78 
79  bool success = true;
80  CURLcode ret = curl_easy_perform(h_curl);
81  if (ret && ret != CURLE_ABORTED_BY_CALLBACK) {
83  "SubscriberSSE - event loop finished with error: %d. Reply: %s\n",
84  ret, this->buffer_.c_str());
85  success = false;
86  }
87 
88  curl_easy_cleanup(h_curl);
89  h_curl = NULL;
90 
91  return success;
92 }
93 
95  atomic_write32(&should_quit_, 1);
96 }
97 
99  return atomic_read32(&should_quit_);
100 }
101 
102 void SubscriberSSE::AppendToBuffer(const std::string& s) {
103  size_t start = 0;
104  if (s.substr(0, 6) == "data: ") {
105  start = 6;
106  }
107  buffer_ += s.substr(start);
108 }
109 
111 
112 size_t SubscriberSSE::CurlRecvCB(void* buffer, size_t size, size_t nmemb,
113  void* userp) {
114  notify::SubscriberSSE* sub = static_cast<notify::SubscriberSSE*>(userp);
115 
116  if (size * nmemb < 1) {
117  return 0;
118  }
119 
120  std::string buf(static_cast<char*>(buffer));
121 
122  std::vector<std::string> lines = SplitString(buf, '\n');
123 
124  if (lines.size() == 1) {
125  sub->AppendToBuffer(lines[0]);
126  } else {
127  sub->AppendToBuffer(lines[0]);
128  notify::Subscriber::Status st = sub->Consume(sub->topic_, sub->buffer_);
129  sub->ClearBuffer();
130  for (size_t i = 1; i < lines.size(); ++i) {
131  if (lines[i].substr(0, 5) == "data: ") {
132  sub->AppendToBuffer(lines[i]);
133  }
134  }
135  switch (st) {
137  sub->Unsubscribe();
138  break;
140  return 0;
141  default:
142  break;
143  }
144  }
145 
146  return size * nmemb;
147 }
148 
149 int SubscriberSSE::CurlProgressCB(void* clientp, curl_off_t dltotal,
150  curl_off_t dlnow, curl_off_t ultotal,
151  curl_off_t ulnow) {
152  notify::SubscriberSSE* sub = static_cast<notify::SubscriberSSE*>(clientp);
153  if (sub->ShouldQuit()) {
155  "SubscriberSSE - quit request received. Stopping\n");
156  return 1;
157  }
158 
159  return 0;
160 }
161 
162 } // namespace notify
virtual void Unsubscribe()
const LogFacilities & kLogError
Definition: cmd_pub.cc:23
const LogFacilities & kLogInfo
Definition: cmd_pub.cc:22
static int CurlProgressCB(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
vector< string > SplitString(const string &str, char delim)
Definition: string.cc:290
void AppendToBuffer(const std::string &s)
atomic_int32 should_quit_
bool ShouldQuit() const
virtual Status Consume(const std::string &topic, const std::string &msg_text)=0
static size_t CurlRecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
virtual bool Subscribe(const std::string &topic)
SubscriberSSE(const std::string &server_url)
LogFacilities
static Url * Parse(const std::string &url, const std::string &default_protocol=kDefaultProtocol, int default_port=kDefaultPort)
Definition: url.cc:14
static LogFacilities info
static void size_t size
Definition: smalloc.h:54
static LogFacilities error
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528