38 using namespace receiver;
41 int32_t req_id =
kQuit;
52 if (req_id ==
kError || nb != 4) {
58 std::vector<char> buffer(msg_size);
59 nb =
SafeRead(fd, &buffer[0], msg_size);
65 *data = std::string(&buffer[0], msg_size);
68 return static_cast<Request>(req_id);
72 const int32_t msg_size = data.size();
73 const int32_t total_size = 8 + data.size();
75 std::vector<char> buffer(total_size);
77 memcpy(&buffer[0], &req, 4);
78 memcpy(&buffer[4], &msg_size, 4);
81 memcpy(&buffer[8], &data[0], data.size());
84 return SafeWrite(fd, &buffer[0], total_size);
95 std::vector<char> buffer(msg_size);
96 nb =
SafeRead(fd, &buffer[0], msg_size);
102 *data = std::string(&buffer[0], msg_size);
108 const int32_t msg_size = data.size();
109 const int32_t total_size = 4 + data.size();
111 std::vector<char> buffer(total_size);
113 memcpy(&buffer[0], &msg_size, 4);
116 memcpy(&buffer[4], &data[0], data.size());
119 return SafeWrite(fd, &buffer[0], total_size);
123 std::string* start_time) {
127 const JSON* statistics =
129 if (statistics == NULL) {
131 "Could not find 'statistics' field in request");
135 const JSON* publish_ctrs =
138 if (publish_ctrs == NULL) {
140 "Could not find 'statistics.publish' field in request");
144 const JSON* n_chunks_added =
147 publish_ctrs,
"n_chunks_duplicated", JSON_INT);
148 const JSON* n_catalogs_added =
150 const JSON* sz_uploaded_bytes =
153 publish_ctrs,
"sz_uploaded_catalog_bytes", JSON_INT);
155 const JSON* start_time_json =
158 if (n_chunks_added == NULL || n_chunks_duplicated == NULL ||
159 n_catalogs_added == NULL || sz_uploaded_bytes == NULL ||
160 sz_uploaded_catalog_bytes == NULL || start_time_json == NULL) {
169 sz_uploaded_catalog_bytes->int_value);
171 *start_time = start_time_json->string_value;
181 std::string msg_body;
188 "Reactor: could not handle request %d. Exiting", req);
191 }
while (req !=
kQuit);
203 "HandleGenerateToken: Invalid JSON request.");
212 req_json->
root(),
"max_lease_time", JSON_INT);
214 if (key_id == NULL || path == NULL || max_lease_time == NULL) {
216 "HandleGenerateToken: Missing fields in request.");
220 std::string session_token;
221 std::string public_token_id;
222 std::string token_secret;
225 max_lease_time->int_value, &session_token,
226 &public_token_id, &token_secret)) {
228 "HandleGenerateToken: Could not generate session token.");
233 input.
Add(
"token", session_token);
234 input.
Add(
"id", public_token_id);
235 input.
Add(
"secret", token_secret);
247 std::string token_id;
250 input.
Add(
"status",
"error");
251 input.
Add(
"reason",
"invalid_token");
253 input.
Add(
"status",
"ok");
254 input.
Add(
"id", token_id);
270 "HandleCheckToken: Invalid JSON request.");
279 if (token == NULL || secret == NULL) {
281 "HandleCheckToken: Missing fields in request.");
288 CheckToken(token->string_value, secret->string_value, &path);
292 input.
Add(
"status",
"error");
293 input.
Add(
"reason",
"expired_token");
297 input.
Add(
"status",
"error");
298 input.
Add(
"reason",
"invalid_token");
302 input.
Add(
"status",
"ok");
303 input.
Add(
"path", path);
308 "HandleCheckToken: Unknown value received. Exiting.");
320 std::string* reply) {
330 "HandleSubmitPayload: Invalid JSON request.");
334 const JSON* path_json =
336 const JSON* digest_json =
338 const JSON* header_size_json =
341 if (path_json == NULL || digest_json == NULL || header_size_json == NULL) {
343 "HandleSubmitPayload: Missing fields in request.");
353 proc->
Process(fdin, digest_json->string_value, path_json->string_value,
354 header_size_json->int_value);
358 reply_input.
Add(
"status",
"error");
359 reply_input.
Add(
"reason",
"path_violation");
362 reply_input.
Add(
"status",
"error");
363 reply_input.
Add(
"reason",
"other_error");
366 reply_input.
Add(
"status",
"error");
367 reply_input.
Add(
"reason",
"uploader_error");
370 reply_input.
Add(
"status",
"ok");
374 "HandleSubmitPayload: Unknown value of PayloadProcessor::Result "
380 std::string stats_json = statistics.
PrintJSON();
397 "HandleCommit: Invalid JSON request.");
401 const JSON* lease_path_json =
404 req_json->
root(),
"old_root_hash", JSON_STRING);
406 req_json->
root(),
"new_root_hash", JSON_STRING);
407 const JSON* tag_name_json =
410 req_json->
root(),
"tag_description", JSON_STRING);
412 if (lease_path_json == NULL || old_root_hash_json == NULL ||
413 new_root_hash_json == NULL) {
415 "HandleCommit: Missing fields in request.");
420 std::string start_time;
425 "HandleCommit: Could not extract statistics counters from request");
427 uint64_t final_revision;
437 tag_description_json->string_value);
439 proc->
Process(lease_path_json->string_value, old_root_hash, new_root_hash,
440 repo_tag, &final_revision);
445 reply_input.
Add(
"status",
"ok");
446 reply_input.
Add(
"final_revision", static_cast<int64_t>(final_revision));
449 reply_input.
Add(
"status",
"error");
450 reply_input.
Add(
"reason",
"miscellaneous");
453 reply_input.
Add(
"status",
"error");
454 reply_input.
Add(
"reason",
"merge_error");
457 reply_input.
Add(
"status",
"error");
458 reply_input.
Add(
"reason",
"missing_reflog");
462 "Unknown value of CommitProcessor::Result encountered.");
513 "Crash for test purposes. Should never happen in production "
518 "Reactor: unknown command received.");
527 std::string error(
"runtime error: ");
531 input.
Add(
"status",
"error");
532 input.
Add(
"reason", error);
perf::Counter * n_chunks_duplicated
void Add(const std::string &key, const std::string &val)
int64_t Xadd(class Counter *counter, const int64_t delta)
virtual bool HandleCommit(const std::string &req, std::string *reply)
virtual CommitProcessor * MakeCommitProcessor()
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
virtual bool HandleCheckToken(const std::string &req, std::string *reply)
bool HandleRequest(Request req, const std::string &data)
bool GenerateSessionToken(const std::string &key_id, const std::string &path, uint64_t max_lease_time, std::string *session_token, std::string *public_token_id, std::string *token_secret)
virtual bool HandleGetTokenId(const std::string &req, std::string *reply)
bool SafeWrite(int fd, const void *buf, size_t nbyte)
std::string StringifyUint(const uint64_t value)
static bool ReadReply(int fd, std::string *data)
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
static JsonDocument * Create(const std::string &text)
virtual bool HandleGenerateToken(const std::string &req, std::string *reply)
perf::Counter * sz_uploaded_catalog_bytes
Result Process(int fdin, const std::string &header_digest, const std::string &path, uint64_t header_size)
static bool ExtractStatsFromReq(JsonDocument *req, perf::Statistics *stats, std::string *start_time)
bool GetTokenPublicId(const std::string &token, std::string *public_id)
void SetStatistics(perf::Statistics *st, const std::string &start_time)
static Request ReadRequest(int fd, std::string *data)
TokenCheckResult CheckToken(const std::string &token, const std::string &secret, std::string *lease_path)
static bool WriteReply(int fd, const std::string &data)
perf::Counter * n_chunks_added
Any MkFromSuffixedHexPtr(const HexPtr hex)
Result Process(const std::string &lease_path, const shash::Any &old_root_hash, const shash::Any &new_root_hash, const RepositoryTag &tag, uint64_t *final_revision)
perf::Counter * n_catalogs_added
virtual PayloadProcessor * MakePayloadProcessor()
virtual bool HandleSubmitPayload(int fdin, const std::string &req, std::string *reply)
static bool WriteRequest(int fd, Request req, const std::string &data)
std::string GenerateString() const
void SetStatistics(perf::Statistics *st)
void AddJsonObject(const std::string &key, const std::string &json)
Reactor(int fdin, int fdout)
const JSON * root() const
perf::Counter * sz_uploaded_bytes
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)