25 , server_url_(server_url +
"/notifications/subscribe")
28 , should_quit_(false) { }
37 "SubscriberSSE - could not parse notification server url: %s\n",
44 const std::string request =
45 "{\"version\":1,\"repository\":\"" + topic +
"\"}";
47 const char *user_agent_string =
"cvmfs/" CVMFS_VERSION;
49 CURL *h_curl = curl_easy_init();
56 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 0L);
57 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, user_agent_string);
58 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
59 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST,
"GET");
64 "SubscriberSSE - error initializing CURL context\n");
69 curl_easy_setopt(h_curl, CURLOPT_URL,
server_url_.c_str());
70 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
71 static_cast<curl_off_t>(request.length()));
72 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, request.c_str());
73 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION,
CurlRecvCB);
74 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA,
this);
75 curl_easy_setopt(h_curl, CURLOPT_XFERINFOFUNCTION,
CurlProgressCB);
76 curl_easy_setopt(h_curl, CURLOPT_XFERINFODATA,
this);
79 const CURLcode ret = curl_easy_perform(h_curl);
80 if (ret && ret != CURLE_ABORTED_BY_CALLBACK) {
82 "SubscriberSSE - event loop finished with error: %d. Reply: %s\n",
87 curl_easy_cleanup(h_curl);
99 if (s.substr(0, 6) ==
"data: ") {
111 if (size * nmemb < 1) {
115 const std::string buf(static_cast<char *>(buffer));
117 std::vector<std::string> lines =
SplitString(buf,
'\n');
119 if (lines.size() == 1) {
126 for (
size_t i = 1; i < lines.size(); ++i) {
127 if (lines[i].substr(0, 5) ==
"data: ") {
146 curl_off_t dlnow, curl_off_t ultotal,
151 "SubscriberSSE - quit request received. Stopping\n");
virtual void Unsubscribe()
const LogFacilities & kLogError
const LogFacilities & kLogInfo
static int CurlProgressCB(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
vector< string > SplitString(const string &str, char delim)
void AppendToBuffer(const std::string &s)
atomic_int32 should_quit_
virtual Status Consume(const std::string &topic, const std::string &msg_text)=0
static size_t CurlRecvCB(void *buffer, size_t size, size_t nmemb, void *userp)
virtual bool Subscribe(const std::string &topic)
SubscriberSSE(const std::string &server_url)
static Url * Parse(const std::string &url, const std::string &default_protocol=kDefaultProtocol, int default_port=kDefaultPort)
static LogFacilities info
static LogFacilities error
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)