GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/notify/subscriber_sse.cc
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 0 80 0.0%
Branches: 0 113 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "subscriber_sse.h"
6
7 #include "cvmfs_config.h"
8
9 #include <vector>
10
11 #include "url.h"
12 #include "util/logging.h"
13 #include "util/pointer.h"
14 #include "util/string.h"
15
16 namespace {
17
18 const LogFacilities& kLogInfo = DefaultLogging::info;
19 const LogFacilities& kLogError = DefaultLogging::error;
20
21 } // namespace
22
23 namespace notify {
24
25 SubscriberSSE::SubscriberSSE(const std::string& server_url)
26 : Subscriber(),
27 server_url_(server_url + "/notifications/subscribe"),
28 topic_(),
29 buffer_(),
30 should_quit_(false) {}
31
32 SubscriberSSE::~SubscriberSSE() {}
33
34 bool SubscriberSSE::Subscribe(const std::string& topic) {
35 UniquePtr<Url> url(Url::Parse(server_url_));
36
37 if (!url.IsValid()) {
38 LogCvmfs(kLogCvmfs, kLogError,
39 "SubscriberSSE - could not parse notification server url: %s\n",
40 server_url_.c_str());
41 return false;
42 }
43
44 this->topic_ = topic;
45
46 std::string request = "{\"version\":1,\"repository\":\"" + topic + "\"}";
47
48 const char* user_agent_string = "cvmfs/" VERSION;
49
50 CURL* h_curl = curl_easy_init();
51 if (h_curl == NULL) {
52 LogCvmfs(kLogCvmfs, kLogError, "Could not create Curl handle\n");
53 return false;
54 }
55
56 if (h_curl) {
57 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 0L);
58 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, user_agent_string);
59 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
60 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "GET");
61 }
62
63 if (!h_curl) {
64 LogCvmfs(kLogCvmfs, kLogError,
65 "SubscriberSSE - error initializing CURL context\n");
66 return false;
67 }
68
69 // Make request to acquire lease from repo services
70 curl_easy_setopt(h_curl, CURLOPT_URL, server_url_.c_str());
71 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
72 static_cast<curl_off_t>(request.length()));
73 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, request.c_str());
74 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, CurlRecvCB);
75 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, this);
76 curl_easy_setopt(h_curl, CURLOPT_XFERINFOFUNCTION, CurlProgressCB);
77 curl_easy_setopt(h_curl, CURLOPT_XFERINFODATA, this);
78
79 bool success = true;
80 CURLcode ret = curl_easy_perform(h_curl);
81 if (ret && ret != CURLE_ABORTED_BY_CALLBACK) {
82 LogCvmfs(kLogCvmfs, kLogError,
83 "SubscriberSSE - event loop finished with error: %d. Reply: %s\n",
84 ret, this->buffer_.c_str());
85 success = false;
86 }
87
88 curl_easy_cleanup(h_curl);
89 h_curl = NULL;
90
91 return success;
92 }
93
94 void SubscriberSSE::Unsubscribe() {
95 atomic_write32(&should_quit_, 1);
96 }
97
98 bool SubscriberSSE::ShouldQuit() const {
99 return atomic_read32(&should_quit_);
100 }
101
102 void SubscriberSSE::AppendToBuffer(const std::string& s) {
103 size_t start = 0;
104 if (s.substr(0, 6) == "data: ") {
105 start = 6;
106 }
107 buffer_ += s.substr(start);
108 }
109
110 void SubscriberSSE::ClearBuffer() { buffer_.clear(); }
111
112 size_t SubscriberSSE::CurlRecvCB(void* buffer, size_t size, size_t nmemb,
113 void* userp) {
114 notify::SubscriberSSE* sub = static_cast<notify::SubscriberSSE*>(userp);
115
116 if (size * nmemb < 1) {
117 return 0;
118 }
119
120 std::string buf(static_cast<char*>(buffer));
121
122 std::vector<std::string> lines = SplitString(buf, '\n');
123
124 if (lines.size() == 1) {
125 sub->AppendToBuffer(lines[0]);
126 } else {
127 sub->AppendToBuffer(lines[0]);
128 notify::Subscriber::Status st = sub->Consume(sub->topic_, sub->buffer_);
129 sub->ClearBuffer();
130 for (size_t i = 1; i < lines.size(); ++i) {
131 if (lines[i].substr(0, 5) == "data: ") {
132 sub->AppendToBuffer(lines[i]);
133 }
134 }
135 switch (st) {
136 case notify::Subscriber::kFinish:
137 sub->Unsubscribe();
138 break;
139 case notify::Subscriber::kError:
140 return 0;
141 default:
142 break;
143 }
144 }
145
146 return size * nmemb;
147 }
148
149 int SubscriberSSE::CurlProgressCB(void* clientp, curl_off_t dltotal,
150 curl_off_t dlnow, curl_off_t ultotal,
151 curl_off_t ulnow) {
152 notify::SubscriberSSE* sub = static_cast<notify::SubscriberSSE*>(clientp);
153 if (sub->ShouldQuit()) {
154 LogCvmfs(kLogCvmfs, kLogInfo,
155 "SubscriberSSE - quit request received. Stopping\n");
156 return 1;
157 }
158
159 return 0;
160 }
161
162 } // namespace notify
163