GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/receiver/reactor.cc
Date: 2026-06-28 02:36:10
Exec Total Coverage
Lines: 215 341 63.0%
Branches: 201 630 31.9%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #include "reactor.h"
6
7 #include <stdint.h>
8 #include <unistd.h>
9
10 #include <cstdlib>
11 #include <cstring>
12 #include <utility>
13 #include <vector>
14
15 #include "commit_processor.h"
16 #include "json_document_write.h"
17 #include "payload_processor.h"
18 #include "repository_tag.h"
19 #include "session_token.h"
20 #include "upload_facility.h"
21 #include "util/exception.h"
22 #include "util/logging.h"
23 #include "util/pointer.h"
24 #include "util/posix.h"
25 #include "util/string.h"
26
27 namespace receiver {
28
29 // NOTE, during the handling of the messages between the gateway and the
30 // receiver, we keep reading `4` bytes instead of the more common
31 // `sizeof(req_id)` or `sizeof(int32_t)`.
32 // This mirror well the behaviour of the gateway code.
33 // It would be possible on both codebase to ask the size of the type, but then
34 // we would need to make sure that the types are actually the same.
35 // It is simpler to send `4` bytes.
36
37 99 Reactor::Request Reactor::ReadRequest(int fd, std::string *data) {
38 using namespace receiver; // NOLINT
39
40 // First, read the command identifier
41 99 int32_t req_id = kQuit;
42
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 int nb = SafeRead(fd, &req_id, 4);
43
44
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
99 if (nb != 4) {
45 return kError;
46 }
47
48 // Then, read message size
49 99 int32_t msg_size = 0;
50
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 nb = SafeRead(fd, &msg_size, 4);
51
52
2/4
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 99 times.
99 if (req_id == kError || nb != 4) {
53 return kError;
54 }
55
56 // Finally read the message body
57
2/2
✓ Branch 0 taken 63 times.
✓ Branch 1 taken 36 times.
99 if (msg_size > 0) {
58
1/2
✓ Branch 2 taken 63 times.
✗ Branch 3 not taken.
63 std::vector<char> buffer(msg_size);
59
1/2
✓ Branch 2 taken 63 times.
✗ Branch 3 not taken.
63 nb = SafeRead(fd, &buffer[0], msg_size);
60
61
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 63 times.
63 if (nb != msg_size) {
62 return kError;
63 }
64
65
1/2
✓ Branch 3 taken 63 times.
✗ Branch 4 not taken.
63 *data = std::string(&buffer[0], msg_size);
66
1/2
✓ Branch 1 taken 63 times.
✗ Branch 2 not taken.
63 }
67
68 99 return static_cast<Request>(req_id);
69 }
70
71 99 bool Reactor::WriteRequest(int fd, Request req, const std::string &data) {
72 99 const int32_t msg_size = data.size();
73 99 const int32_t total_size = 8 + data.size(); // req + msg_size + data
74
75
1/2
✓ Branch 2 taken 99 times.
✗ Branch 3 not taken.
99 std::vector<char> buffer(total_size);
76
77 99 memcpy(&buffer[0], &req, 4);
78 99 memcpy(&buffer[4], &msg_size, 4);
79
80
2/2
✓ Branch 1 taken 63 times.
✓ Branch 2 taken 36 times.
99 if (!data.empty()) {
81 63 memcpy(&buffer[8], &data[0], data.size());
82 }
83
84
1/2
✓ Branch 2 taken 99 times.
✗ Branch 3 not taken.
198 return SafeWrite(fd, &buffer[0], total_size);
85 99 }
86
87 99 bool Reactor::ReadReply(int fd, std::string *data) {
88 99 int32_t msg_size(0);
89
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 int nb = SafeRead(fd, &msg_size, 4);
90
91
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
99 if (nb != 4) {
92 return false;
93 }
94
95
1/2
✓ Branch 2 taken 99 times.
✗ Branch 3 not taken.
99 std::vector<char> buffer(msg_size);
96
1/2
✓ Branch 2 taken 99 times.
✗ Branch 3 not taken.
99 nb = SafeRead(fd, &buffer[0], msg_size);
97
98
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
99 if (nb != msg_size) {
99 return false;
100 }
101
102
1/2
✓ Branch 3 taken 99 times.
✗ Branch 4 not taken.
99 *data = std::string(&buffer[0], msg_size);
103
104 99 return true;
105 99 }
106
107 99 bool Reactor::WriteReply(int fd, const std::string &data) {
108 99 const int32_t msg_size = data.size();
109 99 const int32_t total_size = 4 + data.size();
110
111
1/2
✓ Branch 2 taken 99 times.
✗ Branch 3 not taken.
99 std::vector<char> buffer(total_size);
112
113 99 memcpy(&buffer[0], &msg_size, 4);
114
115
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 if (!data.empty()) {
116 99 memcpy(&buffer[4], &data[0], data.size());
117 }
118
119
1/2
✓ Branch 2 taken 99 times.
✗ Branch 3 not taken.
198 return SafeWrite(fd, &buffer[0], total_size);
120 99 }
121
122 9 bool Reactor::ExtractStatsFromReq(JsonDocument *req, perf::Statistics *stats,
123 std::string *start_time) {
124
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9 times.
✗ Branch 6 not taken.
18 const perf::StatisticsTemplate stats_tmpl("publish", stats);
125
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 const upload::UploadCounters counters(stats_tmpl);
126
127
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
9 const JSON *statistics = JsonDocument::SearchInObject(
128 req->root(), "statistics", JSON_OBJECT);
129
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (statistics == NULL) {
130
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 LogCvmfs(kLogReceiver, kLogSyslogErr,
131 "Could not find 'statistics' field in request");
132 9 return false;
133 }
134
135 const JSON *publish_ctrs = JsonDocument::SearchInObject(statistics, "publish",
136 JSON_OBJECT);
137
138 if (publish_ctrs == NULL) {
139 LogCvmfs(kLogReceiver, kLogSyslogErr,
140 "Could not find 'statistics.publish' field in request");
141 return false;
142 }
143
144 const JSON *n_chunks_added = JsonDocument::SearchInObject(
145 publish_ctrs, "n_chunks_added", JSON_INT);
146 const JSON *n_chunks_duplicated = JsonDocument::SearchInObject(
147 publish_ctrs, "n_chunks_duplicated", JSON_INT);
148 const JSON *n_catalogs_added = JsonDocument::SearchInObject(
149 publish_ctrs, "n_catalogs_added", JSON_INT);
150 const JSON *sz_uploaded_bytes = JsonDocument::SearchInObject(
151 publish_ctrs, "sz_uploaded_bytes", JSON_INT);
152 const JSON *sz_uploaded_catalog_bytes = JsonDocument::SearchInObject(
153 publish_ctrs, "sz_uploaded_catalog_bytes", JSON_INT);
154
155 const JSON *start_time_json = JsonDocument::SearchInObject(
156 statistics, "start_time", JSON_STRING);
157
158 if (n_chunks_added == NULL || n_chunks_duplicated == NULL
159 || n_catalogs_added == NULL || sz_uploaded_bytes == NULL
160 || sz_uploaded_catalog_bytes == NULL || start_time_json == NULL) {
161 return false;
162 }
163
164 perf::Xadd(counters.n_chunks_added, n_chunks_added->get<int>());
165 perf::Xadd(counters.n_chunks_duplicated, n_chunks_duplicated->get<int>());
166 perf::Xadd(counters.n_catalogs_added, n_catalogs_added->get<int>());
167 perf::Xadd(counters.sz_uploaded_bytes, sz_uploaded_bytes->get<int>());
168 perf::Xadd(counters.sz_uploaded_catalog_bytes,
169 sz_uploaded_catalog_bytes->get<int>());
170
171 *start_time = start_time_json->get<std::string>();
172
173 return true;
174 9 }
175
176 36 Reactor::Reactor(int fdin, int fdout) : fdin_(fdin), fdout_(fdout) { }
177
178 72 Reactor::~Reactor() { }
179
180 36 bool Reactor::Run() {
181 36 std::string msg_body;
182 36 Request req = kQuit;
183 do {
184 99 msg_body.clear();
185
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 req = ReadRequest(fdin_, &msg_body);
186
2/4
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 99 times.
99 if (!HandleRequest(req, msg_body)) {
187 LogCvmfs(kLogReceiver, kLogSyslogErr,
188 "Reactor: could not handle request %d. Exiting", req);
189 return false;
190 }
191
2/2
✓ Branch 0 taken 63 times.
✓ Branch 1 taken 36 times.
99 } while (req != kQuit);
192
193 36 return true;
194 36 }
195
196 18 bool Reactor::HandleGenerateToken(const std::string &req, std::string *reply) {
197
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 18 times.
18 if (reply == NULL) {
198 PANIC(kLogSyslogErr, "HandleGenerateToken: Invalid reply pointer.");
199 }
200
2/4
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18 times.
✗ Branch 5 not taken.
18 const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req));
201
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 18 times.
18 if (!req_json.IsValid()) {
202 LogCvmfs(kLogReceiver, kLogSyslogErr,
203 "HandleGenerateToken: Invalid JSON request.");
204 return false;
205 }
206
207
2/4
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 18 times.
✗ Branch 8 not taken.
18 const JSON *key_id = JsonDocument::SearchInObject(req_json->root(), "key_id",
208 JSON_STRING);
209
2/4
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 18 times.
✗ Branch 8 not taken.
18 const JSON *path = JsonDocument::SearchInObject(req_json->root(), "path",
210 JSON_STRING);
211
2/4
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 18 times.
✗ Branch 8 not taken.
18 const JSON *max_lease_time = JsonDocument::SearchInObject(
212 req_json->root(), "max_lease_time", JSON_INT);
213
214
3/6
✓ Branch 0 taken 18 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 18 times.
18 if (key_id == NULL || path == NULL || max_lease_time == NULL) {
215 LogCvmfs(kLogReceiver, kLogSyslogErr,
216 "HandleGenerateToken: Missing fields in request.");
217 return false;
218 }
219
220 18 std::string session_token;
221 18 std::string public_token_id;
222 18 std::string token_secret;
223
224
2/4
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18 times.
✗ Branch 5 not taken.
18 if (!GenerateSessionToken(key_id->get<std::string>(),
225
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 18 times.
18 path->get<std::string>(),
226
2/4
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 18 times.
✗ Branch 5 not taken.
18 max_lease_time->get<int>(), &session_token,
227 &public_token_id, &token_secret)) {
228 LogCvmfs(kLogReceiver, kLogSyslogErr,
229 "HandleGenerateToken: Could not generate session token.");
230 return false;
231 }
232
233 18 JsonStringGenerator input;
234
2/4
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 18 times.
✗ Branch 6 not taken.
18 input.Add("token", session_token);
235
2/4
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 18 times.
✗ Branch 6 not taken.
18 input.Add("id", public_token_id);
236
2/4
✓ Branch 2 taken 18 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 18 times.
✗ Branch 6 not taken.
18 input.Add("secret", token_secret);
237
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 const std::string json = input.GenerateString();
238
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 *reply = json;
239
240 18 return true;
241 18 }
242
243 9 bool Reactor::HandleGetTokenId(const std::string &req, std::string *reply) {
244
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (reply == NULL) {
245 PANIC(kLogSyslogErr, "HandleGetTokenId: Invalid reply pointer.");
246 }
247
248 9 std::string token_id;
249 9 JsonStringGenerator input;
250
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 9 times.
9 if (!GetTokenPublicId(req, &token_id)) {
251 input.Add("status", "error");
252 input.Add("reason", "invalid_token");
253 } else {
254
3/6
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 9 times.
✗ Branch 10 not taken.
9 input.Add("status", "ok");
255
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9 times.
✗ Branch 6 not taken.
9 input.Add("id", token_id);
256 }
257
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 const std::string json = input.GenerateString();
258
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 *reply = json;
259
260 9 return true;
261 9 }
262
263 9 bool Reactor::HandleCheckToken(const std::string &req, std::string *reply) {
264
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (reply == NULL) {
265 PANIC(kLogSyslogErr, "HandleCheckToken: Invalid reply pointer.");
266 }
267
268
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req));
269
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
9 if (!req_json.IsValid()) {
270 LogCvmfs(kLogReceiver, kLogSyslogErr,
271 "HandleCheckToken: Invalid JSON request.");
272 return false;
273 }
274
275
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *token = JsonDocument::SearchInObject(req_json->root(), "token",
276 JSON_STRING);
277
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *secret = JsonDocument::SearchInObject(req_json->root(), "secret",
278 JSON_STRING);
279
280
2/4
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
9 if (token == NULL || secret == NULL) {
281 LogCvmfs(kLogReceiver, kLogSyslogErr,
282 "HandleCheckToken: Missing fields in request.");
283 return false;
284 }
285
286 9 std::string path;
287 9 JsonStringGenerator input;
288
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 const TokenCheckResult ret = CheckToken(token->get<std::string>(),
289
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
18 secret->get<std::string>(), &path);
290
1/4
✗ Branch 0 not taken.
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 switch (ret) {
291 case kExpired:
292 // Expired token
293 input.Add("status", "error");
294 input.Add("reason", "expired_token");
295 break;
296 case kInvalid:
297 // Invalid token
298 input.Add("status", "error");
299 input.Add("reason", "invalid_token");
300 break;
301 9 case kValid:
302 // All ok
303
3/6
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 9 times.
✗ Branch 10 not taken.
9 input.Add("status", "ok");
304
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9 times.
✗ Branch 6 not taken.
9 input.Add("path", path);
305 9 break;
306 default:
307 // Should not be reached
308 PANIC(kLogSyslogErr,
309 "HandleCheckToken: Unknown value received. Exiting.");
310 }
311
312
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 const std::string json = input.GenerateString();
313
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 *reply = json;
314
315 9 return true;
316 9 }
317
318 // This is a special handler. We need to continue reading the payload from the
319 // fdin_
320 9 bool Reactor::HandleSubmitPayload(int fdin, const std::string &req,
321 std::string *reply) {
322
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (!reply) {
323 PANIC(kLogSyslogErr, "HandleSubmitPayload: Invalid reply pointer.");
324 }
325
326 // Extract the Path (used for verification), Digest and DigestSize from the
327 // request JSON.
328
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req));
329
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
9 if (!req_json.IsValid()) {
330 LogCvmfs(kLogReceiver, kLogSyslogErr,
331 "HandleSubmitPayload: Invalid JSON request.");
332 return false;
333 }
334
335
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *path_json = JsonDocument::SearchInObject(req_json->root(), "path",
336 JSON_STRING);
337
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *digest_json = JsonDocument::SearchInObject(req_json->root(),
338 "digest", JSON_STRING);
339
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *header_size_json = JsonDocument::SearchInObject(
340 req_json->root(), "header_size", JSON_INT);
341
342
3/6
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 9 times.
9 if (path_json == NULL || digest_json == NULL || header_size_json == NULL) {
343 LogCvmfs(kLogReceiver, kLogSyslogErr,
344 "HandleSubmitPayload: Missing fields in request.");
345 return false;
346 }
347
348
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 perf::Statistics statistics;
349
350
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 const UniquePtr<PayloadProcessor> proc(MakePayloadProcessor());
351
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 proc->SetStatistics(&statistics);
352 9 JsonStringGenerator reply_input;
353
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
18 const PayloadProcessor::Result res = proc->Process(
354
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
18 fdin, digest_json->get<std::string>(), path_json->get<std::string>(),
355
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 header_size_json->get<int>());
356
357
1/5
✗ Branch 0 not taken.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
✗ Branch 4 not taken.
9 switch (res) {
358 case PayloadProcessor::kPathViolation:
359 reply_input.Add("status", "error");
360 reply_input.Add("reason", "path_violation");
361 break;
362 case PayloadProcessor::kOtherError:
363 reply_input.Add("status", "error");
364 reply_input.Add("reason", "other_error");
365 break;
366 case PayloadProcessor::kUploaderError:
367 reply_input.Add("status", "error");
368 reply_input.Add("reason", "uploader_error");
369 break;
370 9 case PayloadProcessor::kSuccess:
371
3/6
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 9 times.
✗ Branch 10 not taken.
9 reply_input.Add("status", "ok");
372 9 break;
373 default:
374 PANIC(kLogSyslogErr,
375 "HandleSubmitPayload: Unknown value of PayloadProcessor::Result "
376 "encountered.");
377 break;
378 }
379
380 // HandleSubmitPayload sends partial statistics back to the gateway
381
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
18 const std::string stats_json = statistics.PrintJSON();
382
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9 times.
✗ Branch 6 not taken.
9 reply_input.AddJsonObject("statistics", stats_json);
383
384
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 const std::string json = reply_input.GenerateString();
385
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 *reply = json;
386
387 9 return true;
388 9 }
389
390 9 bool Reactor::HandleCommit(const std::string &req, std::string *reply) {
391 9 return DoCommit(req, reply, /* direct_graft = */ false);
392 }
393
394 bool Reactor::HandleCommitGraft(const std::string &req, std::string *reply) {
395 return DoCommit(req, reply, /* direct_graft = */ true);
396 }
397
398 // Shared body for kCommit and the experimental kCommitGraft. The request
399 // payload is identical for both; direct_graft selects the DirectGraft fast path
400 // in CommitProcessor.
401 9 bool Reactor::DoCommit(const std::string &req, std::string *reply,
402 bool direct_graft) {
403
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (!reply) {
404 PANIC(kLogSyslogErr, "HandleCommit: Invalid reply pointer.");
405 }
406 // Extract the Path from the request JSON.
407
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req));
408
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
9 if (!req_json.IsValid()) {
409 LogCvmfs(kLogReceiver, kLogSyslogErr,
410 "HandleCommit: Invalid JSON request.");
411 return false;
412 }
413
414
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *lease_path_json = JsonDocument::SearchInObject(
415 req_json->root(), "lease_path", JSON_STRING);
416
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *old_root_hash_json = JsonDocument::SearchInObject(
417 req_json->root(), "old_root_hash", JSON_STRING);
418
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *new_root_hash_json = JsonDocument::SearchInObject(
419 req_json->root(), "new_root_hash", JSON_STRING);
420
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *tag_name_json = JsonDocument::SearchInObject(
421 req_json->root(), "tag_name", JSON_STRING);
422
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *tag_description_json = JsonDocument::SearchInObject(
423 req_json->root(), "tag_description", JSON_STRING);
424
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *auto_tag_threshold_json = JsonDocument::SearchInObject(
425 req_json->root(), "auto_tag_threshold", JSON_INT);
426
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *delete_tags_json = JsonDocument::SearchInObject(
427 req_json->root(), "delete_tags", JSON_STRING);
428
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 9 times.
✗ Branch 8 not taken.
9 const JSON *lease_expiration_json = JsonDocument::SearchInObject(
429 req_json->root(), "lease_expiration", JSON_INT);
430
431
2/4
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 if (lease_path_json == NULL || old_root_hash_json == NULL
432
2/4
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 9 times.
9 || new_root_hash_json == NULL || lease_expiration_json == NULL) {
433 LogCvmfs(kLogReceiver, kLogSyslogErr,
434 "HandleCommit: Missing fields in request.");
435 return false;
436 }
437
438
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 perf::Statistics statistics;
439 9 std::string start_time;
440
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 if (!Reactor::ExtractStatsFromReq(req_json.weak_ref(), &statistics,
441 &start_time)) {
442
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 LogCvmfs(
443 kLogReceiver, kLogSyslogErr,
444 "HandleCommit: Could not extract statistics counters from request");
445 }
446 uint64_t final_revision;
447
448 // Here we use the path to commit the changes!
449
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 const UniquePtr<CommitProcessor> proc(MakeCommitProcessor());
450
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 proc->SetStatistics(&statistics, start_time);
451
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 const shash::Any old_root_hash = shash::MkFromSuffixedHexPtr(
452
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
18 shash::HexPtr(old_root_hash_json->get<std::string>()));
453
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 const shash::Any new_root_hash = shash::MkFromSuffixedHexPtr(
454
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
18 shash::HexPtr(new_root_hash_json->get<std::string>()));
455
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
18 RepositoryTag repo_tag(tag_name_json->get<std::string>(),
456
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
18 tag_description_json->get<std::string>());
457
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (auto_tag_threshold_json != NULL) {
458 repo_tag.SetAutoTagThreshold(
459 static_cast<time_t>(auto_tag_threshold_json->get<int64_t>()));
460 }
461
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (delete_tags_json != NULL) {
462
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 9 times.
✗ Branch 5 not taken.
9 repo_tag.SetDeleteTags(delete_tags_json->get<std::string>());
463 }
464
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9 times.
✗ Branch 6 not taken.
18 const CommitProcessor::Result res = proc->Process(
465
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
18 lease_path_json->get<std::string>(), old_root_hash, new_root_hash,
466 repo_tag, lease_expiration_json->get<int64_t>(), &final_revision,
467 direct_graft);
468
469 9 JsonStringGenerator reply_input;
470
1/6
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
9 switch (res) {
471 9 case CommitProcessor::kSuccess:
472
3/6
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 9 times.
✗ Branch 10 not taken.
9 reply_input.Add("status", "ok");
473
2/4
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 9 times.
✗ Branch 6 not taken.
9 reply_input.Add("final_revision", static_cast<int64_t>(final_revision));
474 9 break;
475 case CommitProcessor::kError:
476 reply_input.Add("status", "error");
477 reply_input.Add("reason", "miscellaneous");
478 break;
479 case CommitProcessor::kMergeFailure:
480 reply_input.Add("status", "error");
481 reply_input.Add("reason", "merge_error");
482 break;
483 case CommitProcessor::kMissingReflog:
484 reply_input.Add("status", "error");
485 reply_input.Add("reason", "missing_reflog");
486 break;
487 case CommitProcessor::kLeaseExpired:
488 reply_input.Add("status", "error");
489 reply_input.Add("reason", "lease_expired");
490 break;
491 default:
492 PANIC(kLogSyslogErr,
493 "Unknown value of CommitProcessor::Result encountered.");
494 break;
495 }
496
497
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 const std::string json = reply_input.GenerateString();
498
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 *reply = json;
499
500 9 return true;
501 9 }
502
503 PayloadProcessor *Reactor::MakePayloadProcessor() {
504 return new PayloadProcessor();
505 }
506
507 CommitProcessor *Reactor::MakeCommitProcessor() {
508 return new CommitProcessor();
509 }
510
511 99 bool Reactor::HandleRequest(Request req, const std::string &data) {
512 99 bool ok = true;
513 99 std::string reply;
514 try {
515
7/11
✓ Branch 0 taken 36 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 18 times.
✓ Branch 3 taken 9 times.
✓ Branch 4 taken 9 times.
✓ Branch 5 taken 9 times.
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
99 switch (req) {
516 36 case kQuit:
517
2/4
✓ Branch 2 taken 36 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 36 times.
✗ Branch 6 not taken.
36 ok = WriteReply(fdout_, "ok");
518 36 break;
519 9 case kEcho:
520
4/8
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 9 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 9 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 9 times.
✗ Branch 13 not taken.
9 ok = WriteReply(fdout_, std::string("PID: ") + StringifyUint(getpid()));
521 9 break;
522 18 case kGenerateToken:
523
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 ok &= HandleGenerateToken(data, &reply);
524
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 ok &= WriteReply(fdout_, reply);
525 18 break;
526 9 case kGetTokenId:
527
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 ok &= HandleGetTokenId(data, &reply);
528
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 ok &= WriteReply(fdout_, reply);
529 9 break;
530 9 case kCheckToken:
531
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 ok &= HandleCheckToken(data, &reply);
532
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 ok &= WriteReply(fdout_, reply);
533 9 break;
534 9 case kSubmitPayload:
535
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 ok &= HandleSubmitPayload(fdin_, data, &reply);
536
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 ok &= WriteReply(fdout_, reply);
537 9 break;
538 9 case kCommit:
539
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 ok &= HandleCommit(data, &reply);
540
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 ok &= WriteReply(fdout_, reply);
541 9 break;
542 case kCommitGraft:
543 ok &= HandleCommitGraft(data, &reply);
544 ok &= WriteReply(fdout_, reply);
545 break;
546 case kTestCrash:
547 PANIC(kLogSyslogErr,
548 "Crash for test purposes. Should never happen in production "
549 "environment.");
550 break;
551 case kError:
552 LogCvmfs(kLogReceiver, kLogSyslogErr,
553 "Reactor: unknown command received.");
554 ok = false;
555 break;
556 default:
557 break;
558 }
559 } catch (const ECvmfsException &e) {
560 reply.clear();
561
562 std::string error("runtime error: ");
563 error += e.what();
564
565 JsonStringGenerator input;
566 input.Add("status", "error");
567 input.Add("reason", error);
568
569 reply = input.GenerateString();
570 WriteReply(fdout_, reply);
571 throw e;
572 }
573
574 99 return ok;
575 99 }
576
577 } // namespace receiver
578