| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/receiver/reactor.cc |
| Date: | 2025-11-09 02:35:23 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 159 | 318 | 50.0% |
| Branches: | 135 | 572 | 23.6% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #include "reactor.h" | ||
| 6 | |||
| 7 | #include <stdint.h> | ||
| 8 | #include <unistd.h> | ||
| 9 | |||
| 10 | #include <cstdlib> | ||
| 11 | #include <cstring> | ||
| 12 | #include <utility> | ||
| 13 | #include <vector> | ||
| 14 | |||
| 15 | #include "commit_processor.h" | ||
| 16 | #include "json_document_write.h" | ||
| 17 | #include "payload_processor.h" | ||
| 18 | #include "repository_tag.h" | ||
| 19 | #include "session_token.h" | ||
| 20 | #include "upload_facility.h" | ||
| 21 | #include "util/exception.h" | ||
| 22 | #include "util/logging.h" | ||
| 23 | #include "util/pointer.h" | ||
| 24 | #include "util/posix.h" | ||
| 25 | #include "util/string.h" | ||
| 26 | |||
| 27 | namespace receiver { | ||
| 28 | |||
| 29 | // NOTE, during the handling of the messages between the gateway and the | ||
| 30 | // receiver, we keep reading `4` bytes instead of the more common | ||
| 31 | // `sizeof(req_id)` or `sizeof(int32_t)`. | ||
| 32 | // This mirrors the behaviour of the gateway code. | ||
| 33 | // It would be possible in both codebases to query the size of the type, but then | ||
| 34 | // we would need to make sure that the types are actually the same. | ||
| 35 | // It is simpler to send `4` bytes. | ||
| 36 | |||
| 37 | 180 | Reactor::Request Reactor::ReadRequest(int fd, std::string *data) { | |
| 38 | using namespace receiver; // NOLINT | ||
| 39 | |||
| 40 | // First, read the command identifier | ||
| 41 | 180 | int32_t req_id = kQuit; | |
| 42 |
1/2✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
|
180 | int nb = SafeRead(fd, &req_id, 4); |
| 43 | |||
| 44 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
|
180 | if (nb != 4) { |
| 45 | ✗ | return kError; | |
| 46 | } | ||
| 47 | |||
| 48 | // Then, read message size | ||
| 49 | 180 | int32_t msg_size = 0; | |
| 50 |
1/2✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
|
180 | nb = SafeRead(fd, &msg_size, 4); |
| 51 | |||
| 52 |
2/4✓ Branch 0 taken 180 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 180 times.
|
180 | if (req_id == kError || nb != 4) { |
| 53 | ✗ | return kError; | |
| 54 | } | ||
| 55 | |||
| 56 | // Finally read the message body | ||
| 57 |
2/2✓ Branch 0 taken 120 times.
✓ Branch 1 taken 60 times.
|
180 | if (msg_size > 0) { |
| 58 |
1/2✓ Branch 2 taken 120 times.
✗ Branch 3 not taken.
|
120 | std::vector<char> buffer(msg_size); |
| 59 |
1/2✓ Branch 2 taken 120 times.
✗ Branch 3 not taken.
|
120 | nb = SafeRead(fd, &buffer[0], msg_size); |
| 60 | |||
| 61 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 120 times.
|
120 | if (nb != msg_size) { |
| 62 | ✗ | return kError; | |
| 63 | } | ||
| 64 | |||
| 65 |
1/2✓ Branch 3 taken 120 times.
✗ Branch 4 not taken.
|
120 | *data = std::string(&buffer[0], msg_size); |
| 66 |
1/2✓ Branch 1 taken 120 times.
✗ Branch 2 not taken.
|
120 | } |
| 67 | |||
| 68 | 180 | return static_cast<Request>(req_id); | |
| 69 | } | ||
| 70 | |||
| 71 | 180 | bool Reactor::WriteRequest(int fd, Request req, const std::string &data) { | |
| 72 | 180 | const int32_t msg_size = data.size(); | |
| 73 | 180 | const int32_t total_size = 8 + data.size(); // req + msg_size + data | |
| 74 | |||
| 75 |
1/2✓ Branch 2 taken 180 times.
✗ Branch 3 not taken.
|
180 | std::vector<char> buffer(total_size); |
| 76 | |||
| 77 | 180 | memcpy(&buffer[0], &req, 4); | |
| 78 | 180 | memcpy(&buffer[4], &msg_size, 4); | |
| 79 | |||
| 80 |
2/2✓ Branch 1 taken 120 times.
✓ Branch 2 taken 60 times.
|
180 | if (!data.empty()) { |
| 81 | 120 | memcpy(&buffer[8], &data[0], data.size()); | |
| 82 | } | ||
| 83 | |||
| 84 |
1/2✓ Branch 2 taken 180 times.
✗ Branch 3 not taken.
|
360 | return SafeWrite(fd, &buffer[0], total_size); |
| 85 | 180 | } | |
| 86 | |||
| 87 | 180 | bool Reactor::ReadReply(int fd, std::string *data) { | |
| 88 | 180 | int32_t msg_size(0); | |
| 89 |
1/2✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
|
180 | int nb = SafeRead(fd, &msg_size, 4); |
| 90 | |||
| 91 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
|
180 | if (nb != 4) { |
| 92 | ✗ | return false; | |
| 93 | } | ||
| 94 | |||
| 95 |
1/2✓ Branch 2 taken 180 times.
✗ Branch 3 not taken.
|
180 | std::vector<char> buffer(msg_size); |
| 96 |
1/2✓ Branch 2 taken 180 times.
✗ Branch 3 not taken.
|
180 | nb = SafeRead(fd, &buffer[0], msg_size); |
| 97 | |||
| 98 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 180 times.
|
180 | if (nb != msg_size) { |
| 99 | ✗ | return false; | |
| 100 | } | ||
| 101 | |||
| 102 |
1/2✓ Branch 3 taken 180 times.
✗ Branch 4 not taken.
|
180 | *data = std::string(&buffer[0], msg_size); |
| 103 | |||
| 104 | 180 | return true; | |
| 105 | 180 | } | |
| 106 | |||
| 107 | 180 | bool Reactor::WriteReply(int fd, const std::string &data) { | |
| 108 | 180 | const int32_t msg_size = data.size(); | |
| 109 | 180 | const int32_t total_size = 4 + data.size(); | |
| 110 | |||
| 111 |
1/2✓ Branch 2 taken 180 times.
✗ Branch 3 not taken.
|
180 | std::vector<char> buffer(total_size); |
| 112 | |||
| 113 | 180 | memcpy(&buffer[0], &msg_size, 4); | |
| 114 | |||
| 115 |
1/2✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
|
180 | if (!data.empty()) { |
| 116 | 180 | memcpy(&buffer[4], &data[0], data.size()); | |
| 117 | } | ||
| 118 | |||
| 119 |
1/2✓ Branch 2 taken 180 times.
✗ Branch 3 not taken.
|
360 | return SafeWrite(fd, &buffer[0], total_size); |
| 120 | 180 | } | |
| 121 | |||
| 122 | ✗ | bool Reactor::ExtractStatsFromReq(JsonDocument *req, perf::Statistics *stats, | |
| 123 | std::string *start_time) { | ||
| 124 | ✗ | const perf::StatisticsTemplate stats_tmpl("publish", stats); | |
| 125 | ✗ | const upload::UploadCounters counters(stats_tmpl); | |
| 126 | |||
| 127 | ✗ | const JSON *statistics = JsonDocument::SearchInObject( | |
| 128 | req->root(), "statistics", JSON_OBJECT); | ||
| 129 | ✗ | if (statistics == NULL) { | |
| 130 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 131 | "Could not find 'statistics' field in request"); | ||
| 132 | ✗ | return false; | |
| 133 | } | ||
| 134 | |||
| 135 | ✗ | const JSON *publish_ctrs = JsonDocument::SearchInObject(statistics, "publish", | |
| 136 | JSON_OBJECT); | ||
| 137 | |||
| 138 | ✗ | if (publish_ctrs == NULL) { | |
| 139 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 140 | "Could not find 'statistics.publish' field in request"); | ||
| 141 | ✗ | return false; | |
| 142 | } | ||
| 143 | |||
| 144 | ✗ | const JSON *n_chunks_added = JsonDocument::SearchInObject( | |
| 145 | publish_ctrs, "n_chunks_added", JSON_INT); | ||
| 146 | ✗ | const JSON *n_chunks_duplicated = JsonDocument::SearchInObject( | |
| 147 | publish_ctrs, "n_chunks_duplicated", JSON_INT); | ||
| 148 | ✗ | const JSON *n_catalogs_added = JsonDocument::SearchInObject( | |
| 149 | publish_ctrs, "n_catalogs_added", JSON_INT); | ||
| 150 | ✗ | const JSON *sz_uploaded_bytes = JsonDocument::SearchInObject( | |
| 151 | publish_ctrs, "sz_uploaded_bytes", JSON_INT); | ||
| 152 | ✗ | const JSON *sz_uploaded_catalog_bytes = JsonDocument::SearchInObject( | |
| 153 | publish_ctrs, "sz_uploaded_catalog_bytes", JSON_INT); | ||
| 154 | |||
| 155 | ✗ | const JSON *start_time_json = JsonDocument::SearchInObject( | |
| 156 | statistics, "start_time", JSON_STRING); | ||
| 157 | |||
| 158 | ✗ | if (n_chunks_added == NULL || n_chunks_duplicated == NULL | |
| 159 | ✗ | || n_catalogs_added == NULL || sz_uploaded_bytes == NULL | |
| 160 | ✗ | || sz_uploaded_catalog_bytes == NULL || start_time_json == NULL) { | |
| 161 | ✗ | return false; | |
| 162 | } | ||
| 163 | |||
| 164 | ✗ | perf::Xadd(counters.n_chunks_added, n_chunks_added->int_value); | |
| 165 | ✗ | perf::Xadd(counters.n_chunks_duplicated, n_chunks_duplicated->int_value); | |
| 166 | ✗ | perf::Xadd(counters.n_catalogs_added, n_catalogs_added->int_value); | |
| 167 | ✗ | perf::Xadd(counters.sz_uploaded_bytes, sz_uploaded_bytes->int_value); | |
| 168 | ✗ | perf::Xadd(counters.sz_uploaded_catalog_bytes, | |
| 169 | ✗ | sz_uploaded_catalog_bytes->int_value); | |
| 170 | |||
| 171 | ✗ | *start_time = start_time_json->string_value; | |
| 172 | |||
| 173 | ✗ | return true; | |
| 174 | } | ||
| 175 | |||
| 176 | 60 | Reactor::Reactor(int fdin, int fdout) : fdin_(fdin), fdout_(fdout) { } | |
| 177 | |||
| 178 | 120 | Reactor::~Reactor() { } | |
| 179 | |||
| 180 | 60 | bool Reactor::Run() { | |
| 181 | 60 | std::string msg_body; | |
| 182 | 60 | Request req = kQuit; | |
| 183 | do { | ||
| 184 | 180 | msg_body.clear(); | |
| 185 |
1/2✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
|
180 | req = ReadRequest(fdin_, &msg_body); |
| 186 |
2/4✓ Branch 1 taken 180 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 180 times.
|
180 | if (!HandleRequest(req, msg_body)) { |
| 187 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 188 | "Reactor: could not handle request %d. Exiting", req); | ||
| 189 | ✗ | return false; | |
| 190 | } | ||
| 191 |
2/2✓ Branch 0 taken 120 times.
✓ Branch 1 taken 60 times.
|
180 | } while (req != kQuit); |
| 192 | |||
| 193 | 60 | return true; | |
| 194 | 60 | } | |
| 195 | |||
| 196 | 40 | bool Reactor::HandleGenerateToken(const std::string &req, std::string *reply) { | |
| 197 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 40 times.
|
40 | if (reply == NULL) { |
| 198 | ✗ | PANIC(kLogSyslogErr, "HandleGenerateToken: Invalid reply pointer."); | |
| 199 | } | ||
| 200 |
2/4✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 40 times.
✗ Branch 5 not taken.
|
40 | const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
| 201 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 40 times.
|
40 | if (!req_json.IsValid()) { |
| 202 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 203 | "HandleGenerateToken: Invalid JSON request."); | ||
| 204 | ✗ | return false; | |
| 205 | } | ||
| 206 | |||
| 207 |
2/4✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 40 times.
✗ Branch 8 not taken.
|
40 | const JSON *key_id = JsonDocument::SearchInObject(req_json->root(), "key_id", |
| 208 | JSON_STRING); | ||
| 209 |
2/4✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 40 times.
✗ Branch 8 not taken.
|
40 | const JSON *path = JsonDocument::SearchInObject(req_json->root(), "path", |
| 210 | JSON_STRING); | ||
| 211 |
2/4✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 40 times.
✗ Branch 8 not taken.
|
40 | const JSON *max_lease_time = JsonDocument::SearchInObject( |
| 212 | req_json->root(), "max_lease_time", JSON_INT); | ||
| 213 | |||
| 214 |
3/6✓ Branch 0 taken 40 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 40 times.
|
40 | if (key_id == NULL || path == NULL || max_lease_time == NULL) { |
| 215 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 216 | "HandleGenerateToken: Missing fields in request."); | ||
| 217 | ✗ | return false; | |
| 218 | } | ||
| 219 | |||
| 220 | 40 | std::string session_token; | |
| 221 | 40 | std::string public_token_id; | |
| 222 | 40 | std::string token_secret; | |
| 223 | |||
| 224 |
4/8✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 40 times.
✗ Branch 9 not taken.
✗ Branch 14 not taken.
✓ Branch 15 taken 40 times.
|
40 | if (!GenerateSessionToken(key_id->string_value, path->string_value, |
| 225 | 40 | max_lease_time->int_value, &session_token, | |
| 226 | &public_token_id, &token_secret)) { | ||
| 227 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 228 | "HandleGenerateToken: Could not generate session token."); | ||
| 229 | ✗ | return false; | |
| 230 | } | ||
| 231 | |||
| 232 | 40 | JsonStringGenerator input; | |
| 233 |
2/4✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
|
40 | input.Add("token", session_token); |
| 234 |
2/4✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
|
40 | input.Add("id", public_token_id); |
| 235 |
2/4✓ Branch 2 taken 40 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 40 times.
✗ Branch 6 not taken.
|
40 | input.Add("secret", token_secret); |
| 236 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | const std::string json = input.GenerateString(); |
| 237 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | *reply = json; |
| 238 | |||
| 239 | 40 | return true; | |
| 240 | 40 | } | |
| 241 | |||
| 242 | 20 | bool Reactor::HandleGetTokenId(const std::string &req, std::string *reply) { | |
| 243 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | if (reply == NULL) { |
| 244 | ✗ | PANIC(kLogSyslogErr, "HandleGetTokenId: Invalid reply pointer."); | |
| 245 | } | ||
| 246 | |||
| 247 | 20 | std::string token_id; | |
| 248 | 20 | JsonStringGenerator input; | |
| 249 |
2/4✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 20 times.
|
20 | if (!GetTokenPublicId(req, &token_id)) { |
| 250 | ✗ | input.Add("status", "error"); | |
| 251 | ✗ | input.Add("reason", "invalid_token"); | |
| 252 | } else { | ||
| 253 |
3/6✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 20 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 20 times.
✗ Branch 10 not taken.
|
20 | input.Add("status", "ok"); |
| 254 |
2/4✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20 times.
✗ Branch 6 not taken.
|
20 | input.Add("id", token_id); |
| 255 | } | ||
| 256 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | const std::string json = input.GenerateString(); |
| 257 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | *reply = json; |
| 258 | |||
| 259 | 20 | return true; | |
| 260 | 20 | } | |
| 261 | |||
| 262 | 20 | bool Reactor::HandleCheckToken(const std::string &req, std::string *reply) { | |
| 263 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | if (reply == NULL) { |
| 264 | ✗ | PANIC(kLogSyslogErr, "HandleCheckToken: Invalid reply pointer."); | |
| 265 | } | ||
| 266 | |||
| 267 |
2/4✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
|
20 | const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
| 268 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
|
20 | if (!req_json.IsValid()) { |
| 269 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 270 | "HandleCheckToken: Invalid JSON request."); | ||
| 271 | ✗ | return false; | |
| 272 | } | ||
| 273 | |||
| 274 |
2/4✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 20 times.
✗ Branch 8 not taken.
|
20 | const JSON *token = JsonDocument::SearchInObject(req_json->root(), "token", |
| 275 | JSON_STRING); | ||
| 276 |
2/4✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 20 times.
✗ Branch 8 not taken.
|
20 | const JSON *secret = JsonDocument::SearchInObject(req_json->root(), "secret", |
| 277 | JSON_STRING); | ||
| 278 | |||
| 279 |
2/4✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 20 times.
|
20 | if (token == NULL || secret == NULL) { |
| 280 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 281 | "HandleCheckToken: Missing fields in request."); | ||
| 282 | ✗ | return false; | |
| 283 | } | ||
| 284 | |||
| 285 | 20 | std::string path; | |
| 286 | 20 | JsonStringGenerator input; | |
| 287 |
2/4✓ Branch 3 taken 20 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 20 times.
✗ Branch 7 not taken.
|
40 | const TokenCheckResult ret = CheckToken(token->string_value, |
| 288 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | secret->string_value, &path); |
| 289 |
1/4✗ Branch 0 not taken.
✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
|
20 | switch (ret) { |
| 290 | ✗ | case kExpired: | |
| 291 | // Expired token | ||
| 292 | ✗ | input.Add("status", "error"); | |
| 293 | ✗ | input.Add("reason", "expired_token"); | |
| 294 | ✗ | break; | |
| 295 | ✗ | case kInvalid: | |
| 296 | // Invalid token | ||
| 297 | ✗ | input.Add("status", "error"); | |
| 298 | ✗ | input.Add("reason", "invalid_token"); | |
| 299 | ✗ | break; | |
| 300 | 20 | case kValid: | |
| 301 | // All ok | ||
| 302 |
3/6✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 20 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 20 times.
✗ Branch 10 not taken.
|
20 | input.Add("status", "ok"); |
| 303 |
2/4✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20 times.
✗ Branch 6 not taken.
|
20 | input.Add("path", path); |
| 304 | 20 | break; | |
| 305 | ✗ | default: | |
| 306 | // Should not be reached | ||
| 307 | ✗ | PANIC(kLogSyslogErr, | |
| 308 | "HandleCheckToken: Unknown value received. Exiting."); | ||
| 309 | } | ||
| 310 | |||
| 311 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | const std::string json = input.GenerateString(); |
| 312 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | *reply = json; |
| 313 | |||
| 314 | 20 | return true; | |
| 315 | 20 | } | |
| 316 | |||
| 317 | // This is a special handler. We need to continue reading the payload from the | ||
| 318 | // fdin_ | ||
| 319 | 20 | bool Reactor::HandleSubmitPayload(int fdin, const std::string &req, | |
| 320 | std::string *reply) { | ||
| 321 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | if (!reply) { |
| 322 | ✗ | PANIC(kLogSyslogErr, "HandleSubmitPayload: Invalid reply pointer."); | |
| 323 | } | ||
| 324 | |||
| 325 | // Extract the Path (used for verification), Digest and DigestSize from the | ||
| 326 | // request JSON. | ||
| 327 |
2/4✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
|
20 | const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); |
| 328 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
|
20 | if (!req_json.IsValid()) { |
| 329 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 330 | "HandleSubmitPayload: Invalid JSON request."); | ||
| 331 | ✗ | return false; | |
| 332 | } | ||
| 333 | |||
| 334 |
2/4✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 20 times.
✗ Branch 8 not taken.
|
20 | const JSON *path_json = JsonDocument::SearchInObject(req_json->root(), "path", |
| 335 | JSON_STRING); | ||
| 336 |
2/4✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 20 times.
✗ Branch 8 not taken.
|
20 | const JSON *digest_json = JsonDocument::SearchInObject(req_json->root(), |
| 337 | "digest", JSON_STRING); | ||
| 338 |
2/4✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 20 times.
✗ Branch 8 not taken.
|
20 | const JSON *header_size_json = JsonDocument::SearchInObject( |
| 339 | req_json->root(), "header_size", JSON_INT); | ||
| 340 | |||
| 341 |
3/6✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 20 times.
|
20 | if (path_json == NULL || digest_json == NULL || header_size_json == NULL) { |
| 342 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 343 | "HandleSubmitPayload: Missing fields in request."); | ||
| 344 | ✗ | return false; | |
| 345 | } | ||
| 346 | |||
| 347 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | perf::Statistics statistics; |
| 348 | |||
| 349 |
2/4✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
|
20 | const UniquePtr<PayloadProcessor> proc(MakePayloadProcessor()); |
| 350 |
1/2✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
|
20 | proc->SetStatistics(&statistics); |
| 351 | 20 | JsonStringGenerator reply_input; | |
| 352 |
1/2✓ Branch 3 taken 20 times.
✗ Branch 4 not taken.
|
60 | const PayloadProcessor::Result res = proc->Process( |
| 353 |
2/4✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 20 times.
✗ Branch 5 not taken.
|
40 | fdin, digest_json->string_value, path_json->string_value, |
| 354 | 20 | header_size_json->int_value); | |
| 355 | |||
| 356 |
1/5✗ Branch 0 not taken.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 20 times.
✗ Branch 4 not taken.
|
20 | switch (res) { |
| 357 | ✗ | case PayloadProcessor::kPathViolation: | |
| 358 | ✗ | reply_input.Add("status", "error"); | |
| 359 | ✗ | reply_input.Add("reason", "path_violation"); | |
| 360 | ✗ | break; | |
| 361 | ✗ | case PayloadProcessor::kOtherError: | |
| 362 | ✗ | reply_input.Add("status", "error"); | |
| 363 | ✗ | reply_input.Add("reason", "other_error"); | |
| 364 | ✗ | break; | |
| 365 | ✗ | case PayloadProcessor::kUploaderError: | |
| 366 | ✗ | reply_input.Add("status", "error"); | |
| 367 | ✗ | reply_input.Add("reason", "uploader_error"); | |
| 368 | ✗ | break; | |
| 369 | 20 | case PayloadProcessor::kSuccess: | |
| 370 |
3/6✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 20 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 20 times.
✗ Branch 10 not taken.
|
20 | reply_input.Add("status", "ok"); |
| 371 | 20 | break; | |
| 372 | ✗ | default: | |
| 373 | ✗ | PANIC(kLogSyslogErr, | |
| 374 | "HandleSubmitPayload: Unknown value of PayloadProcessor::Result " | ||
| 375 | "encountered."); | ||
| 376 | break; | ||
| 377 | } | ||
| 378 | |||
| 379 | // HandleSubmitPayload sends partial statistics back to the gateway | ||
| 380 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
40 | const std::string stats_json = statistics.PrintJSON(); |
| 381 |
2/4✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 20 times.
✗ Branch 6 not taken.
|
20 | reply_input.AddJsonObject("statistics", stats_json); |
| 382 | |||
| 383 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | const std::string json = reply_input.GenerateString(); |
| 384 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | *reply = json; |
| 385 | |||
| 386 | 20 | return true; | |
| 387 | 20 | } | |
| 388 | |||
| 389 | ✗ | bool Reactor::HandleCommit(const std::string &req, std::string *reply) { | |
| 390 | ✗ | if (!reply) { | |
| 391 | ✗ | PANIC(kLogSyslogErr, "HandleCommit: Invalid reply pointer."); | |
| 392 | } | ||
| 393 | // Extract the Path from the request JSON. | ||
| 394 | ✗ | const UniquePtr<JsonDocument> req_json(JsonDocument::Create(req)); | |
| 395 | ✗ | if (!req_json.IsValid()) { | |
| 396 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 397 | "HandleCommit: Invalid JSON request."); | ||
| 398 | ✗ | return false; | |
| 399 | } | ||
| 400 | |||
| 401 | ✗ | const JSON *lease_path_json = JsonDocument::SearchInObject( | |
| 402 | req_json->root(), "lease_path", JSON_STRING); | ||
| 403 | ✗ | const JSON *old_root_hash_json = JsonDocument::SearchInObject( | |
| 404 | req_json->root(), "old_root_hash", JSON_STRING); | ||
| 405 | ✗ | const JSON *new_root_hash_json = JsonDocument::SearchInObject( | |
| 406 | req_json->root(), "new_root_hash", JSON_STRING); | ||
| 407 | ✗ | const JSON *tag_name_json = JsonDocument::SearchInObject( | |
| 408 | req_json->root(), "tag_name", JSON_STRING); | ||
| 409 | ✗ | const JSON *tag_description_json = JsonDocument::SearchInObject( | |
| 410 | req_json->root(), "tag_description", JSON_STRING); | ||
| 411 | |||
| 412 | ✗ | if (lease_path_json == NULL || old_root_hash_json == NULL | |
| 413 | ✗ | || new_root_hash_json == NULL) { | |
| 414 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 415 | "HandleCommit: Missing fields in request."); | ||
| 416 | ✗ | return false; | |
| 417 | } | ||
| 418 | |||
| 419 | ✗ | perf::Statistics statistics; | |
| 420 | ✗ | std::string start_time; | |
| 421 | ✗ | if (!Reactor::ExtractStatsFromReq(req_json.weak_ref(), &statistics, | |
| 422 | &start_time)) { | ||
| 423 | ✗ | LogCvmfs( | |
| 424 | kLogReceiver, kLogSyslogErr, | ||
| 425 | "HandleCommit: Could not extract statistics counters from request"); | ||
| 426 | } | ||
| 427 | uint64_t final_revision; | ||
| 428 | |||
| 429 | // Here we use the path to commit the changes! | ||
| 430 | ✗ | const UniquePtr<CommitProcessor> proc(MakeCommitProcessor()); | |
| 431 | ✗ | proc->SetStatistics(&statistics, start_time); | |
| 432 | ✗ | const shash::Any old_root_hash = shash::MkFromSuffixedHexPtr( | |
| 433 | ✗ | shash::HexPtr(old_root_hash_json->string_value)); | |
| 434 | ✗ | const shash::Any new_root_hash = shash::MkFromSuffixedHexPtr( | |
| 435 | ✗ | shash::HexPtr(new_root_hash_json->string_value)); | |
| 436 | ✗ | const RepositoryTag repo_tag(tag_name_json->string_value, | |
| 437 | ✗ | tag_description_json->string_value); | |
| 438 | ✗ | const CommitProcessor::Result res = proc->Process( | |
| 439 | ✗ | lease_path_json->string_value, old_root_hash, new_root_hash, repo_tag, | |
| 440 | &final_revision); | ||
| 441 | |||
| 442 | ✗ | JsonStringGenerator reply_input; | |
| 443 | ✗ | switch (res) { | |
| 444 | ✗ | case CommitProcessor::kSuccess: | |
| 445 | ✗ | reply_input.Add("status", "ok"); | |
| 446 | ✗ | reply_input.Add("final_revision", static_cast<int64_t>(final_revision)); | |
| 447 | ✗ | break; | |
| 448 | ✗ | case CommitProcessor::kError: | |
| 449 | ✗ | reply_input.Add("status", "error"); | |
| 450 | ✗ | reply_input.Add("reason", "miscellaneous"); | |
| 451 | ✗ | break; | |
| 452 | ✗ | case CommitProcessor::kMergeFailure: | |
| 453 | ✗ | reply_input.Add("status", "error"); | |
| 454 | ✗ | reply_input.Add("reason", "merge_error"); | |
| 455 | ✗ | break; | |
| 456 | ✗ | case CommitProcessor::kMissingReflog: | |
| 457 | ✗ | reply_input.Add("status", "error"); | |
| 458 | ✗ | reply_input.Add("reason", "missing_reflog"); | |
| 459 | ✗ | break; | |
| 460 | ✗ | default: | |
| 461 | ✗ | PANIC(kLogSyslogErr, | |
| 462 | "Unknown value of CommitProcessor::Result encountered."); | ||
| 463 | break; | ||
| 464 | } | ||
| 465 | |||
| 466 | ✗ | const std::string json = reply_input.GenerateString(); | |
| 467 | ✗ | *reply = json; | |
| 468 | |||
| 469 | ✗ | return true; | |
| 470 | } | ||
| 471 | |||
| 472 | ✗ | PayloadProcessor *Reactor::MakePayloadProcessor() { | |
| 473 | ✗ | return new PayloadProcessor(); | |
| 474 | } | ||
| 475 | |||
| 476 | ✗ | CommitProcessor *Reactor::MakeCommitProcessor() { | |
| 477 | ✗ | return new CommitProcessor(); | |
| 478 | } | ||
| 479 | |||
| 480 | 180 | bool Reactor::HandleRequest(Request req, const std::string &data) { | |
| 481 | 180 | bool ok = true; | |
| 482 | 180 | std::string reply; | |
| 483 | try { | ||
| 484 |
6/10✓ Branch 0 taken 60 times.
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 40 times.
✓ Branch 3 taken 20 times.
✓ Branch 4 taken 20 times.
✓ Branch 5 taken 20 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
|
180 | switch (req) { |
| 485 | 60 | case kQuit: | |
| 486 |
2/4✓ Branch 2 taken 60 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 60 times.
✗ Branch 6 not taken.
|
60 | ok = WriteReply(fdout_, "ok"); |
| 487 | 60 | break; | |
| 488 | 20 | case kEcho: | |
| 489 |
4/8✓ Branch 2 taken 20 times.
✗ Branch 3 not taken.
✓ Branch 6 taken 20 times.
✗ Branch 7 not taken.
✓ Branch 9 taken 20 times.
✗ Branch 10 not taken.
✓ Branch 12 taken 20 times.
✗ Branch 13 not taken.
|
20 | ok = WriteReply(fdout_, std::string("PID: ") + StringifyUint(getpid())); |
| 490 | 20 | break; | |
| 491 | 40 | case kGenerateToken: | |
| 492 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | ok &= HandleGenerateToken(data, &reply); |
| 493 |
1/2✓ Branch 1 taken 40 times.
✗ Branch 2 not taken.
|
40 | ok &= WriteReply(fdout_, reply); |
| 494 | 40 | break; | |
| 495 | 20 | case kGetTokenId: | |
| 496 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | ok &= HandleGetTokenId(data, &reply); |
| 497 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | ok &= WriteReply(fdout_, reply); |
| 498 | 20 | break; | |
| 499 | 20 | case kCheckToken: | |
| 500 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | ok &= HandleCheckToken(data, &reply); |
| 501 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | ok &= WriteReply(fdout_, reply); |
| 502 | 20 | break; | |
| 503 | 20 | case kSubmitPayload: | |
| 504 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | ok &= HandleSubmitPayload(fdin_, data, &reply); |
| 505 |
1/2✓ Branch 1 taken 20 times.
✗ Branch 2 not taken.
|
20 | ok &= WriteReply(fdout_, reply); |
| 506 | 20 | break; | |
| 507 | ✗ | case kCommit: | |
| 508 | ✗ | ok &= HandleCommit(data, &reply); | |
| 509 | ✗ | ok &= WriteReply(fdout_, reply); | |
| 510 | ✗ | break; | |
| 511 | ✗ | case kTestCrash: | |
| 512 | ✗ | PANIC(kLogSyslogErr, | |
| 513 | "Crash for test purposes. Should never happen in production " | ||
| 514 | "environment."); | ||
| 515 | break; | ||
| 516 | ✗ | case kError: | |
| 517 | ✗ | LogCvmfs(kLogReceiver, kLogSyslogErr, | |
| 518 | "Reactor: unknown command received."); | ||
| 519 | ✗ | ok = false; | |
| 520 | ✗ | break; | |
| 521 | ✗ | default: | |
| 522 | ✗ | break; | |
| 523 | } | ||
| 524 | ✗ | } catch (const ECvmfsException &e) { | |
| 525 | ✗ | reply.clear(); | |
| 526 | |||
| 527 | ✗ | std::string error("runtime error: "); | |
| 528 | ✗ | error += e.what(); | |
| 529 | |||
| 530 | ✗ | JsonStringGenerator input; | |
| 531 | ✗ | input.Add("status", "error"); | |
| 532 | ✗ | input.Add("reason", error); | |
| 533 | |||
| 534 | ✗ | reply = input.GenerateString(); | |
| 535 | ✗ | WriteReply(fdout_, reply); | |
| 536 | ✗ | throw e; | |
| 537 | } | ||
| 538 | |||
| 539 | 180 | return ok; | |
| 540 | 180 | } | |
| 541 | |||
| 542 | } // namespace receiver | ||
| 543 |