7 #include "cvmfs_config.h"
27 server_url_(server_url +
"/notifications/subscribe"),
30 should_quit_(false) {}
39 "SubscriberSSE - could not parse notification server url: %s\n",
46 std::string request =
"{\"version\":1,\"repository\":\"" + topic +
"\"}";
48 const char* user_agent_string =
"cvmfs/" VERSION;
50 CURL* h_curl = curl_easy_init();
57 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 0L);
58 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, user_agent_string);
59 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
60 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST,
"GET");
65 "SubscriberSSE - error initializing CURL context\n");
70 curl_easy_setopt(h_curl, CURLOPT_URL,
server_url_.c_str());
71 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
72 static_cast<curl_off_t>(request.length()));
73 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, request.c_str());
74 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION,
CurlRecvCB);
75 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA,
this);
76 curl_easy_setopt(h_curl, CURLOPT_XFERINFOFUNCTION,
CurlProgressCB);
77 curl_easy_setopt(h_curl, CURLOPT_XFERINFODATA,
this);
80 CURLcode ret = curl_easy_perform(h_curl);
81 if (ret && ret != CURLE_ABORTED_BY_CALLBACK) {
83 "SubscriberSSE - event loop finished with error: %d. Reply: %s\n",
88 curl_easy_cleanup(h_curl);
104 if (s.substr(0, 6) ==
"data: ") {
116 if (size * nmemb < 1) {
120 std::string buf(static_cast<char*>(buffer));
122 std::vector<std::string> lines =
SplitString(buf,
'\n');
124 if (lines.size() == 1) {
130 for (
size_t i = 1; i < lines.size(); ++i) {
131 if (lines[i].substr(0, 5) ==
"data: ") {
150 curl_off_t dlnow, curl_off_t ultotal,
155 "SubscriberSSE - quit request received. Stopping\n");
virtual void Unsubscribe()
const LogFacilities & kLogError
const LogFacilities & kLogInfo
static int CurlProgressCB(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
vector< string > SplitString(const string &str, char delim)
void AppendToBuffer(const std::string &s)
atomic_int32 should_quit_
virtual Status Consume(const std::string &topic, const std::string &msg_text)=0
static size_t CurlRecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
virtual bool Subscribe(const std::string &topic)
SubscriberSSE(const std::string &server_url)
static Url * Parse(const std::string &url, const std::string &default_protocol=kDefaultProtocol, int default_port=kDefaultPort)
static LogFacilities info
static LogFacilities error
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)