Directory: | cvmfs/ |
---|---|
File: | cvmfs/receiver/reactor.cc |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 157 | 315 | 49.8% |
Branches: | 135 | 572 | 23.6% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "reactor.h" | ||
6 | |||
7 | #include <stdint.h> | ||
8 | #include <unistd.h> | ||
9 | |||
10 | #include <cstdlib> | ||
11 | #include <cstring> | ||
12 | #include <utility> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "commit_processor.h" | ||
16 | #include "json_document_write.h" | ||
17 | #include "payload_processor.h" | ||
18 | #include "repository_tag.h" | ||
19 | #include "session_token.h" | ||
20 | #include "upload_facility.h" | ||
21 | #include "util/exception.h" | ||
22 | #include "util/logging.h" | ||
23 | #include "util/pointer.h" | ||
24 | #include "util/posix.h" | ||
25 | #include "util/string.h" | ||
26 | |||
27 | namespace receiver { | ||
28 | |||
29 | // NOTE, during the handling of the messages between the gateway and the | ||
30 | // receiver, we keep reading `4` bytes instead of the more common | ||
31 | // `sizeof(req_id)` or `sizeof(int32_t)`. | ||
32 | // This mirror well the behaviour of the gateway code. | ||
33 | // It would be possible on both codebase to ask the size of the type, but then | ||
34 | // we would need to make sure that the types are actually the same. | ||
35 | // It is simpler to send `4` bytes. | ||
36 | |||
37 | 18 | Reactor::Request Reactor::ReadRequest(int fd, std::string *data) { | |
38 | using namespace receiver; // NOLINT | ||
39 | |||
40 | // First, read the command identifier | ||
41 | 18 | int32_t req_id = kQuit; | |
42 |
1/2✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
|
18 | int nb = SafeRead(fd, &req_id, 4); |
43 | |||
44 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
|
18 | if (nb != 4) { |
45 | ✗ | return kError; | |
46 | } | ||
47 | |||
48 | // Then, read message size | ||
49 | 18 | int32_t msg_size = 0; | |
50 |
1/2✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
|
18 | nb = SafeRead(fd, &msg_size, 4); |
51 | |||
52 |
2/4✓ Branch 0 taken 18 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 18 times.
|
18 | if (req_id == kError || nb != 4) { |
53 | ✗ | return kError; | |
54 | } | ||
55 | |||
56 | // Finally read the message body | ||
57 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 6 times.
|
18 | if (msg_size > 0) { |
58 |
1/2✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
|
12 | std::vector<char> buffer(msg_size); |
59 |
1/2✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
|
12 | nb = SafeRead(fd, &buffer[0], msg_size); |
60 | |||
61 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12 times.
|
12 | if (nb != msg_size) { |
62 | ✗ | return kError; | |
63 | } | ||
64 | |||
65 |
1/2✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
|
12 | *data = std::string(&buffer[0], msg_size); |
66 |
1/2✓ Branch 1 taken 12 times.
✗ Branch 2 not taken.
|
12 | } |
67 | |||
68 | 18 | return static_cast<Request>(req_id); | |
69 | } | ||
70 | |||
71 | 18 | bool Reactor::WriteRequest(int fd, Request req, const std::string &data) { | |
72 | 18 | const int32_t msg_size = data.size(); | |
73 | 18 | const int32_t total_size = 8 + data.size(); // req + msg_size + data | |
74 | |||
75 |
1/2✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
|
18 | std::vector<char> buffer(total_size); |
76 | |||
77 | 18 | memcpy(&buffer[0], &req, 4); | |
78 | 18 | memcpy(&buffer[4], &msg_size, 4); | |
79 | |||
80 |
2/2✓ Branch 1 taken 12 times.
✓ Branch 2 taken 6 times.
|
18 | if (!data.empty()) { |
81 | 12 | memcpy(&buffer[8], &data[0], data.size()); | |
82 | } | ||
83 | |||
84 |
1/2✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
|
36 | return SafeWrite(fd, &buffer[0], total_size); |
85 | 18 | } | |
86 | |||
87 | 18 | bool Reactor::ReadReply(int fd, std::string *data) { | |
88 | 18 | int32_t msg_size(0); | |
89 |
1/2✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
|
18 | int nb = SafeRead(fd, &msg_size, 4); |
90 | |||
91 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
|
18 | if (nb != 4) { |
92 | ✗ | return false; | |
93 | } | ||
94 | |||
95 |
1/2✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
|
18 | std::vector<char> buffer(msg_size); |
96 |
1/2✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
|
18 | nb = SafeRead(fd, &buffer[0], msg_size); |
97 | |||
98 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
|
18 | if (nb != msg_size) { |
99 | ✗ | return false; | |
100 | } | ||
101 | |||
102 |
1/2✓ Branch 3 taken 18 times.
✗ Branch 4 not taken.
|
18 | *data = std::string(&buffer[0], msg_size); |
103 | |||
104 | 18 | return true; | |
105 | 18 | } | |
106 | |||
107 | 18 | bool Reactor::WriteReply(int fd, const std::string &data) { | |
108 | 18 | const int32_t msg_size = data.size(); | |
109 | 18 | const int32_t total_size = 4 + data.size(); | |
110 | |||
111 |
1/2✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
|
18 | std::vector<char> buffer(total_size); |
112 | |||
113 | 18 | memcpy(&buffer[0], &msg_size, 4); | |
114 | |||
115 |
1/2✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
|
18 | if (!data.empty()) { |
116 | 18 | memcpy(&buffer[4], &data[0], data.size()); | |
117 | } | ||
118 | |||
119 |
1/2✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
|
36 | return SafeWrite(fd, &buffer[0], total_size); |
120 | 18 | } | |
121 | |||
122 | ✗ | bool Reactor::ExtractStatsFromReq(JsonDocument *req, perf::Statistics *stats, | |
123 | std::string *start_time) { | ||
124 | ✗ | const perf::StatisticsTemplate stats_tmpl("publish", stats); | |
125 | ✗ | const upload::UploadCounters counters(stats_tmpl); | |
126 | |||
127 | ✗ | const JSON *statistics = JsonDocument::SearchInObject( | |
128 | req->root(), "statistics", JSON_OBJECT); | ||
129 | ✗ | if (statistics == NULL) { | |
130 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
131 | "Could not find 'statistics' field in request"); | ||
132 | ✗ | return false; | |
133 | } | ||
134 | |||
135 | ✗ | const JSON *publish_ctrs = JsonDocument::SearchInObject(statistics, "publish", | |
136 | JSON_OBJECT); | ||
137 | |||
138 | ✗ | if (publish_ctrs == NULL) { | |
139 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
140 | "Could not find 'statistics.publish' field in request"); | ||
141 | ✗ | return false; | |
142 | } | ||
143 | |||
144 | ✗ | const JSON *n_chunks_added = JsonDocument::SearchInObject( | |
145 | publish_ctrs, "n_chunks_added", JSON_INT); | ||
146 | ✗ | const JSON *n_chunks_duplicated = JsonDocument::SearchInObject( | |
147 | publish_ctrs, "n_chunks_duplicated", JSON_INT); | ||
148 | ✗ | const JSON *n_catalogs_added = JsonDocument::SearchInObject( | |
149 | publish_ctrs, "n_catalogs_added", JSON_INT); | ||
150 | ✗ | const JSON *sz_uploaded_bytes = JsonDocument::SearchInObject( | |
151 | publish_ctrs, "sz_uploaded_bytes", JSON_INT); | ||
152 | ✗ | const JSON *sz_uploaded_catalog_bytes = JsonDocument::SearchInObject( | |
153 | publish_ctrs, "sz_uploaded_catalog_bytes", JSON_INT); | ||
154 | |||
155 | ✗ | const JSON *start_time_json = JsonDocument::SearchInObject( | |
156 | statistics, "start_time", JSON_STRING); | ||
157 | |||
158 | ✗ | if (n_chunks_added == NULL || n_chunks_duplicated == NULL | |
159 | ✗ | || n_catalogs_added == NULL || sz_uploaded_bytes == NULL | |
160 | ✗ | || sz_uploaded_catalog_bytes == NULL || start_time_json == NULL) { | |
161 | ✗ | return false; | |
162 | } | ||
163 | |||
164 | ✗ | perf::Xadd(counters.n_chunks_added, n_chunks_added->int_value); | |
165 | ✗ | perf::Xadd(counters.n_chunks_duplicated, n_chunks_duplicated->int_value); | |
166 | ✗ | perf::Xadd(counters.n_catalogs_added, n_catalogs_added->int_value); | |
167 | ✗ | perf::Xadd(counters.sz_uploaded_bytes, sz_uploaded_bytes->int_value); | |
168 | ✗ | perf::Xadd(counters.sz_uploaded_catalog_bytes, | |
169 | ✗ | sz_uploaded_catalog_bytes->int_value); | |
170 | |||
171 | ✗ | *start_time = start_time_json->string_value; | |
172 | |||
173 | ✗ | return true; | |
174 | } | ||
175 | |||
176 | 6 | Reactor::Reactor(int fdin, int fdout) : fdin_(fdin), fdout_(fdout) { } | |
177 | |||
178 | 12 | Reactor::~Reactor() { } | |
179 | |||
180 | 6 | bool Reactor::Run() { | |
181 | 6 | std::string msg_body; | |
182 | 6 | Request req = kQuit; | |
183 | do { | ||
184 | 18 | msg_body.clear(); | |
185 |
1/2✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
|
18 | req = ReadRequest(fdin_, &msg_body); |
186 |
2/4✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 18 times.
|
18 | if (!HandleRequest(req, msg_body)) { |
187 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
188 | "Reactor: could not handle request %d. Exiting", req); | ||
189 | ✗ | return false; | |
190 | } | ||
191 |
2/2✓ Branch 0 taken 12 times.
✓ Branch 1 taken 6 times.
|
18 | } while (req != kQuit); |
192 | |||
193 | 6 | return true; | |
194 | 6 | } | |
195 | |||
196 | 4 | bool Reactor::HandleGenerateToken(const std::string &req, std::string *reply) { | |
197 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4 times.
|
4 | if (reply == NULL) { |
198 | ✗ | PANIC(kLogSyslogErr, "HandleGenerateToken: Invalid reply pointer."); | |
199 | } | ||
200 |
2/4✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4 times.
✗ Branch 5 not taken.
|
4 | const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
201 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
|
4 | if (!req_json.IsValid()) { |
202 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
203 | "HandleGenerateToken: Invalid JSON request."); | ||
204 | ✗ | return false; | |
205 | } | ||
206 | |||
207 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
|
4 | const JSON *key_id = JsonDocument::SearchInObject(req_json->root(), "key_id", |
208 | JSON_STRING); | ||
209 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
|
4 | const JSON *path = JsonDocument::SearchInObject(req_json->root(), "path", |
210 | JSON_STRING); | ||
211 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 4 times.
✗ Branch 8 not taken.
|
4 | const JSON *max_lease_time = JsonDocument::SearchInObject( |
212 | req_json->root(), "max_lease_time", JSON_INT); | ||
213 | |||
214 |
3/6✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 4 times.
|
4 | if (key_id == NULL || path == NULL || max_lease_time == NULL) { |
215 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
216 | "HandleGenerateToken: Missing fields in request."); | ||
217 | ✗ | return false; | |
218 | } | ||
219 | |||
220 | 4 | std::string session_token; | |
221 | 4 | std::string public_token_id; | |
222 | 4 | std::string token_secret; | |
223 | |||
224 |
4/8✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 4 times.
✗ Branch 9 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 4 times.
|
4 | if (!GenerateSessionToken(key_id->string_value, path->string_value, |
225 | 4 | max_lease_time->int_value, &session_token, | |
226 | &public_token_id, &token_secret)) { | ||
227 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
228 | "HandleGenerateToken: Could not generate session token."); | ||
229 | ✗ | return false; | |
230 | } | ||
231 | |||
232 | 4 | JsonStringGenerator input; | |
233 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
|
4 | input.Add("token", session_token); |
234 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
|
4 | input.Add("id", public_token_id); |
235 |
2/4✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 4 times.
✗ Branch 6 not taken.
|
4 | input.Add("secret", token_secret); |
236 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4 | const std::string json = input.GenerateString(); |
237 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4 | *reply = json; |
238 | |||
239 | 4 | return true; | |
240 | 4 | } | |
241 | |||
242 | 2 | bool Reactor::HandleGetTokenId(const std::string &req, std::string *reply) { | |
243 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (reply == NULL) { |
244 | ✗ | PANIC(kLogSyslogErr, "HandleGetTokenId: Invalid reply pointer."); | |
245 | } | ||
246 | |||
247 | 2 | std::string token_id; | |
248 | 2 | JsonStringGenerator input; | |
249 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
|
2 | if (!GetTokenPublicId(req, &token_id)) { |
250 | ✗ | input.Add("status", "error"); | |
251 | ✗ | input.Add("reason", "invalid_token"); | |
252 | } else { | ||
253 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | input.Add("status", "ok"); |
254 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
|
2 | input.Add("id", token_id); |
255 | } | ||
256 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | const std::string json = input.GenerateString(); |
257 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | *reply = json; |
258 | |||
259 | 2 | return true; | |
260 | 2 | } | |
261 | |||
262 | 2 | bool Reactor::HandleCheckToken(const std::string &req, std::string *reply) { | |
263 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (reply == NULL) { |
264 | ✗ | PANIC(kLogSyslogErr, "HandleCheckToken: Invalid reply pointer."); | |
265 | } | ||
266 | |||
267 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
268 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | if (!req_json.IsValid()) { |
269 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
270 | "HandleCheckToken: Invalid JSON request."); | ||
271 | ✗ | return false; | |
272 | } | ||
273 | |||
274 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | const JSON *token = JsonDocument::SearchInObject(req_json->root(), "token", |
275 | JSON_STRING); | ||
276 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | const JSON *secret = JsonDocument::SearchInObject(req_json->root(), "secret", |
277 | JSON_STRING); | ||
278 | |||
279 |
2/4✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
|
2 | if (token == NULL || secret == NULL) { |
280 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
281 | "HandleCheckToken: Missing fields in request."); | ||
282 | ✗ | return false; | |
283 | } | ||
284 | |||
285 | 2 | std::string path; | |
286 | 2 | JsonStringGenerator input; | |
287 | const TokenCheckResult ret = | ||
288 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | CheckToken(token->string_value, secret->string_value, &path); |
289 |
1/4✗ Branch 0 not taken.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | switch (ret) { |
290 | ✗ | case kExpired: | |
291 | // Expired token | ||
292 | ✗ | input.Add("status", "error"); | |
293 | ✗ | input.Add("reason", "expired_token"); | |
294 | ✗ | break; | |
295 | ✗ | case kInvalid: | |
296 | // Invalid token | ||
297 | ✗ | input.Add("status", "error"); | |
298 | ✗ | input.Add("reason", "invalid_token"); | |
299 | ✗ | break; | |
300 | 2 | case kValid: | |
301 | // All ok | ||
302 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | input.Add("status", "ok"); |
303 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
|
2 | input.Add("path", path); |
304 | 2 | break; | |
305 | ✗ | default: | |
306 | // Should not be reached | ||
307 | ✗ | PANIC(kLogSyslogErr, | |
308 | "HandleCheckToken: Unknown value received. Exiting."); | ||
309 | } | ||
310 | |||
311 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | const std::string json = input.GenerateString(); |
312 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | *reply = json; |
313 | |||
314 | 2 | return true; | |
315 | 2 | } | |
316 | |||
317 | // This is a special handler. We need to continue reading the payload from the | ||
318 | // fdin_ | ||
319 | 2 | bool Reactor::HandleSubmitPayload(int fdin, const std::string &req, | |
320 | std::string *reply) { | ||
321 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (!reply) { |
322 | ✗ | PANIC(kLogSyslogErr, "HandleSubmitPayload: Invalid reply pointer."); | |
323 | } | ||
324 | |||
325 | // Extract the Path (used for verification), Digest and DigestSize from the | ||
326 | // request JSON. | ||
327 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
328 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
|
2 | if (!req_json.IsValid()) { |
329 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
330 | "HandleSubmitPayload: Invalid JSON request."); | ||
331 | ✗ | return false; | |
332 | } | ||
333 | |||
334 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | const JSON *path_json = JsonDocument::SearchInObject(req_json->root(), "path", |
335 | JSON_STRING); | ||
336 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | const JSON *digest_json = JsonDocument::SearchInObject(req_json->root(), |
337 | "digest", JSON_STRING); | ||
338 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 2 times.
✗ Branch 8 not taken.
|
2 | const JSON *header_size_json = JsonDocument::SearchInObject( |
339 | req_json->root(), "header_size", JSON_INT); | ||
340 | |||
341 |
3/6✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
|
2 | if (path_json == NULL || digest_json == NULL || header_size_json == NULL) { |
342 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
343 | "HandleSubmitPayload: Missing fields in request."); | ||
344 | ✗ | return false; | |
345 | } | ||
346 | |||
347 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | perf::Statistics statistics; |
348 | |||
349 |
2/4✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
|
2 | const UniquePtr<PayloadProcessor> proc(MakePayloadProcessor()); |
350 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
2 | proc->SetStatistics(&statistics); |
351 | 2 | JsonStringGenerator reply_input; | |
352 | const PayloadProcessor::Result res = | ||
353 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
4 | proc->Process(fdin, digest_json->string_value, path_json->string_value, |
354 | 2 | header_size_json->int_value); | |
355 | |||
356 |
1/5✗ Branch 0 not taken.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
|
2 | switch (res) { |
357 | ✗ | case PayloadProcessor::kPathViolation: | |
358 | ✗ | reply_input.Add("status", "error"); | |
359 | ✗ | reply_input.Add("reason", "path_violation"); | |
360 | ✗ | break; | |
361 | ✗ | case PayloadProcessor::kOtherError: | |
362 | ✗ | reply_input.Add("status", "error"); | |
363 | ✗ | reply_input.Add("reason", "other_error"); | |
364 | ✗ | break; | |
365 | ✗ | case PayloadProcessor::kUploaderError: | |
366 | ✗ | reply_input.Add("status", "error"); | |
367 | ✗ | reply_input.Add("reason", "uploader_error"); | |
368 | ✗ | break; | |
369 | 2 | case PayloadProcessor::kSuccess: | |
370 |
3/6✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
|
2 | reply_input.Add("status", "ok"); |
371 | 2 | break; | |
372 | ✗ | default: | |
373 | ✗ | PANIC(kLogSyslogErr, | |
374 | "HandleSubmitPayload: Unknown value of PayloadProcessor::Result " | ||
375 | "encountered."); | ||
376 | break; | ||
377 | } | ||
378 | |||
379 | // HandleSubmitPayload sends partial statistics back to the gateway | ||
380 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
4 | const std::string stats_json = statistics.PrintJSON(); |
381 |
2/4✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
|
2 | reply_input.AddJsonObject("statistics", stats_json); |
382 | |||
383 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | const std::string json = reply_input.GenerateString(); |
384 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | *reply = json; |
385 | |||
386 | 2 | return true; | |
387 | 2 | } | |
388 | |||
389 | ✗ | bool Reactor::HandleCommit(const std::string &req, std::string *reply) { | |
390 | ✗ | if (!reply) { | |
391 | ✗ | PANIC(kLogSyslogErr, "HandleCommit: Invalid reply pointer."); | |
392 | } | ||
393 | // Extract the Path from the request JSON. | ||
394 | ✗ | const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); | |
395 | ✗ | if (!req_json.IsValid()) { | |
396 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
397 | "HandleCommit: Invalid JSON request."); | ||
398 | ✗ | return false; | |
399 | } | ||
400 | |||
401 | ✗ | const JSON *lease_path_json = JsonDocument::SearchInObject( | |
402 | req_json->root(), "lease_path", JSON_STRING); | ||
403 | ✗ | const JSON *old_root_hash_json = JsonDocument::SearchInObject( | |
404 | req_json->root(), "old_root_hash", JSON_STRING); | ||
405 | ✗ | const JSON *new_root_hash_json = JsonDocument::SearchInObject( | |
406 | req_json->root(), "new_root_hash", JSON_STRING); | ||
407 | ✗ | const JSON *tag_name_json = JsonDocument::SearchInObject( | |
408 | req_json->root(), "tag_name", JSON_STRING); | ||
409 | ✗ | const JSON *tag_description_json = JsonDocument::SearchInObject( | |
410 | req_json->root(), "tag_description", JSON_STRING); | ||
411 | |||
412 | ✗ | if (lease_path_json == NULL || old_root_hash_json == NULL | |
413 | ✗ | || new_root_hash_json == NULL) { | |
414 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
415 | "HandleCommit: Missing fields in request."); | ||
416 | ✗ | return false; | |
417 | } | ||
418 | |||
419 | ✗ | perf::Statistics statistics; | |
420 | ✗ | std::string start_time; | |
421 | ✗ | if (!Reactor::ExtractStatsFromReq(req_json.weak_ref(), &statistics, | |
422 | &start_time)) { | ||
423 | ✗ | LogCvmfs( | |
424 | kLogReceiver, kLogSyslogErr, | ||
425 | "HandleCommit: Could not extract statistics counters from request"); | ||
426 | } | ||
427 | uint64_t final_revision; | ||
428 | |||
429 | // Here we use the path to commit the changes! | ||
430 | ✗ | const UniquePtr<CommitProcessor> proc(MakeCommitProcessor()); | |
431 | ✗ | proc->SetStatistics(&statistics, start_time); | |
432 | ✗ | const shash::Any old_root_hash = shash::MkFromSuffixedHexPtr( | |
433 | ✗ | shash::HexPtr(old_root_hash_json->string_value)); | |
434 | ✗ | const shash::Any new_root_hash = shash::MkFromSuffixedHexPtr( | |
435 | ✗ | shash::HexPtr(new_root_hash_json->string_value)); | |
436 | ✗ | const RepositoryTag repo_tag(tag_name_json->string_value, | |
437 | ✗ | tag_description_json->string_value); | |
438 | const CommitProcessor::Result res = | ||
439 | ✗ | proc->Process(lease_path_json->string_value, old_root_hash, new_root_hash, | |
440 | repo_tag, &final_revision); | ||
441 | |||
442 | ✗ | JsonStringGenerator reply_input; | |
443 | ✗ | switch (res) { | |
444 | ✗ | case CommitProcessor::kSuccess: | |
445 | ✗ | reply_input.Add("status", "ok"); | |
446 | ✗ | reply_input.Add("final_revision", static_cast<int64_t>(final_revision)); | |
447 | ✗ | break; | |
448 | ✗ | case CommitProcessor::kError: | |
449 | ✗ | reply_input.Add("status", "error"); | |
450 | ✗ | reply_input.Add("reason", "miscellaneous"); | |
451 | ✗ | break; | |
452 | ✗ | case CommitProcessor::kMergeFailure: | |
453 | ✗ | reply_input.Add("status", "error"); | |
454 | ✗ | reply_input.Add("reason", "merge_error"); | |
455 | ✗ | break; | |
456 | ✗ | case CommitProcessor::kMissingReflog: | |
457 | ✗ | reply_input.Add("status", "error"); | |
458 | ✗ | reply_input.Add("reason", "missing_reflog"); | |
459 | ✗ | break; | |
460 | ✗ | default: | |
461 | ✗ | PANIC(kLogSyslogErr, | |
462 | "Unknown value of CommitProcessor::Result encountered."); | ||
463 | break; | ||
464 | } | ||
465 | |||
466 | ✗ | const std::string json = reply_input.GenerateString(); | |
467 | ✗ | *reply = json; | |
468 | |||
469 | ✗ | return true; | |
470 | } | ||
471 | |||
472 | ✗ | PayloadProcessor *Reactor::MakePayloadProcessor() { | |
473 | ✗ | return new PayloadProcessor(); | |
474 | } | ||
475 | |||
476 | ✗ | CommitProcessor *Reactor::MakeCommitProcessor() { | |
477 | ✗ | return new CommitProcessor(); | |
478 | } | ||
479 | |||
480 | 18 | bool Reactor::HandleRequest(Request req, const std::string &data) { | |
481 | 18 | bool ok = true; | |
482 | 18 | std::string reply; | |
483 | try { | ||
484 |
6/10✓ Branch 0 taken 6 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 4 times.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 2 times.
✓ Branch 5 taken 2 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
|
18 | switch (req) { |
485 | 6 | case kQuit: | |
486 |
2/4✓ Branch 2 taken 6 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 6 times.
✗ Branch 6 not taken.
|
6 | ok = WriteReply(fdout_, "ok"); |
487 | 6 | break; | |
488 | 2 | case kEcho: | |
489 |
4/8✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 2 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 2 times.
✗ Branch 13 not taken.
|
2 | ok = WriteReply(fdout_, std::string("PID: ") + StringifyUint(getpid())); |
490 | 2 | break; | |
491 | 4 | case kGenerateToken: | |
492 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4 | ok &= HandleGenerateToken(data, &reply); |
493 |
1/2✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
|
4 | ok &= WriteReply(fdout_, reply); |
494 | 4 | break; | |
495 | 2 | case kGetTokenId: | |
496 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ok &= HandleGetTokenId(data, &reply); |
497 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ok &= WriteReply(fdout_, reply); |
498 | 2 | break; | |
499 | 2 | case kCheckToken: | |
500 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ok &= HandleCheckToken(data, &reply); |
501 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ok &= WriteReply(fdout_, reply); |
502 | 2 | break; | |
503 | 2 | case kSubmitPayload: | |
504 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ok &= HandleSubmitPayload(fdin_, data, &reply); |
505 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | ok &= WriteReply(fdout_, reply); |
506 | 2 | break; | |
507 | ✗ | case kCommit: | |
508 | ✗ | ok &= HandleCommit(data, &reply); | |
509 | ✗ | ok &= WriteReply(fdout_, reply); | |
510 | ✗ | break; | |
511 | ✗ | case kTestCrash: | |
512 | ✗ | PANIC(kLogSyslogErr, | |
513 | "Crash for test purposes. Should never happen in production " | ||
514 | "environment."); | ||
515 | break; | ||
516 | ✗ | case kError: | |
517 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
518 | "Reactor: unknown command received."); | ||
519 | ✗ | ok = false; | |
520 | ✗ | break; | |
521 | ✗ | default: | |
522 | ✗ | break; | |
523 | } | ||
524 | ✗ | } catch (const ECvmfsException &e) { | |
525 | ✗ | reply.clear(); | |
526 | |||
527 | ✗ | std::string error("runtime error: "); | |
528 | ✗ | error += e.what(); | |
529 | |||
530 | ✗ | JsonStringGenerator input; | |
531 | ✗ | input.Add("status", "error"); | |
532 | ✗ | input.Add("reason", error); | |
533 | |||
534 | ✗ | reply = input.GenerateString(); | |
535 | ✗ | WriteReply(fdout_, reply); | |
536 | ✗ | throw e; | |
537 | } | ||
538 | |||
539 | 18 | return ok; | |
540 | 18 | } | |
541 | |||
542 | } // namespace receiver | ||
543 |