CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
reactor.cc
Go to the documentation of this file.
1 
5 #include "reactor.h"
6 
7 #include <stdint.h>
8 #include <unistd.h>
9 #include <cstdlib>
10 #include <cstring>
11 #include <utility>
12 #include <vector>
13 
14 #include "commit_processor.h"
15 #include "json_document_write.h"
16 #include "logging.h"
17 #include "payload_processor.h"
18 #include "repository_tag.h"
19 #include "session_token.h"
20 #include "upload_facility.h"
21 #include "util/exception.h"
22 #include "util/pointer.h"
23 #include "util/posix.h"
24 #include "util/string.h"
25 
26 namespace receiver {
27 
28 // NOTE, during the handling of the messages between the gateway and the
29 // receiver, we keep reading `4` bytes instead of the more common
30 // `sizeof(req_id)` or `sizeof(int32_t)`.
31 // This mirrors the behaviour of the gateway code.
32 // It would be possible in both codebases to ask for the size of the type, but then
33 // we would need to make sure that the types are actually the same.
34 // It is simpler to send `4` bytes.
35 
36 Reactor::Request Reactor::ReadRequest(int fd, std::string* data) {
37  using namespace receiver; // NOLINT
38 
39  // First, read the command identifier
40  int32_t req_id = kQuit;
41  int nb = SafeRead(fd, &req_id, 4);
42 
43  if (nb != 4) {
44  return kError;
45  }
46 
47  // Then, read message size
48  int32_t msg_size = 0;
49  nb = SafeRead(fd, &msg_size, 4);
50 
51  if (req_id == kError || nb != 4) {
52  return kError;
53  }
54 
55  // Finally read the message body
56  if (msg_size > 0) {
57  std::vector<char> buffer(msg_size);
58  nb = SafeRead(fd, &buffer[0], msg_size);
59 
60  if (nb != msg_size) {
61  return kError;
62  }
63 
64  *data = std::string(&buffer[0], msg_size);
65  }
66 
67  return static_cast<Request>(req_id);
68 }
69 
70 bool Reactor::WriteRequest(int fd, Request req, const std::string& data) {
71  const int32_t msg_size = data.size();
72  const int32_t total_size = 8 + data.size(); // req + msg_size + data
73 
74  std::vector<char> buffer(total_size);
75 
76  memcpy(&buffer[0], &req, 4);
77  memcpy(&buffer[4], &msg_size, 4);
78 
79  if (!data.empty()) {
80  memcpy(&buffer[8], &data[0], data.size());
81  }
82 
83  return SafeWrite(fd, &buffer[0], total_size);
84 }
85 
86 bool Reactor::ReadReply(int fd, std::string* data) {
87  int32_t msg_size(0);
88  int nb = SafeRead(fd, &msg_size, 4);
89 
90  if (nb != 4) {
91  return false;
92  }
93 
94  std::vector<char> buffer(msg_size);
95  nb = SafeRead(fd, &buffer[0], msg_size);
96 
97  if (nb != msg_size) {
98  return false;
99  }
100 
101  *data = std::string(&buffer[0], msg_size);
102 
103  return true;
104 }
105 
106 bool Reactor::WriteReply(int fd, const std::string& data) {
107  const int32_t msg_size = data.size();
108  const int32_t total_size = 4 + data.size();
109 
110  std::vector<char> buffer(total_size);
111 
112  memcpy(&buffer[0], &msg_size, 4);
113 
114  if (!data.empty()) {
115  memcpy(&buffer[4], &data[0], data.size());
116  }
117 
118  return SafeWrite(fd, &buffer[0], total_size);
119 }
120 
// Reads the "statistics" object of the request document `req`, passes the
// publish counter values found in "statistics.publish" to perf::Xadd on the
// counters registered under the "publish" template of `stats`, and copies the
// "start_time" string into `*start_time`.
// Returns true on success and false as soon as one of the looked-up fields is
// missing; in that case neither the counters nor `*start_time` are touched.
// NOTE(review): this text is a documentation-listing extract. The line with the
// function name and first parameter (listing line 121) and listing lines 131,
// 140, 167 and 173 are not present here - consult the real file for them.
122  perf::Statistics *stats,
123  std::string *start_time)
124 {
125  perf::StatisticsTemplate stats_tmpl("publish", stats);
126  upload::UploadCounters counters(stats_tmpl);
127 
// Top-level "statistics" object of the request
128  const JSON* statistics = JsonDocument::SearchInObject(
129  req->root(), "statistics", JSON_OBJECT);
130  if (statistics == NULL) {
132  "Could not find 'statistics' field in request");
133  return false;
134  }
135 
// Nested "publish" object holding the individual counters
136  const JSON* publish_ctrs = JsonDocument::SearchInObject(
137  statistics, "publish", JSON_OBJECT);
138 
139  if (publish_ctrs == NULL) {
141  "Could not find 'statistics.publish' field in request");
142  return false;
143  }
144 
// The counter values are looked up as JSON strings and converted with
// String2Int64 further down
145  const JSON *n_chunks_added = JsonDocument::SearchInObject(
146  publish_ctrs, "n_chunks_added", JSON_STRING);
147  const JSON *n_chunks_duplicated = JsonDocument::SearchInObject(
148  publish_ctrs, "n_chunks_duplicated", JSON_STRING);
149  const JSON *n_catalogs_added = JsonDocument::SearchInObject(
150  publish_ctrs, "n_catalogs_added", JSON_STRING);
151  const JSON *sz_uploaded_bytes = JsonDocument::SearchInObject(
152  publish_ctrs, "sz_uploaded_bytes", JSON_STRING);
153  const JSON *sz_uploaded_catalog_bytes = JsonDocument::SearchInObject(
154  publish_ctrs, "sz_uploaded_catalog_bytes", JSON_STRING);
155 
// "start_time" is a member of "statistics", not of "statistics.publish"
156  const JSON *start_time_json = JsonDocument::SearchInObject(
157  statistics, "start_time", JSON_STRING);
158 
// All six fields are mandatory; this failure path returns without the error
// message strings seen on the two checks above
159  if (n_chunks_added == NULL || n_chunks_duplicated == NULL ||
160  n_catalogs_added == NULL || sz_uploaded_bytes == NULL ||
161  sz_uploaded_catalog_bytes == NULL || start_time_json == NULL) {
162  return false;
163  }
164 
165  perf::Xadd(counters.n_chunks_added,
166  String2Int64(n_chunks_added->string_value));
168  String2Int64(n_chunks_duplicated->string_value));
169  perf::Xadd(counters.n_catalogs_added,
170  String2Int64(n_catalogs_added->string_value));
171  perf::Xadd(counters.sz_uploaded_bytes,
172  String2Int64(sz_uploaded_bytes->string_value));
174  String2Int64(sz_uploaded_catalog_bytes->string_value));
175 
176  *start_time = start_time_json->string_value;
177 
178  return true;
179 }
180 
181 
182 Reactor::Reactor(int fdin, int fdout) : fdin_(fdin), fdout_(fdout) {}
183 
185 
// Main loop of the reactor: repeatedly reads a request from fdin_ and hands it
// to HandleRequest until a kQuit request has been handled.
// Returns true after kQuit was handled successfully and false as soon as
// HandleRequest reports a failure for any request.
// NOTE(review): listing line 193 is absent from this extract; the string on
// line 194 is presumably the argument of a logging call that starts there.
186 bool Reactor::Run() {
187  std::string msg_body;
188  Request req = kQuit;
189  do {
// ReadRequest only assigns the body when one was sent, so reset it first
190  msg_body.clear();
191  req = ReadRequest(fdin_, &msg_body);
// The result of ReadRequest is dispatched as is, including kQuit and kError
192  if (!HandleRequest(req, msg_body)) {
194  "Reactor: could not handle request %d. Exiting", req);
195  return false;
196  }
197  } while (req != kQuit);
198 
199  return true;
200 }
201 
// Handles a token generation request. The request document must provide the
// string fields "key_id" and "path" and the integer field "max_lease_time";
// they are passed to GenerateSessionToken.
// On success `*reply` is set to a JSON string with the fields "token", "id" and
// "secret" and true is returned. An invalid document, a missing field or a
// failed token generation returns false without assigning `*reply`.
// A NULL `reply` ends in PANIC.
// NOTE(review): listing lines 206, 208, 221 and 233 are absent from this
// extract; `req_json` is presumably declared on line 206 - confirm against the
// real file.
202 bool Reactor::HandleGenerateToken(const std::string& req, std::string* reply) {
203  if (reply == NULL) {
204  PANIC(kLogSyslogErr, "HandleGenerateToken: Invalid reply pointer.");
205  }
207  if (!req_json.IsValid()) {
209  "HandleGenerateToken: Invalid JSON request.");
210  return false;
211  }
212 
// Mandatory request fields
213  const JSON* key_id =
214  JsonDocument::SearchInObject(req_json->root(), "key_id", JSON_STRING);
215  const JSON* path =
216  JsonDocument::SearchInObject(req_json->root(), "path", JSON_STRING);
217  const JSON* max_lease_time = JsonDocument::SearchInObject(
218  req_json->root(), "max_lease_time", JSON_INT);
219 
220  if (key_id == NULL || path == NULL || max_lease_time == NULL) {
222  "HandleGenerateToken: Missing fields in request.");
223  return false;
224  }
225 
// Output parameters of GenerateSessionToken
226  std::string session_token;
227  std::string public_token_id;
228  std::string token_secret;
229 
230  if (!GenerateSessionToken(key_id->string_value, path->string_value,
231  max_lease_time->int_value, &session_token,
232  &public_token_id, &token_secret)) {
234  "HandleGenerateToken: Could not generate session token.");
235  return false;
236  }
237 
// Assemble the JSON reply
238  JsonStringGenerator input;
239  input.Add("token", session_token);
240  input.Add("id", public_token_id);
241  input.Add("secret", token_secret);
242  std::string json = input.GenerateString();
243  *reply = json;
244 
245  return true;
246 }
247 
248 bool Reactor::HandleGetTokenId(const std::string& req, std::string* reply) {
249  if (reply == NULL) {
250  PANIC(kLogSyslogErr, "HandleGetTokenId: Invalid reply pointer.");
251  }
252 
253  std::string token_id;
254  JsonStringGenerator input;
255  if (!GetTokenPublicId(req, &token_id)) {
256  input.Add("status", "error");
257  input.Add("reason", "invalid_token");
258  } else {
259  input.Add("status", "ok");
260  input.Add("id", token_id);
261  }
262  std::string json = input.GenerateString();
263  *reply = json;
264 
265  return true;
266 }
267 
// Handles a token verification request. The request document must provide the
// string fields "token" and "secret", which are passed to CheckToken.
// For the results kExpired, kInvalid and kValid, `*reply` is set to a JSON
// string describing the outcome ("status" plus either "reason" or "path") and
// true is returned. An invalid document or a missing field returns false
// without assigning `*reply`. A NULL `reply` ends in PANIC.
// NOTE(review): listing lines 273, 275, 286 and 313 are absent from this
// extract; `req_json` is presumably declared on line 273, and what the default
// branch of the switch does besides carrying the message on line 314 cannot be
// seen here.
268 bool Reactor::HandleCheckToken(const std::string& req, std::string* reply) {
269  if (reply == NULL) {
270  PANIC(kLogSyslogErr, "HandleCheckToken: Invalid reply pointer.");
271  }
272 
274  if (!req_json.IsValid()) {
276  "HandleCheckToken: Invalid JSON request.");
277  return false;
278  }
279 
// Mandatory request fields
280  const JSON* token =
281  JsonDocument::SearchInObject(req_json->root(), "token", JSON_STRING);
282  const JSON* secret =
283  JsonDocument::SearchInObject(req_json->root(), "secret", JSON_STRING);
284 
285  if (token == NULL || secret == NULL) {
287  "HandleCheckToken: Missing fields in request.");
288  return false;
289  }
290 
// `path` is filled in by CheckToken and only reported for a valid token
291  std::string path;
292  JsonStringGenerator input;
293  TokenCheckResult ret =
294  CheckToken(token->string_value, secret->string_value, &path);
295  switch (ret) {
296  case kExpired:
297  // Expired token
298  input.Add("status", "error");
299  input.Add("reason", "expired_token");
300  break;
301  case kInvalid:
302  // Invalid token
303  input.Add("status", "error");
304  input.Add("reason", "invalid_token");
305  break;
306  case kValid:
307  // All ok
308  input.Add("status", "ok");
309  input.Add("path", path);
310  break;
311  default:
312  // Should not be reached
314  "HandleCheckToken: Unknown value received. Exiting.");
315  }
316 
317  std::string json = input.GenerateString();
318  *reply = json;
319 
320  return true;
321 }
322 
323 // This is a special handler. We need to continue reading the payload from the
324 // fdin_
// Handles a payload submission. The request document must provide the string
// fields "path" and "digest" and the integer field "header_size"; together
// with `fdin` they are passed to the Process call of the payload processor.
// `*reply` is set to a JSON string built from "status"/"reason" entries chosen
// by the processing result, plus a "statistics" object produced by
// perf::Statistics::PrintJSON, and true is returned. An invalid document or a
// missing field returns false without assigning `*reply`. A NULL `reply` ends
// in PANIC.
// NOTE(review): many listing lines are absent from this extract (333, 335, 348,
// 355, 358, 363, 367, 371, 375, 379). The declarations of `req_json`, `proc`
// and `res` and all `case` labels of the switch are among them, so the mapping
// from result value to reply cannot be read from this text.
325 bool Reactor::HandleSubmitPayload(int fdin, const std::string& req,
326  std::string* reply) {
327  if (!reply) {
328  PANIC(kLogSyslogErr, "HandleSubmitPayload: Invalid reply pointer.");
329  }
330 
331  // Extract the Path (used for verification), Digest and DigestSize from the
332  // request JSON.
334  if (!req_json.IsValid()) {
336  "HandleSubmitPayload: Invalid JSON request.");
337  return false;
338  }
339 
340  const JSON* path_json =
341  JsonDocument::SearchInObject(req_json->root(), "path", JSON_STRING);
342  const JSON* digest_json =
343  JsonDocument::SearchInObject(req_json->root(), "digest", JSON_STRING);
344  const JSON* header_size_json =
345  JsonDocument::SearchInObject(req_json->root(), "header_size", JSON_INT);
346 
347  if (path_json == NULL || digest_json == NULL || header_size_json == NULL) {
349  "HandleSubmitPayload: Missing fields in request.");
350  return false;
351  }
352 
// Statistics object local to this request; it is serialized into the reply
353  perf::Statistics statistics;
354 
356  proc->SetStatistics(&statistics);
357  JsonStringGenerator reply_input;
359  proc->Process(fdin, digest_json->string_value, path_json->string_value,
360  header_size_json->int_value);
361 
// Translate the processing result into the "status"/"reason" reply fields
362  switch (res) {
364  reply_input.Add("status", "error");
365  reply_input.Add("reason", "path_violation");
366  break;
368  reply_input.Add("status", "error");
369  reply_input.Add("reason", "other_error");
370  break;
372  reply_input.Add("status", "error");
373  reply_input.Add("reason", "uploader_error");
374  break;
376  reply_input.Add("status", "ok");
377  break;
378  default:
380  "HandleSubmitPayload: Unknown value of PayloadProcessor::Result "
381  "encountered.");
382  break;
383  }
384 
385  // HandleSubmitPayload sends partial statistics back to the gateway
386  std::string stats_json = statistics.PrintJSON();
387  reply_input.AddJsonObject("statistics", stats_json);
388 
389  std::string json = reply_input.GenerateString();
390  *reply = json;
391 
392  return true;
393 }
394 
395 
// Handles a commit request. The string fields "lease_path", "old_root_hash"
// and "new_root_hash" are mandatory; "tag_name", "tag_channel" and
// "tag_description" are looked up as well and used to build the RepositoryTag.
// The publish statistics of the request are extracted with
// ExtractStatsFromReq and handed to the commit processor.
// `*reply` is set to a JSON string built from "status"/"reason" entries chosen
// by the processing result (plus "final_revision" in one branch) and true is
// returned. An invalid document or a missing mandatory field returns false
// without assigning `*reply`. A NULL `reply` ends in PANIC.
// NOTE(review): listing lines 401, 403, 423, 431, 437, 452, 456, 460, 464 and
// 469 are absent from this extract. The declarations of `req_json` and `proc`
// and all `case` labels of the switch are among them.
396 bool Reactor::HandleCommit(const std::string& req, std::string* reply) {
397  if (!reply) {
398  PANIC(kLogSyslogErr, "HandleCommit: Invalid reply pointer.");
399  }
400  // Extract the Path from the request JSON.
402  if (!req_json.IsValid()) {
404  "HandleCommit: Invalid JSON request.");
405  return false;
406  }
407 
408  const JSON* lease_path_json =
409  JsonDocument::SearchInObject(req_json->root(), "lease_path", JSON_STRING);
410  const JSON* old_root_hash_json = JsonDocument::SearchInObject(
411  req_json->root(), "old_root_hash", JSON_STRING);
412  const JSON* new_root_hash_json = JsonDocument::SearchInObject(
413  req_json->root(), "new_root_hash", JSON_STRING);
414  const JSON* tag_name_json = JsonDocument::SearchInObject(
415  req_json->root(), "tag_name", JSON_STRING);
416  const JSON* tag_channel_json = JsonDocument::SearchInObject(
417  req_json->root(), "tag_channel", JSON_STRING);
418  const JSON* tag_description_json = JsonDocument::SearchInObject(
419  req_json->root(), "tag_description", JSON_STRING);
420 
// NOTE(review): only the lease path and the two root hashes are checked here.
// The three tag_* results are dereferenced below without a NULL check, while
// other handlers in this file treat a NULL SearchInObject result as a missing
// field - a request lacking a tag field looks like a NULL dereference.
421  if (lease_path_json == NULL || old_root_hash_json == NULL ||
422  new_root_hash_json == NULL) {
424  "HandleCommit: Missing fields in request.");
425  return false;
426  }
427 
428  perf::Statistics statistics;
429  std::string start_time;
// No return is visible in this branch, so a failed statistics extraction
// apparently does not abort the commit - TODO confirm against line 431
430  if (!Reactor::ExtractStatsFromReq(req_json, &statistics, &start_time)) {
432  "HandleCommit: Could not extract statistics counters from request");
433  }
// Not initialized here; its address is passed to Process below
434  uint64_t final_revision;
435 
436  // Here we use the path to commit the changes!
438  proc->SetStatistics(&statistics, start_time);
439  shash::Any old_root_hash = shash::MkFromSuffixedHexPtr(
440  shash::HexPtr(old_root_hash_json->string_value));
441  shash::Any new_root_hash = shash::MkFromSuffixedHexPtr(
442  shash::HexPtr(new_root_hash_json->string_value));
443  RepositoryTag repo_tag(tag_name_json->string_value,
444  tag_channel_json->string_value,
445  tag_description_json->string_value);
446  CommitProcessor::Result res = proc->Process(lease_path_json->string_value,
447  old_root_hash, new_root_hash,
448  repo_tag, &final_revision);
449 
// Translate the processing result into the "status"/"reason" reply fields
450  JsonStringGenerator reply_input;
451  switch (res) {
453  reply_input.Add("status", "ok");
// NOTE(review): the uint64_t revision is narrowed to int for the reply
454  reply_input.Add("final_revision", static_cast<int>(final_revision));
455  break;
457  reply_input.Add("status", "error");
458  reply_input.Add("reason", "miscellaneous");
459  break;
461  reply_input.Add("status", "error");
462  reply_input.Add("reason", "merge_error");
463  break;
465  reply_input.Add("status", "error");
466  reply_input.Add("reason", "missing_reflog");
467  break;
468  default:
470  "Unknown value of CommitProcessor::Result encountered.");
471  break;
472  }
473 
474  std::string json = reply_input.GenerateString();
475  *reply = json;
476 
477  return true;
478 }
479 
// Body of the PayloadProcessor factory: returns a default-constructed
// PayloadProcessor allocated with new.
// NOTE(review): the signature line (listing line 480) is absent from this
// extract; who deletes the returned object is not visible here.
481  return new PayloadProcessor();
482 }
483 
// Body of the CommitProcessor factory: returns a default-constructed
// CommitProcessor allocated with new.
// NOTE(review): the signature line (listing line 484) is absent from this
// extract; who deletes the returned object is not visible here.
485  return new CommitProcessor();
486 }
487 
// Dispatches a single request and writes the reply to fdout_.
// kQuit and kEcho are answered directly ("ok" and "PID: <pid>"). The token,
// payload and commit requests call their handler and then write whatever the
// handler left in `reply`; the reply is written even when the handler failed,
// in which case it may still be empty. kError sets the result to false, and
// any other value falls through to the empty default branch.
// Returns true when both the handler and the reply write succeeded.
// If an ECvmfsException is caught, a JSON reply with "status": "error" and a
// "reason" starting with "runtime error: " is written and the exception is
// thrown again.
// NOTE(review): listing lines 520 and 525 are absent from this extract; the
// strings in the kTestCrash and kError branches are presumably arguments of
// calls that start on those lines.
488 bool Reactor::HandleRequest(Request req, const std::string& data) {
489  bool ok = true;
490  std::string reply;
491  try {
492  switch (req) {
493  case kQuit:
494  ok = WriteReply(fdout_, "ok");
495  break;
496  case kEcho:
497  ok = WriteReply(fdout_, std::string("PID: ") + StringifyUint(getpid()));
498  break;
499  case kGenerateToken:
500  ok &= HandleGenerateToken(data, &reply);
501  ok &= WriteReply(fdout_, reply);
502  break;
503  case kGetTokenId:
504  ok &= HandleGetTokenId(data, &reply);
505  ok &= WriteReply(fdout_, reply);
506  break;
507  case kCheckToken:
508  ok &= HandleCheckToken(data, &reply);
509  ok &= WriteReply(fdout_, reply);
510  break;
511  case kSubmitPayload:
// The payload handler receives fdin_ in addition to the request body
512  ok &= HandleSubmitPayload(fdin_, data, &reply);
513  ok &= WriteReply(fdout_, reply);
514  break;
515  case kCommit:
516  ok &= HandleCommit(data, &reply);
517  ok &= WriteReply(fdout_, reply);
518  break;
519  case kTestCrash:
521  "Crash for test purposes. Should never happen in production "
522  "environment.");
523  break;
524  case kError:
526  "Reactor: unknown command received.");
527  ok = false;
528  break;
529  default:
530  break;
531  }
532  } catch (const ECvmfsException &e) {
533  reply.clear();
534 
535  std::string error("runtime error: ");
536  error += e.what();
537 
538  JsonStringGenerator input;
539  input.Add("status", "error");
540  input.Add("reason", error);
541 
542  reply = input.GenerateString();
// The result of this write is not checked
543  WriteReply(fdout_, reply);
// NOTE(review): `throw e;` throws a copy of static type ECvmfsException; a
// plain `throw;` would rethrow the original exception object instead.
544  throw e;
545  }
546 
547  return ok;
548 }
549 
550 } // namespace receiver
#define LogCvmfs(source, mask,...)
Definition: logging.h:20
perf::Counter * n_chunks_duplicated
void Add(const std::string &key, const std::string &val)
int64_t Xadd(class Counter *counter, const int64_t delta)
Definition: statistics.h:51
virtual bool HandleCommit(const std::string &req, std::string *reply)
Definition: reactor.cc:396
virtual CommitProcessor * MakeCommitProcessor()
Definition: reactor.cc:484
static JSON * SearchInObject(const JSON *json_object, const std::string &name, const json_type type)
#define PANIC(...)
Definition: exception.h:26
virtual bool HandleCheckToken(const std::string &req, std::string *reply)
Definition: reactor.cc:268
bool HandleRequest(Request req, const std::string &data)
Definition: reactor.cc:488
bool GenerateSessionToken(const std::string &key_id, const std::string &path, uint64_t max_lease_time, std::string *session_token, std::string *public_token_id, std::string *token_secret)
virtual bool HandleGetTokenId(const std::string &req, std::string *reply)
Definition: reactor.cc:248
bool SafeWrite(int fd, const void *buf, size_t nbyte)
Definition: posix.cc:1919
Any MkFromSuffixedHexPtr(const HexPtr hex)
Definition: hash.cc:105
std::string StringifyUint(const uint64_t value)
Definition: string.cc:83
static bool ReadReply(int fd, std::string *data)
Definition: reactor.cc:86
virtual ~Reactor()
Definition: reactor.cc:184
int64_t String2Int64(const string &value)
Definition: string.cc:221
void SetStatistics(perf::Statistics *st, std::string start_time)
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
Definition: posix.cc:1977
static JsonDocument * Create(const std::string &text)
virtual bool HandleGenerateToken(const std::string &req, std::string *reply)
Definition: reactor.cc:202
perf::Counter * sz_uploaded_catalog_bytes
Result Process(int fdin, const std::string &header_digest, const std::string &path, uint64_t header_size)
static bool ExtractStatsFromReq(JsonDocument *req, perf::Statistics *stats, std::string *start_time)
Definition: reactor.cc:121
bool GetTokenPublicId(const std::string &token, std::string *public_id)
static Request ReadRequest(int fd, std::string *data)
Definition: reactor.cc:36
TokenCheckResult CheckToken(const std::string &token, const std::string &secret, std::string *lease_path)
static bool WriteReply(int fd, const std::string &data)
Definition: reactor.cc:106
perf::Counter * n_chunks_added
std::string PrintJSON()
Definition: statistics.cc:106
Result Process(const std::string &lease_path, const shash::Any &old_root_hash, const shash::Any &new_root_hash, const RepositoryTag &tag, uint64_t *final_revision)
perf::Counter * n_catalogs_added
virtual PayloadProcessor * MakePayloadProcessor()
Definition: reactor.cc:480
virtual bool HandleSubmitPayload(int fdin, const std::string &req, std::string *reply)
Definition: reactor.cc:325
static bool WriteRequest(int fd, Request req, const std::string &data)
Definition: reactor.cc:70
std::string GenerateString() const
void SetStatistics(perf::Statistics *st)
void AddJsonObject(const std::string &key, const std::string &json)
struct json_value JSON
Definition: helper_allow.cc:11
Reactor(int fdin, int fdout)
Definition: reactor.cc:182
const JSON * root() const
Definition: json_document.h:25
perf::Counter * sz_uploaded_bytes