Directory: | cvmfs/ |
---|---|
File: | cvmfs/receiver/reactor.cc |
Date: | 2025-03-09 02:34:28 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 157 | 314 | 50.0% |
Branches: | 135 | 572 | 23.6% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "reactor.h" | ||
6 | |||
7 | #include <stdint.h> | ||
8 | #include <unistd.h> | ||
9 | |||
10 | #include <cstdlib> | ||
11 | #include <cstring> | ||
12 | #include <utility> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "commit_processor.h" | ||
16 | #include "json_document_write.h" | ||
17 | #include "payload_processor.h" | ||
18 | #include "repository_tag.h" | ||
19 | #include "session_token.h" | ||
20 | #include "upload_facility.h" | ||
21 | #include "util/exception.h" | ||
22 | #include "util/logging.h" | ||
23 | #include "util/pointer.h" | ||
24 | #include "util/posix.h" | ||
25 | #include "util/string.h" | ||
26 | |||
27 | namespace receiver { | ||
28 | |||
29 | // NOTE, during the handling of the messages between the gateway and the | ||
30 | // receiver, we keep reading `4` bytes instead of the more common | ||
31 | // `sizeof(req_id)` or `sizeof(int32_t)`. | ||
32 | // This mirror well the behaviour of the gateway code. | ||
33 | // It would be possible on both codebase to ask the size of the type, but then | ||
34 | // we would need to make sure that the types are actually the same. | ||
35 | // It is simpler to send `4` bytes. | ||
36 | |||
37 | 9 | Reactor::Request Reactor::ReadRequest(int fd, std::string* data) { | |
38 | using namespace receiver; // NOLINT | ||
39 | |||
40 | // First, read the command identifier | ||
41 | 9 | int32_t req_id = kQuit; | |
42 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | int nb = SafeRead(fd, &req_id, 4); |
43 | |||
44 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
|
9 | if (nb != 4) { |
45 | ✗ | return kError; | |
46 | } | ||
47 | |||
48 | // Then, read message size | ||
49 | 9 | int32_t msg_size = 0; | |
50 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | nb = SafeRead(fd, &msg_size, 4); |
51 | |||
52 |
2/4✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
|
9 | if (req_id == kError || nb != 4) { |
53 | ✗ | return kError; | |
54 | } | ||
55 | |||
56 | // Finally read the message body | ||
57 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 3 times.
|
9 | if (msg_size > 0) { |
58 |
1/2✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
|
6 | std::vector<char> buffer(msg_size); |
59 |
1/2✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
|
6 | nb = SafeRead(fd, &buffer[0], msg_size); |
60 | |||
61 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
|
6 | if (nb != msg_size) { |
62 | ✗ | return kError; | |
63 | } | ||
64 | |||
65 |
1/2✓ Branch 3 taken 6 times.
✗ Branch 4 not taken.
|
6 | *data = std::string(&buffer[0], msg_size); |
66 |
1/2✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
|
6 | } |
67 | |||
68 | 9 | return static_cast<Request>(req_id); | |
69 | } | ||
70 | |||
71 | 9 | bool Reactor::WriteRequest(int fd, Request req, const std::string& data) { | |
72 | 9 | const int32_t msg_size = data.size(); | |
73 | 9 | const int32_t total_size = 8 + data.size(); // req + msg_size + data | |
74 | |||
75 |
1/2✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
|
9 | std::vector<char> buffer(total_size); |
76 | |||
77 | 9 | memcpy(&buffer[0], &req, 4); | |
78 | 9 | memcpy(&buffer[4], &msg_size, 4); | |
79 | |||
80 |
2/2✓ Branch 1 taken 6 times.
✓ Branch 2 taken 3 times.
|
9 | if (!data.empty()) { |
81 | 6 | memcpy(&buffer[8], &data[0], data.size()); | |
82 | } | ||
83 | |||
84 |
1/2✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
|
18 | return SafeWrite(fd, &buffer[0], total_size); |
85 | 9 | } | |
86 | |||
87 | 9 | bool Reactor::ReadReply(int fd, std::string* data) { | |
88 | 9 | int32_t msg_size(0); | |
89 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | int nb = SafeRead(fd, &msg_size, 4); |
90 | |||
91 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
|
9 | if (nb != 4) { |
92 | ✗ | return false; | |
93 | } | ||
94 | |||
95 |
1/2✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
|
9 | std::vector<char> buffer(msg_size); |
96 |
1/2✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
|
9 | nb = SafeRead(fd, &buffer[0], msg_size); |
97 | |||
98 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
|
9 | if (nb != msg_size) { |
99 | ✗ | return false; | |
100 | } | ||
101 | |||
102 |
1/2✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
|
9 | *data = std::string(&buffer[0], msg_size); |
103 | |||
104 | 9 | return true; | |
105 | 9 | } | |
106 | |||
107 | 9 | bool Reactor::WriteReply(int fd, const std::string& data) { | |
108 | 9 | const int32_t msg_size = data.size(); | |
109 | 9 | const int32_t total_size = 4 + data.size(); | |
110 | |||
111 |
1/2✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
|
9 | std::vector<char> buffer(total_size); |
112 | |||
113 | 9 | memcpy(&buffer[0], &msg_size, 4); | |
114 | |||
115 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | if (!data.empty()) { |
116 | 9 | memcpy(&buffer[4], &data[0], data.size()); | |
117 | } | ||
118 | |||
119 |
1/2✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
|
18 | return SafeWrite(fd, &buffer[0], total_size); |
120 | 9 | } | |
121 | |||
122 | ✗ | bool Reactor::ExtractStatsFromReq(JsonDocument* req, perf::Statistics* stats, | |
123 | std::string* start_time) { | ||
124 | ✗ | perf::StatisticsTemplate stats_tmpl("publish", stats); | |
125 | ✗ | upload::UploadCounters counters(stats_tmpl); | |
126 | |||
127 | const JSON* statistics = | ||
128 | ✗ | JsonDocument::SearchInObject(req->root(), "statistics", JSON_OBJECT); | |
129 | ✗ | if (statistics == NULL) { | |
130 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
131 | "Could not find 'statistics' field in request"); | ||
132 | ✗ | return false; | |
133 | } | ||
134 | |||
135 | const JSON* publish_ctrs = | ||
136 | ✗ | JsonDocument::SearchInObject(statistics, "publish", JSON_OBJECT); | |
137 | |||
138 | ✗ | if (publish_ctrs == NULL) { | |
139 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
140 | "Could not find 'statistics.publish' field in request"); | ||
141 | ✗ | return false; | |
142 | } | ||
143 | |||
144 | const JSON* n_chunks_added = | ||
145 | ✗ | JsonDocument::SearchInObject(publish_ctrs, "n_chunks_added", JSON_INT); | |
146 | ✗ | const JSON* n_chunks_duplicated = JsonDocument::SearchInObject( | |
147 | publish_ctrs, "n_chunks_duplicated", JSON_INT); | ||
148 | const JSON* n_catalogs_added = | ||
149 | ✗ | JsonDocument::SearchInObject(publish_ctrs, "n_catalogs_added", JSON_INT); | |
150 | const JSON* sz_uploaded_bytes = | ||
151 | ✗ | JsonDocument::SearchInObject(publish_ctrs, "sz_uploaded_bytes", JSON_INT); | |
152 | ✗ | const JSON* sz_uploaded_catalog_bytes = JsonDocument::SearchInObject( | |
153 | publish_ctrs, "sz_uploaded_catalog_bytes", JSON_INT); | ||
154 | |||
155 | const JSON* start_time_json = | ||
156 | ✗ | JsonDocument::SearchInObject(statistics, "start_time", JSON_STRING); | |
157 | |||
158 | ✗ | if (n_chunks_added == NULL || n_chunks_duplicated == NULL || | |
159 | ✗ | n_catalogs_added == NULL || sz_uploaded_bytes == NULL || | |
160 | ✗ | sz_uploaded_catalog_bytes == NULL || start_time_json == NULL) { | |
161 | ✗ | return false; | |
162 | } | ||
163 | |||
164 | ✗ | perf::Xadd(counters.n_chunks_added, n_chunks_added->int_value); | |
165 | ✗ | perf::Xadd(counters.n_chunks_duplicated, n_chunks_duplicated->int_value); | |
166 | ✗ | perf::Xadd(counters.n_catalogs_added, n_catalogs_added->int_value); | |
167 | ✗ | perf::Xadd(counters.sz_uploaded_bytes, sz_uploaded_bytes->int_value); | |
168 | ✗ | perf::Xadd(counters.sz_uploaded_catalog_bytes, | |
169 | ✗ | sz_uploaded_catalog_bytes->int_value); | |
170 | |||
171 | ✗ | *start_time = start_time_json->string_value; | |
172 | |||
173 | ✗ | return true; | |
174 | } | ||
175 | |||
176 | 3 | Reactor::Reactor(int fdin, int fdout) : fdin_(fdin), fdout_(fdout) {} | |
177 | |||
178 | 6 | Reactor::~Reactor() {} | |
179 | |||
180 | 3 | bool Reactor::Run() { | |
181 | 3 | std::string msg_body; | |
182 | 3 | Request req = kQuit; | |
183 | do { | ||
184 | 9 | msg_body.clear(); | |
185 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | req = ReadRequest(fdin_, &msg_body); |
186 |
2/4✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 9 times.
|
9 | if (!HandleRequest(req, msg_body)) { |
187 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
188 | "Reactor: could not handle request %d. Exiting", req); | ||
189 | ✗ | return false; | |
190 | } | ||
191 |
2/2✓ Branch 0 taken 6 times.
✓ Branch 1 taken 3 times.
|
9 | } while (req != kQuit); |
192 | |||
193 | 3 | return true; | |
194 | 3 | } | |
195 | |||
196 | 2 | bool Reactor::HandleGenerateToken(const std::string& req, std::string* reply) { | |
197 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (reply == NULL) { |
198 | ✗ | PANIC(kLogSyslogErr, "HandleGenerateToken: Invalid reply pointer."); | |
199 | } | ||
200 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
201 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | if (!req_json.IsValid()) { |
202 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
203 | "HandleGenerateToken: Invalid JSON request."); | ||
204 | ✗ | return false; | |
205 | } | ||
206 | |||
207 | const JSON* key_id = | ||
208 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | JsonDocument::SearchInObject(req_json->root(), "key_id", JSON_STRING); |
209 | const JSON* path = | ||
210 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | JsonDocument::SearchInObject(req_json->root(), "path", JSON_STRING); |
211 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | const JSON* max_lease_time = JsonDocument::SearchInObject( |
212 | req_json->root(), "max_lease_time", JSON_INT); | ||
213 | |||
214 |
3/6✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
|
2 | if (key_id == NULL || path == NULL || max_lease_time == NULL) { |
215 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
216 | "HandleGenerateToken: Missing fields in request."); | ||
217 | ✗ | return false; | |
218 | } | ||
219 | |||
220 | 2 | std::string session_token; | |
221 | 2 | std::string public_token_id; | |
222 | 2 | std::string token_secret; | |
223 | |||
224 |
4/8✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2 times.
✗ Branch 9 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 2 times.
|
2 | if (!GenerateSessionToken(key_id->string_value, path->string_value, |
225 | 2 | max_lease_time->int_value, &session_token, | |
226 | &public_token_id, &token_secret)) { | ||
227 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
228 | "HandleGenerateToken: Could not generate session token."); | ||
229 | ✗ | return false; | |
230 | } | ||
231 | |||
232 | 2 | JsonStringGenerator input; | |
233 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
|
2 | input.Add("token", session_token); |
234 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
|
2 | input.Add("id", public_token_id); |
235 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
|
2 | input.Add("secret", token_secret); |
236 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | std::string json = input.GenerateString(); |
237 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | *reply = json; |
238 | |||
239 | 2 | return true; | |
240 | 2 | } | |
241 | |||
242 | 1 | bool Reactor::HandleGetTokenId(const std::string& req, std::string* reply) { | |
243 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (reply == NULL) { |
244 | ✗ | PANIC(kLogSyslogErr, "HandleGetTokenId: Invalid reply pointer."); | |
245 | } | ||
246 | |||
247 | 1 | std::string token_id; | |
248 | 1 | JsonStringGenerator input; | |
249 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
|
1 | if (!GetTokenPublicId(req, &token_id)) { |
250 | ✗ | input.Add("status", "error"); | |
251 | ✗ | input.Add("reason", "invalid_token"); | |
252 | } else { | ||
253 |
3/6✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
|
1 | input.Add("status", "ok"); |
254 |
2/4✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
|
1 | input.Add("id", token_id); |
255 | } | ||
256 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | std::string json = input.GenerateString(); |
257 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | *reply = json; |
258 | |||
259 | 1 | return true; | |
260 | 1 | } | |
261 | |||
262 | 1 | bool Reactor::HandleCheckToken(const std::string& req, std::string* reply) { | |
263 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (reply == NULL) { |
264 | ✗ | PANIC(kLogSyslogErr, "HandleCheckToken: Invalid reply pointer."); | |
265 | } | ||
266 | |||
267 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
268 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
|
1 | if (!req_json.IsValid()) { |
269 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
270 | "HandleCheckToken: Invalid JSON request."); | ||
271 | ✗ | return false; | |
272 | } | ||
273 | |||
274 | const JSON* token = | ||
275 |
2/4✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
|
1 | JsonDocument::SearchInObject(req_json->root(), "token", JSON_STRING); |
276 | const JSON* secret = | ||
277 |
2/4✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
|
1 | JsonDocument::SearchInObject(req_json->root(), "secret", JSON_STRING); |
278 | |||
279 |
2/4✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
|
1 | if (token == NULL || secret == NULL) { |
280 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
281 | "HandleCheckToken: Missing fields in request."); | ||
282 | ✗ | return false; | |
283 | } | ||
284 | |||
285 | 1 | std::string path; | |
286 | 1 | JsonStringGenerator input; | |
287 | TokenCheckResult ret = | ||
288 |
3/6✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
|
1 | CheckToken(token->string_value, secret->string_value, &path); |
289 |
1/4✗ Branch 0 not taken.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | switch (ret) { |
290 | ✗ | case kExpired: | |
291 | // Expired token | ||
292 | ✗ | input.Add("status", "error"); | |
293 | ✗ | input.Add("reason", "expired_token"); | |
294 | ✗ | break; | |
295 | ✗ | case kInvalid: | |
296 | // Invalid token | ||
297 | ✗ | input.Add("status", "error"); | |
298 | ✗ | input.Add("reason", "invalid_token"); | |
299 | ✗ | break; | |
300 | 1 | case kValid: | |
301 | // All ok | ||
302 |
3/6✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
|
1 | input.Add("status", "ok"); |
303 |
2/4✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
|
1 | input.Add("path", path); |
304 | 1 | break; | |
305 | ✗ | default: | |
306 | // Should not be reached | ||
307 | ✗ | PANIC(kLogSyslogErr, | |
308 | "HandleCheckToken: Unknown value received. Exiting."); | ||
309 | } | ||
310 | |||
311 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | std::string json = input.GenerateString(); |
312 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | *reply = json; |
313 | |||
314 | 1 | return true; | |
315 | 1 | } | |
316 | |||
317 | // This is a special handler. We need to continue reading the payload from the | ||
318 | // fdin_ | ||
319 | 1 | bool Reactor::HandleSubmitPayload(int fdin, const std::string& req, | |
320 | std::string* reply) { | ||
321 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1 times.
|
1 | if (!reply) { |
322 | ✗ | PANIC(kLogSyslogErr, "HandleSubmitPayload: Invalid reply pointer."); | |
323 | } | ||
324 | |||
325 | // Extract the Path (used for verification), Digest and DigestSize from the | ||
326 | // request JSON. | ||
327 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
328 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
|
1 | if (!req_json.IsValid()) { |
329 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
330 | "HandleSubmitPayload: Invalid JSON request."); | ||
331 | ✗ | return false; | |
332 | } | ||
333 | |||
334 | const JSON* path_json = | ||
335 |
2/4✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
|
1 | JsonDocument::SearchInObject(req_json->root(), "path", JSON_STRING); |
336 | const JSON* digest_json = | ||
337 |
2/4✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
|
1 | JsonDocument::SearchInObject(req_json->root(), "digest", JSON_STRING); |
338 | const JSON* header_size_json = | ||
339 |
2/4✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 1 times.
✗ Branch 8 not taken.
|
1 | JsonDocument::SearchInObject(req_json->root(), "header_size", JSON_INT); |
340 | |||
341 |
3/6✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 times.
|
1 | if (path_json == NULL || digest_json == NULL || header_size_json == NULL) { |
342 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
343 | "HandleSubmitPayload: Missing fields in request."); | ||
344 | ✗ | return false; | |
345 | } | ||
346 | |||
347 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | perf::Statistics statistics; |
348 | |||
349 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | UniquePtr<PayloadProcessor> proc(MakePayloadProcessor()); |
350 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | proc->SetStatistics(&statistics); |
351 | 1 | JsonStringGenerator reply_input; | |
352 | PayloadProcessor::Result res = | ||
353 |
3/6✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
|
2 | proc->Process(fdin, digest_json->string_value, path_json->string_value, |
354 | 1 | header_size_json->int_value); | |
355 | |||
356 |
1/5✗ Branch 0 not taken.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 times.
✗ Branch 4 not taken.
|
1 | switch (res) { |
357 | ✗ | case PayloadProcessor::kPathViolation: | |
358 | ✗ | reply_input.Add("status", "error"); | |
359 | ✗ | reply_input.Add("reason", "path_violation"); | |
360 | ✗ | break; | |
361 | ✗ | case PayloadProcessor::kOtherError: | |
362 | ✗ | reply_input.Add("status", "error"); | |
363 | ✗ | reply_input.Add("reason", "other_error"); | |
364 | ✗ | break; | |
365 | ✗ | case PayloadProcessor::kUploaderError: | |
366 | ✗ | reply_input.Add("status", "error"); | |
367 | ✗ | reply_input.Add("reason", "uploader_error"); | |
368 | ✗ | break; | |
369 | 1 | case PayloadProcessor::kSuccess: | |
370 |
3/6✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
|
1 | reply_input.Add("status", "ok"); |
371 | 1 | break; | |
372 | ✗ | default: | |
373 | ✗ | PANIC(kLogSyslogErr, | |
374 | "HandleSubmitPayload: Unknown value of PayloadProcessor::Result " | ||
375 | "encountered."); | ||
376 | break; | ||
377 | } | ||
378 | |||
379 | // HandleSubmitPayload sends partial statistics back to the gateway | ||
380 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
2 | std::string stats_json = statistics.PrintJSON(); |
381 |
2/4✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
|
1 | reply_input.AddJsonObject("statistics", stats_json); |
382 | |||
383 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | std::string json = reply_input.GenerateString(); |
384 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | *reply = json; |
385 | |||
386 | 1 | return true; | |
387 | 1 | } | |
388 | |||
389 | ✗ | bool Reactor::HandleCommit(const std::string& req, std::string* reply) { | |
390 | ✗ | if (!reply) { | |
391 | ✗ | PANIC(kLogSyslogErr, "HandleCommit: Invalid reply pointer."); | |
392 | } | ||
393 | // Extract the Path from the request JSON. | ||
394 | ✗ | UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); | |
395 | ✗ | if (!req_json.IsValid()) { | |
396 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
397 | "HandleCommit: Invalid JSON request."); | ||
398 | ✗ | return false; | |
399 | } | ||
400 | |||
401 | const JSON* lease_path_json = | ||
402 | ✗ | JsonDocument::SearchInObject(req_json->root(), "lease_path", JSON_STRING); | |
403 | ✗ | const JSON* old_root_hash_json = JsonDocument::SearchInObject( | |
404 | req_json->root(), "old_root_hash", JSON_STRING); | ||
405 | ✗ | const JSON* new_root_hash_json = JsonDocument::SearchInObject( | |
406 | req_json->root(), "new_root_hash", JSON_STRING); | ||
407 | const JSON* tag_name_json = | ||
408 | ✗ | JsonDocument::SearchInObject(req_json->root(), "tag_name", JSON_STRING); | |
409 | ✗ | const JSON* tag_description_json = JsonDocument::SearchInObject( | |
410 | req_json->root(), "tag_description", JSON_STRING); | ||
411 | |||
412 | ✗ | if (lease_path_json == NULL || old_root_hash_json == NULL || | |
413 | new_root_hash_json == NULL) { | ||
414 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
415 | "HandleCommit: Missing fields in request."); | ||
416 | ✗ | return false; | |
417 | } | ||
418 | |||
419 | ✗ | perf::Statistics statistics; | |
420 | ✗ | std::string start_time; | |
421 | ✗ | if (!Reactor::ExtractStatsFromReq(req_json.weak_ref(), &statistics, | |
422 | &start_time)) { | ||
423 | ✗ | LogCvmfs( | |
424 | kLogReceiver, kLogSyslogErr, | ||
425 | "HandleCommit: Could not extract statistics counters from request"); | ||
426 | } | ||
427 | uint64_t final_revision; | ||
428 | |||
429 | // Here we use the path to commit the changes! | ||
430 | ✗ | UniquePtr<CommitProcessor> proc(MakeCommitProcessor()); | |
431 | ✗ | proc->SetStatistics(&statistics, start_time); | |
432 | ✗ | shash::Any old_root_hash = shash::MkFromSuffixedHexPtr( | |
433 | ✗ | shash::HexPtr(old_root_hash_json->string_value)); | |
434 | ✗ | shash::Any new_root_hash = shash::MkFromSuffixedHexPtr( | |
435 | ✗ | shash::HexPtr(new_root_hash_json->string_value)); | |
436 | ✗ | RepositoryTag repo_tag(tag_name_json->string_value, | |
437 | ✗ | tag_description_json->string_value); | |
438 | CommitProcessor::Result res = | ||
439 | ✗ | proc->Process(lease_path_json->string_value, old_root_hash, new_root_hash, | |
440 | repo_tag, &final_revision); | ||
441 | |||
442 | ✗ | JsonStringGenerator reply_input; | |
443 | ✗ | switch (res) { | |
444 | ✗ | case CommitProcessor::kSuccess: | |
445 | ✗ | reply_input.Add("status", "ok"); | |
446 | ✗ | reply_input.Add("final_revision", static_cast<int64_t>(final_revision)); | |
447 | ✗ | break; | |
448 | ✗ | case CommitProcessor::kError: | |
449 | ✗ | reply_input.Add("status", "error"); | |
450 | ✗ | reply_input.Add("reason", "miscellaneous"); | |
451 | ✗ | break; | |
452 | ✗ | case CommitProcessor::kMergeFailure: | |
453 | ✗ | reply_input.Add("status", "error"); | |
454 | ✗ | reply_input.Add("reason", "merge_error"); | |
455 | ✗ | break; | |
456 | ✗ | case CommitProcessor::kMissingReflog: | |
457 | ✗ | reply_input.Add("status", "error"); | |
458 | ✗ | reply_input.Add("reason", "missing_reflog"); | |
459 | ✗ | break; | |
460 | ✗ | default: | |
461 | ✗ | PANIC(kLogSyslogErr, | |
462 | "Unknown value of CommitProcessor::Result encountered."); | ||
463 | break; | ||
464 | } | ||
465 | |||
466 | ✗ | std::string json = reply_input.GenerateString(); | |
467 | ✗ | *reply = json; | |
468 | |||
469 | ✗ | return true; | |
470 | } | ||
471 | |||
472 | ✗ | PayloadProcessor* Reactor::MakePayloadProcessor() { | |
473 | ✗ | return new PayloadProcessor(); | |
474 | } | ||
475 | |||
476 | ✗ | CommitProcessor* Reactor::MakeCommitProcessor() { | |
477 | ✗ | return new CommitProcessor(); | |
478 | } | ||
479 | |||
480 | 9 | bool Reactor::HandleRequest(Request req, const std::string& data) { | |
481 | 9 | bool ok = true; | |
482 | 9 | std::string reply; | |
483 | try { | ||
484 |
6/10✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 1 times.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 1 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
|
9 | switch (req) { |
485 | 3 | case kQuit: | |
486 |
2/4✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
|
3 | ok = WriteReply(fdout_, "ok"); |
487 | 3 | break; | |
488 | 1 | case kEcho: | |
489 |
4/8✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 1 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 1 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 1 times.
✗ Branch 13 not taken.
|
1 | ok = WriteReply(fdout_, std::string("PID: ") + StringifyUint(getpid())); |
490 | 1 | break; | |
491 | 2 | case kGenerateToken: | |
492 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ok &= HandleGenerateToken(data, &reply); |
493 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ok &= WriteReply(fdout_, reply); |
494 | 2 | break; | |
495 | 1 | case kGetTokenId: | |
496 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ok &= HandleGetTokenId(data, &reply); |
497 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ok &= WriteReply(fdout_, reply); |
498 | 1 | break; | |
499 | 1 | case kCheckToken: | |
500 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ok &= HandleCheckToken(data, &reply); |
501 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ok &= WriteReply(fdout_, reply); |
502 | 1 | break; | |
503 | 1 | case kSubmitPayload: | |
504 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ok &= HandleSubmitPayload(fdin_, data, &reply); |
505 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | ok &= WriteReply(fdout_, reply); |
506 | 1 | break; | |
507 | ✗ | case kCommit: | |
508 | ✗ | ok &= HandleCommit(data, &reply); | |
509 | ✗ | ok &= WriteReply(fdout_, reply); | |
510 | ✗ | break; | |
511 | ✗ | case kTestCrash: | |
512 | ✗ | PANIC(kLogSyslogErr, | |
513 | "Crash for test purposes. Should never happen in production " | ||
514 | "environment."); | ||
515 | break; | ||
516 | ✗ | case kError: | |
517 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
518 | "Reactor: unknown command received."); | ||
519 | ✗ | ok = false; | |
520 | ✗ | break; | |
521 | ✗ | default: | |
522 | ✗ | break; | |
523 | } | ||
524 | ✗ | } catch (const ECvmfsException& e) { | |
525 | ✗ | reply.clear(); | |
526 | |||
527 | ✗ | std::string error("runtime error: "); | |
528 | ✗ | error += e.what(); | |
529 | |||
530 | ✗ | JsonStringGenerator input; | |
531 | ✗ | input.Add("status", "error"); | |
532 | ✗ | input.Add("reason", error); | |
533 | |||
534 | ✗ | reply = input.GenerateString(); | |
535 | ✗ | WriteReply(fdout_, reply); | |
536 | ✗ | throw e; | |
537 | } | ||
538 | |||
539 | 9 | return ok; | |
540 | 9 | } | |
541 | |||
542 | } // namespace receiver | ||
543 |