CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
reactor.cc
Go to the documentation of this file.
1 
5 #include "reactor.h"
6 
7 #include <stdint.h>
8 #include <unistd.h>
9 
10 #include <cstdlib>
11 #include <cstring>
12 #include <utility>
13 #include <vector>
14 
15 #include "commit_processor.h"
16 #include "json_document_write.h"
17 #include "payload_processor.h"
18 #include "repository_tag.h"
19 #include "session_token.h"
20 #include "upload_facility.h"
21 #include "util/exception.h"
22 #include "util/logging.h"
23 #include "util/pointer.h"
24 #include "util/posix.h"
25 #include "util/string.h"
26 
27 namespace receiver {
28 
29 // NOTE, during the handling of the messages between the gateway and the
30 // receiver, we keep reading `4` bytes instead of the more common
31 // `sizeof(req_id)` or `sizeof(int32_t)`.
32 // This mirror well the behaviour of the gateway code.
33 // It would be possible on both codebase to ask the size of the type, but then
34 // we would need to make sure that the types are actually the same.
35 // It is simpler to send `4` bytes.
36 
37 Reactor::Request Reactor::ReadRequest(int fd, std::string* data) {
38  using namespace receiver; // NOLINT
39 
40  // First, read the command identifier
41  int32_t req_id = kQuit;
42  int nb = SafeRead(fd, &req_id, 4);
43 
44  if (nb != 4) {
45  return kError;
46  }
47 
48  // Then, read message size
49  int32_t msg_size = 0;
50  nb = SafeRead(fd, &msg_size, 4);
51 
52  if (req_id == kError || nb != 4) {
53  return kError;
54  }
55 
56  // Finally read the message body
57  if (msg_size > 0) {
58  std::vector<char> buffer(msg_size);
59  nb = SafeRead(fd, &buffer[0], msg_size);
60 
61  if (nb != msg_size) {
62  return kError;
63  }
64 
65  *data = std::string(&buffer[0], msg_size);
66  }
67 
68  return static_cast<Request>(req_id);
69 }
70 
71 bool Reactor::WriteRequest(int fd, Request req, const std::string& data) {
72  const int32_t msg_size = data.size();
73  const int32_t total_size = 8 + data.size(); // req + msg_size + data
74 
75  std::vector<char> buffer(total_size);
76 
77  memcpy(&buffer[0], &req, 4);
78  memcpy(&buffer[4], &msg_size, 4);
79 
80  if (!data.empty()) {
81  memcpy(&buffer[8], &data[0], data.size());
82  }
83 
84  return SafeWrite(fd, &buffer[0], total_size);
85 }
86 
87 bool Reactor::ReadReply(int fd, std::string* data) {
88  int32_t msg_size(0);
89  int nb = SafeRead(fd, &msg_size, 4);
90 
91  if (nb != 4) {
92  return false;
93  }
94 
95  std::vector<char> buffer(msg_size);
96  nb = SafeRead(fd, &buffer[0], msg_size);
97 
98  if (nb != msg_size) {
99  return false;
100  }
101 
102  *data = std::string(&buffer[0], msg_size);
103 
104  return true;
105 }
106 
107 bool Reactor::WriteReply(int fd, const std::string& data) {
108  const int32_t msg_size = data.size();
109  const int32_t total_size = 4 + data.size();
110 
111  std::vector<char> buffer(total_size);
112 
113  memcpy(&buffer[0], &msg_size, 4);
114 
115  if (!data.empty()) {
116  memcpy(&buffer[4], &data[0], data.size());
117  }
118 
119  return SafeWrite(fd, &buffer[0], total_size);
120 }
121 
123  std::string* start_time) {
124  perf::StatisticsTemplate stats_tmpl("publish", stats);
125  upload::UploadCounters counters(stats_tmpl);
126 
127  const JSON* statistics =
128  JsonDocument::SearchInObject(req->root(), "statistics", JSON_OBJECT);
129  if (statistics == NULL) {
131  "Could not find 'statistics' field in request");
132  return false;
133  }
134 
135  const JSON* publish_ctrs =
136  JsonDocument::SearchInObject(statistics, "publish", JSON_OBJECT);
137 
138  if (publish_ctrs == NULL) {
140  "Could not find 'statistics.publish' field in request");
141  return false;
142  }
143 
144  const JSON* n_chunks_added =
145  JsonDocument::SearchInObject(publish_ctrs, "n_chunks_added", JSON_INT);
146  const JSON* n_chunks_duplicated = JsonDocument::SearchInObject(
147  publish_ctrs, "n_chunks_duplicated", JSON_INT);
148  const JSON* n_catalogs_added =
149  JsonDocument::SearchInObject(publish_ctrs, "n_catalogs_added", JSON_INT);
150  const JSON* sz_uploaded_bytes =
151  JsonDocument::SearchInObject(publish_ctrs, "sz_uploaded_bytes", JSON_INT);
152  const JSON* sz_uploaded_catalog_bytes = JsonDocument::SearchInObject(
153  publish_ctrs, "sz_uploaded_catalog_bytes", JSON_INT);
154 
155  const JSON* start_time_json =
156  JsonDocument::SearchInObject(statistics, "start_time", JSON_STRING);
157 
158  if (n_chunks_added == NULL || n_chunks_duplicated == NULL ||
159  n_catalogs_added == NULL || sz_uploaded_bytes == NULL ||
160  sz_uploaded_catalog_bytes == NULL || start_time_json == NULL) {
161  return false;
162  }
163 
164  perf::Xadd(counters.n_chunks_added, n_chunks_added->int_value);
165  perf::Xadd(counters.n_chunks_duplicated, n_chunks_duplicated->int_value);
166  perf::Xadd(counters.n_catalogs_added, n_catalogs_added->int_value);
167  perf::Xadd(counters.sz_uploaded_bytes, sz_uploaded_bytes->int_value);
169  sz_uploaded_catalog_bytes->int_value);
170 
171  *start_time = start_time_json->string_value;
172 
173  return true;
174 }
175 
176 Reactor::Reactor(int fdin, int fdout) : fdin_(fdin), fdout_(fdout) {}
177 
179 
180 bool Reactor::Run() {
181  std::string msg_body;
182  Request req = kQuit;
183  do {
184  msg_body.clear();
185  req = ReadRequest(fdin_, &msg_body);
186  if (!HandleRequest(req, msg_body)) {
188  "Reactor: could not handle request %d. Exiting", req);
189  return false;
190  }
191  } while (req != kQuit);
192 
193  return true;
194 }
195 
196 bool Reactor::HandleGenerateToken(const std::string& req, std::string* reply) {
197  if (reply == NULL) {
198  PANIC(kLogSyslogErr, "HandleGenerateToken: Invalid reply pointer.");
199  }
201  if (!req_json.IsValid()) {
203  "HandleGenerateToken: Invalid JSON request.");
204  return false;
205  }
206 
207  const JSON* key_id =
208  JsonDocument::SearchInObject(req_json->root(), "key_id", JSON_STRING);
209  const JSON* path =
210  JsonDocument::SearchInObject(req_json->root(), "path", JSON_STRING);
211  const JSON* max_lease_time = JsonDocument::SearchInObject(
212  req_json->root(), "max_lease_time", JSON_INT);
213 
214  if (key_id == NULL || path == NULL || max_lease_time == NULL) {
216  "HandleGenerateToken: Missing fields in request.");
217  return false;
218  }
219 
220  std::string session_token;
221  std::string public_token_id;
222  std::string token_secret;
223 
224  if (!GenerateSessionToken(key_id->string_value, path->string_value,
225  max_lease_time->int_value, &session_token,
226  &public_token_id, &token_secret)) {
228  "HandleGenerateToken: Could not generate session token.");
229  return false;
230  }
231 
232  JsonStringGenerator input;
233  input.Add("token", session_token);
234  input.Add("id", public_token_id);
235  input.Add("secret", token_secret);
236  std::string json = input.GenerateString();
237  *reply = json;
238 
239  return true;
240 }
241 
242 bool Reactor::HandleGetTokenId(const std::string& req, std::string* reply) {
243  if (reply == NULL) {
244  PANIC(kLogSyslogErr, "HandleGetTokenId: Invalid reply pointer.");
245  }
246 
247  std::string token_id;
248  JsonStringGenerator input;
249  if (!GetTokenPublicId(req, &token_id)) {
250  input.Add("status", "error");
251  input.Add("reason", "invalid_token");
252  } else {
253  input.Add("status", "ok");
254  input.Add("id", token_id);
255  }
256  std::string json = input.GenerateString();
257  *reply = json;
258 
259  return true;
260 }
261 
262 bool Reactor::HandleCheckToken(const std::string& req, std::string* reply) {
263  if (reply == NULL) {
264  PANIC(kLogSyslogErr, "HandleCheckToken: Invalid reply pointer.");
265  }
266 
268  if (!req_json.IsValid()) {
270  "HandleCheckToken: Invalid JSON request.");
271  return false;
272  }
273 
274  const JSON* token =
275  JsonDocument::SearchInObject(req_json->root(), "token", JSON_STRING);
276  const JSON* secret =
277  JsonDocument::SearchInObject(req_json->root(), "secret", JSON_STRING);
278 
279  if (token == NULL || secret == NULL) {
281  "HandleCheckToken: Missing fields in request.");
282  return false;
283  }
284 
285  std::string path;
286  JsonStringGenerator input;
287  TokenCheckResult ret =
288  CheckToken(token->string_value, secret->string_value, &path);
289  switch (ret) {
290  case kExpired:
291  // Expired token
292  input.Add("status", "error");
293  input.Add("reason", "expired_token");
294  break;
295  case kInvalid:
296  // Invalid token
297  input.Add("status", "error");
298  input.Add("reason", "invalid_token");
299  break;
300  case kValid:
301  // All ok
302  input.Add("status", "ok");
303  input.Add("path", path);
304  break;
305  default:
306  // Should not be reached
308  "HandleCheckToken: Unknown value received. Exiting.");
309  }
310 
311  std::string json = input.GenerateString();
312  *reply = json;
313 
314  return true;
315 }
316 
317 // This is a special handler. We need to continue reading the payload from the
318 // fdin_
319 bool Reactor::HandleSubmitPayload(int fdin, const std::string& req,
320  std::string* reply) {
321  if (!reply) {
322  PANIC(kLogSyslogErr, "HandleSubmitPayload: Invalid reply pointer.");
323  }
324 
325  // Extract the Path (used for verification), Digest and DigestSize from the
326  // request JSON.
328  if (!req_json.IsValid()) {
330  "HandleSubmitPayload: Invalid JSON request.");
331  return false;
332  }
333 
334  const JSON* path_json =
335  JsonDocument::SearchInObject(req_json->root(), "path", JSON_STRING);
336  const JSON* digest_json =
337  JsonDocument::SearchInObject(req_json->root(), "digest", JSON_STRING);
338  const JSON* header_size_json =
339  JsonDocument::SearchInObject(req_json->root(), "header_size", JSON_INT);
340 
341  if (path_json == NULL || digest_json == NULL || header_size_json == NULL) {
343  "HandleSubmitPayload: Missing fields in request.");
344  return false;
345  }
346 
347  perf::Statistics statistics;
348 
350  proc->SetStatistics(&statistics);
351  JsonStringGenerator reply_input;
353  proc->Process(fdin, digest_json->string_value, path_json->string_value,
354  header_size_json->int_value);
355 
356  switch (res) {
358  reply_input.Add("status", "error");
359  reply_input.Add("reason", "path_violation");
360  break;
362  reply_input.Add("status", "error");
363  reply_input.Add("reason", "other_error");
364  break;
366  reply_input.Add("status", "error");
367  reply_input.Add("reason", "uploader_error");
368  break;
370  reply_input.Add("status", "ok");
371  break;
372  default:
374  "HandleSubmitPayload: Unknown value of PayloadProcessor::Result "
375  "encountered.");
376  break;
377  }
378 
379  // HandleSubmitPayload sends partial statistics back to the gateway
380  std::string stats_json = statistics.PrintJSON();
381  reply_input.AddJsonObject("statistics", stats_json);
382 
383  std::string json = reply_input.GenerateString();
384  *reply = json;
385 
386  return true;
387 }
388 
389 bool Reactor::HandleCommit(const std::string& req, std::string* reply) {
390  if (!reply) {
391  PANIC(kLogSyslogErr, "HandleCommit: Invalid reply pointer.");
392  }
393  // Extract the Path from the request JSON.
395  if (!req_json.IsValid()) {
397  "HandleCommit: Invalid JSON request.");
398  return false;
399  }
400 
401  const JSON* lease_path_json =
402  JsonDocument::SearchInObject(req_json->root(), "lease_path", JSON_STRING);
403  const JSON* old_root_hash_json = JsonDocument::SearchInObject(
404  req_json->root(), "old_root_hash", JSON_STRING);
405  const JSON* new_root_hash_json = JsonDocument::SearchInObject(
406  req_json->root(), "new_root_hash", JSON_STRING);
407  const JSON* tag_name_json =
408  JsonDocument::SearchInObject(req_json->root(), "tag_name", JSON_STRING);
409  const JSON* tag_description_json = JsonDocument::SearchInObject(
410  req_json->root(), "tag_description", JSON_STRING);
411 
412  if (lease_path_json == NULL || old_root_hash_json == NULL ||
413  new_root_hash_json == NULL) {
415  "HandleCommit: Missing fields in request.");
416  return false;
417  }
418 
419  perf::Statistics statistics;
420  std::string start_time;
421  if (!Reactor::ExtractStatsFromReq(req_json.weak_ref(), &statistics,
422  &start_time)) {
423  LogCvmfs(
425  "HandleCommit: Could not extract statistics counters from request");
426  }
427  uint64_t final_revision;
428 
429  // Here we use the path to commit the changes!
431  proc->SetStatistics(&statistics, start_time);
432  shash::Any old_root_hash = shash::MkFromSuffixedHexPtr(
433  shash::HexPtr(old_root_hash_json->string_value));
434  shash::Any new_root_hash = shash::MkFromSuffixedHexPtr(
435  shash::HexPtr(new_root_hash_json->string_value));
436  RepositoryTag repo_tag(tag_name_json->string_value,
437  tag_description_json->string_value);
439  proc->Process(lease_path_json->string_value, old_root_hash, new_root_hash,
440  repo_tag, &final_revision);
441 
442  JsonStringGenerator reply_input;
443  switch (res) {
445  reply_input.Add("status", "ok");
446  reply_input.Add("final_revision", static_cast<int64_t>(final_revision));
447  break;
449  reply_input.Add("status", "error");
450  reply_input.Add("reason", "miscellaneous");
451  break;
453  reply_input.Add("status", "error");
454  reply_input.Add("reason", "merge_error");
455  break;
457  reply_input.Add("status", "error");
458  reply_input.Add("reason", "missing_reflog");
459  break;
460  default:
462  "Unknown value of CommitProcessor::Result encountered.");
463  break;
464  }
465 
466  std::string json = reply_input.GenerateString();
467  *reply = json;
468 
469  return true;
470 }
471 
473  return new PayloadProcessor();
474 }
475 
477  return new CommitProcessor();
478 }
479 
480 bool Reactor::HandleRequest(Request req, const std::string& data) {
481  bool ok = true;
482  std::string reply;
483  try {
484  switch (req) {
485  case kQuit:
486  ok = WriteReply(fdout_, "ok");
487  break;
488  case kEcho:
489  ok = WriteReply(fdout_, std::string("PID: ") + StringifyUint(getpid()));
490  break;
491  case kGenerateToken:
492  ok &= HandleGenerateToken(data, &reply);
493  ok &= WriteReply(fdout_, reply);
494  break;
495  case kGetTokenId:
496  ok &= HandleGetTokenId(data, &reply);
497  ok &= WriteReply(fdout_, reply);
498  break;
499  case kCheckToken:
500  ok &= HandleCheckToken(data, &reply);
501  ok &= WriteReply(fdout_, reply);
502  break;
503  case kSubmitPayload:
504  ok &= HandleSubmitPayload(fdin_, data, &reply);
505  ok &= WriteReply(fdout_, reply);
506  break;
507  case kCommit:
508  ok &= HandleCommit(data, &reply);
509  ok &= WriteReply(fdout_, reply);
510  break;
511  case kTestCrash:
513  "Crash for test purposes. Should never happen in production "
514  "environment.");
515  break;
516  case kError:
518  "Reactor: unknown command received.");
519  ok = false;
520  break;
521  default:
522  break;
523  }
524  } catch (const ECvmfsException& e) {
525  reply.clear();
526 
527  std::string error("runtime error: ");
528  error += e.what();
529 
530  JsonStringGenerator input;
531  input.Add("status", "error");
532  input.Add("reason", error);
533 
534  reply = input.GenerateString();
535  WriteReply(fdout_, reply);
536  throw e;
537  }
538 
539  return ok;
540 }
541 
542 } // namespace receiver
perf::Counter * n_chunks_duplicated
void Add(const std::string &key, const std::string &val)
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
virtual bool HandleCommit(const std::string &req, std::string *reply)
Definition: reactor.cc:389
virtual CommitProcessor * MakeCommitProcessor()
Definition: reactor.cc:476
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
#define PANIC(...)
Definition: exception.h:29
virtual bool HandleCheckToken(const std::string &req, std::string *reply)
Definition: reactor.cc:262
bool HandleRequest(Request req, const std::string &data)
Definition: reactor.cc:480
bool GenerateSessionToken(const std::string &key_id, const std::string &path, uint64_t max_lease_time, std::string *session_token, std::string *public_token_id, std::string *token_secret)
virtual bool HandleGetTokenId(const std::string &req, std::string *reply)
Definition: reactor.cc:242
bool SafeWrite(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:2060
std::string StringifyUint(const uint64_t value)
Definition: string.cc:84
static bool ReadReply(int fd, std::string *data)
Definition: reactor.cc:87
virtual ~Reactor()
Definition: reactor.cc:178
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
Definition: posix.cc:2119
static JsonDocument * Create(const std::string &text)
virtual bool HandleGenerateToken(const std::string &req, std::string *reply)
Definition: reactor.cc:196
perf::Counter * sz_uploaded_catalog_bytes
Result Process(int fdin, const std::string &header_digest, const std::string &path, uint64_t header_size)
static bool ExtractStatsFromReq(JsonDocument *req, perf::Statistics *stats, std::string *start_time)
Definition: reactor.cc:122
bool GetTokenPublicId(const std::string &token, std::string *public_id)
void SetStatistics(perf::Statistics *st, const std::string &start_time)
static Request ReadRequest(int fd, std::string *data)
Definition: reactor.cc:37
TokenCheckResult CheckToken(const std::string &token, const std::string &secret, std::string *lease_path)
static bool WriteReply(int fd, const std::string &data)
Definition: reactor.cc:107
perf::Counter * n_chunks_added
std::string PrintJSON()
Definition: statistics.cc:106
Any MkFromSuffixedHexPtr(const HexPtr hex)
Definition: hash.cc:105
Result Process(const std::string &lease_path, const shash::Any &old_root_hash, const shash::Any &new_root_hash, const RepositoryTag &tag, uint64_t *final_revision)
perf::Counter * n_catalogs_added
virtual PayloadProcessor * MakePayloadProcessor()
Definition: reactor.cc:472
virtual bool HandleSubmitPayload(int fdin, const std::string &req, std::string *reply)
Definition: reactor.cc:319
static bool WriteRequest(int fd, Request req, const std::string &data)
Definition: reactor.cc:71
std::string GenerateString() const
void SetStatistics(perf::Statistics *st)
void AddJsonObject(const std::string &key, const std::string &json)
struct json_value JSON
Definition: helper_allow.cc:11
Reactor(int fdin, int fdout)
Definition: reactor.cc:176
const JSON * root() const
Definition: json_document.h:25
perf::Counter * sz_uploaded_bytes
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528