GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/notify/subscriber_sse.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 0 78 0.0%
Branches: 0 113 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "subscriber_sse.h"
6
7 #include <vector>
8
9 #include "url.h"
10 #include "util/logging.h"
11 #include "util/pointer.h"
12 #include "util/string.h"
13
14 namespace {
15
16 const LogFacilities &kLogInfo = DefaultLogging::info;
17 const LogFacilities &kLogError = DefaultLogging::error;
18
19 } // namespace
20
21 namespace notify {
22
23 SubscriberSSE::SubscriberSSE(const std::string &server_url)
24 : Subscriber()
25 , server_url_(server_url + "/notifications/subscribe")
26 , topic_()
27 , buffer_()
28 , should_quit_(false) { }
29
30 SubscriberSSE::~SubscriberSSE() { }
31
32 bool SubscriberSSE::Subscribe(const std::string &topic) {
33 const UniquePtr<Url> url(Url::Parse(server_url_));
34
35 if (!url.IsValid()) {
36 LogCvmfs(kLogCvmfs, kLogError,
37 "SubscriberSSE - could not parse notification server url: %s\n",
38 server_url_.c_str());
39 return false;
40 }
41
42 this->topic_ = topic;
43
44 const std::string request =
45 "{\"version\":1,\"repository\":\"" + topic + "\"}";
46
47 const char *user_agent_string = "cvmfs/" CVMFS_VERSION;
48
49 CURL *h_curl = curl_easy_init();
50 if (h_curl == NULL) {
51 LogCvmfs(kLogCvmfs, kLogError, "Could not create Curl handle\n");
52 return false;
53 }
54
55 if (h_curl) {
56 curl_easy_setopt(h_curl, CURLOPT_NOPROGRESS, 0L);
57 curl_easy_setopt(h_curl, CURLOPT_USERAGENT, user_agent_string);
58 curl_easy_setopt(h_curl, CURLOPT_MAXREDIRS, 50L);
59 curl_easy_setopt(h_curl, CURLOPT_CUSTOMREQUEST, "GET");
60 }
61
62 if (!h_curl) {
63 LogCvmfs(kLogCvmfs, kLogError,
64 "SubscriberSSE - error initializing CURL context\n");
65 return false;
66 }
67
68 // Make request to acquire lease from repo services
69 curl_easy_setopt(h_curl, CURLOPT_URL, server_url_.c_str());
70 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDSIZE_LARGE,
71 static_cast<curl_off_t>(request.length()));
72 curl_easy_setopt(h_curl, CURLOPT_POSTFIELDS, request.c_str());
73 curl_easy_setopt(h_curl, CURLOPT_WRITEFUNCTION, CurlRecvCB);
74 curl_easy_setopt(h_curl, CURLOPT_WRITEDATA, this);
75 curl_easy_setopt(h_curl, CURLOPT_XFERINFOFUNCTION, CurlProgressCB);
76 curl_easy_setopt(h_curl, CURLOPT_XFERINFODATA, this);
77
78 bool success = true;
79 const CURLcode ret = curl_easy_perform(h_curl);
80 if (ret && ret != CURLE_ABORTED_BY_CALLBACK) {
81 LogCvmfs(kLogCvmfs, kLogError,
82 "SubscriberSSE - event loop finished with error: %d. Reply: %s\n",
83 ret, this->buffer_.c_str());
84 success = false;
85 }
86
87 curl_easy_cleanup(h_curl);
88 h_curl = NULL;
89
90 return success;
91 }
92
93 void SubscriberSSE::Unsubscribe() { atomic_write32(&should_quit_, 1); }
94
95 bool SubscriberSSE::ShouldQuit() const { return atomic_read32(&should_quit_); }
96
97 void SubscriberSSE::AppendToBuffer(const std::string &s) {
98 size_t start = 0;
99 if (s.substr(0, 6) == "data: ") {
100 start = 6;
101 }
102 buffer_ += s.substr(start);
103 }
104
105 void SubscriberSSE::ClearBuffer() { buffer_.clear(); }
106
107 size_t SubscriberSSE::CurlRecvCB(void *buffer, size_t size, size_t nmemb,
108 void *userp) {
109 notify::SubscriberSSE *sub = static_cast<notify::SubscriberSSE *>(userp);
110
111 if (size * nmemb < 1) {
112 return 0;
113 }
114
115 const std::string buf(static_cast<char *>(buffer));
116
117 std::vector<std::string> lines = SplitString(buf, '\n');
118
119 if (lines.size() == 1) {
120 sub->AppendToBuffer(lines[0]);
121 } else {
122 sub->AppendToBuffer(lines[0]);
123 const notify::Subscriber::Status st =
124 sub->Consume(sub->topic_, sub->buffer_);
125 sub->ClearBuffer();
126 for (size_t i = 1; i < lines.size(); ++i) {
127 if (lines[i].substr(0, 5) == "data: ") {
128 sub->AppendToBuffer(lines[i]);
129 }
130 }
131 switch (st) {
132 case notify::Subscriber::kFinish:
133 sub->Unsubscribe();
134 break;
135 case notify::Subscriber::kError:
136 return 0;
137 default:
138 break;
139 }
140 }
141
142 return size * nmemb;
143 }
144
145 int SubscriberSSE::CurlProgressCB(void *clientp, curl_off_t dltotal,
146 curl_off_t dlnow, curl_off_t ultotal,
147 curl_off_t ulnow) {
148 notify::SubscriberSSE *sub = static_cast<notify::SubscriberSSE *>(clientp);
149 if (sub->ShouldQuit()) {
150 LogCvmfs(kLogCvmfs, kLogInfo,
151 "SubscriberSSE - quit request received. Stopping\n");
152 return 1;
153 }
154
155 return 0;
156 }
157
158 } // namespace notify
159