GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/receiver/reactor.cc Lines: 130 228 57.0 %
Date: 2019-02-03 02:48:13 Branches: 40 91 44.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "reactor.h"
6
7
#include <stdint.h>
8
#include <unistd.h>
9
#include <cstdlib>
10
#include <cstring>
11
#include <utility>
12
#include <vector>
13
14
#include "commit_processor.h"
15
#include "json_document.h"
16
#include "logging.h"
17
#include "payload_processor.h"
18
#include "repository_tag.h"
19
#include "session_token.h"
20
#include "util/pointer.h"
21
#include "util/posix.h"
22
#include "util/string.h"
23
24
namespace receiver {
25
26
9
Reactor::Request Reactor::ReadRequest(int fd, std::string* data) {
27
  using namespace receiver;  // NOLINT
28
29
  // First, read the command identifier
30
9
  int32_t req_id = 0;
31
9
  int nb = SafeRead(fd, &req_id, 4);
32
33
9
  if (nb != 4) {
34
    return kError;
35
  }
36
37
  // Then, read message size
38
9
  int32_t msg_size = 0;
39
9
  nb = SafeRead(fd, &msg_size, 4);
40
41

9
  if (req_id == kError || nb != 4) {
42
    return kError;
43
  }
44
45
  // Finally read the message body
46
9
  if (msg_size > 0) {
47
6
    std::vector<char> buffer(msg_size);
48
6
    nb = SafeRead(fd, &buffer[0], msg_size);
49
50
6
    if (nb != msg_size) {
51
      return kError;
52
    }
53
54
6
    *data = std::string(&buffer[0], msg_size);
55
6
    return static_cast<Request>(req_id);
56
  }
57
58
3
  return kQuit;
59
}
60
61
9
bool Reactor::WriteRequest(int fd, Request req, const std::string& data) {
62
9
  const int32_t msg_size = data.size();
63
9
  const int32_t total_size = 8 + data.size();  // req + msg_size + data
64
65
9
  std::vector<char> buffer(total_size);
66
67
9
  memcpy(&buffer[0], &req, 4);
68
9
  memcpy(&buffer[4], &msg_size, 4);
69
70
9
  if (!data.empty()) {
71
6
    memcpy(&buffer[8], &data[0], data.size());
72
  }
73
74
9
  return SafeWrite(fd, &buffer[0], total_size);
75
}
76
77
9
bool Reactor::ReadReply(int fd, std::string* data) {
78
9
  int32_t msg_size(0);
79
9
  int nb = SafeRead(fd, &msg_size, 4);
80
81
9
  if (nb != 4) {
82
    return false;
83
  }
84
85
9
  std::vector<char> buffer(msg_size);
86
9
  nb = SafeRead(fd, &buffer[0], msg_size);
87
88
9
  if (nb != msg_size) {
89
    return false;
90
  }
91
92
9
  *data = std::string(&buffer[0], msg_size);
93
94
9
  return true;
95
}
96
97
9
bool Reactor::WriteReply(int fd, const std::string& data) {
98
9
  const int32_t msg_size = data.size();
99
9
  const int32_t total_size = 4 + data.size();
100
101
9
  std::vector<char> buffer(total_size);
102
103
9
  memcpy(&buffer[0], &msg_size, 4);
104
105
9
  if (!data.empty()) {
106
9
    memcpy(&buffer[4], &data[0], data.size());
107
  }
108
109
9
  return SafeWrite(fd, &buffer[0], total_size);
110
}
111
112
3
Reactor::Reactor(int fdin, int fdout) : fdin_(fdin), fdout_(fdout) {}
113
114
3
Reactor::~Reactor() {}
115
116
3
bool Reactor::Run() {
117
3
  std::string msg_body;
118
3
  Request req = kQuit;
119
9
  do {
120
9
    msg_body.clear();
121
9
    req = ReadRequest(fdin_, &msg_body);
122
9
    if (!HandleRequest(req, msg_body)) {
123
      LogCvmfs(kLogReceiver, kLogSyslogErr,
124
               "Reactor: could not handle request %d. Exiting", req);
125
      return false;
126
    }
127
  } while (req != kQuit);
128
129
3
  return true;
130
}
131
132
2
bool Reactor::HandleGenerateToken(const std::string& req, std::string* reply) {
133
2
  if (reply == NULL) {
134
    LogCvmfs(kLogReceiver, kLogSyslogErr,
135
             "HandleGenerateToken: Invalid reply pointer.");
136
    abort();
137
  }
138
139
2
  UniquePtr<JsonDocument> req_json(JsonDocument::Create(req));
140
2
  if (!req_json.IsValid()) {
141
    LogCvmfs(kLogReceiver, kLogSyslogErr,
142
             "HandleGenerateToken: Invalid JSON request.");
143
    return false;
144
  }
145
146
  const JSON* key_id =
147
2
      JsonDocument::SearchInObject(req_json->root(), "key_id", JSON_STRING);
148
  const JSON* path =
149
2
      JsonDocument::SearchInObject(req_json->root(), "path", JSON_STRING);
150
  const JSON* max_lease_time = JsonDocument::SearchInObject(
151
2
      req_json->root(), "max_lease_time", JSON_INT);
152
153

2
  if (key_id == NULL || path == NULL || max_lease_time == NULL) {
154
    LogCvmfs(kLogReceiver, kLogSyslogErr,
155
             "HandleGenerateToken: Missing fields in request.");
156
    return false;
157
  }
158
159
2
  std::string session_token;
160
2
  std::string public_token_id;
161
2
  std::string token_secret;
162
163
2
  if (!GenerateSessionToken(key_id->string_value, path->string_value,
164
                            max_lease_time->int_value, &session_token,
165
                            &public_token_id, &token_secret)) {
166
    LogCvmfs(kLogReceiver, kLogSyslogErr,
167
             "HandleGenerateToken: Could not generate session token.");
168
    return false;
169
  }
170
171
2
  JsonStringInput input;
172
2
  input.push_back(std::make_pair("token", session_token.c_str()));
173
2
  input.push_back(std::make_pair("id", public_token_id.c_str()));
174
2
  input.push_back(std::make_pair("secret", token_secret.c_str()));
175
176
2
  ToJsonString(input, reply);
177
178
2
  return true;
179
}
180
181
1
bool Reactor::HandleGetTokenId(const std::string& req, std::string* reply) {
182
1
  if (reply == NULL) {
183
    LogCvmfs(kLogReceiver, kLogSyslogErr,
184
             "HandleGetTokenId: Invalid reply pointer.");
185
    abort();
186
  }
187
188
1
  std::string token_id;
189
1
  JsonStringInput input;
190
1
  if (!GetTokenPublicId(req, &token_id)) {
191
    input.push_back(std::make_pair("status", "error"));
192
    input.push_back(std::make_pair("reason", "invalid_token"));
193
  } else {
194
1
    input.push_back(std::make_pair("status", "ok"));
195
1
    input.push_back(std::make_pair("id", token_id.c_str()));
196
  }
197
198
1
  ToJsonString(input, reply);
199
1
  return true;
200
}
201
202
1
bool Reactor::HandleCheckToken(const std::string& req, std::string* reply) {
203
1
  if (reply == NULL) {
204
    LogCvmfs(kLogReceiver, kLogSyslogErr,
205
             "HandleCheckToken: Invalid reply pointer.");
206
    abort();
207
  }
208
209
1
  UniquePtr<JsonDocument> req_json(JsonDocument::Create(req));
210
1
  if (!req_json.IsValid()) {
211
    LogCvmfs(kLogReceiver, kLogSyslogErr,
212
             "HandleCheckToken: Invalid JSON request.");
213
    return false;
214
  }
215
216
  const JSON* token =
217
1
      JsonDocument::SearchInObject(req_json->root(), "token", JSON_STRING);
218
  const JSON* secret =
219
1
      JsonDocument::SearchInObject(req_json->root(), "secret", JSON_STRING);
220
221

1
  if (token == NULL || secret == NULL) {
222
    LogCvmfs(kLogReceiver, kLogSyslogErr,
223
             "HandleCheckToken: Missing fields in request.");
224
    return false;
225
  }
226
227
1
  std::string path;
228
1
  JsonStringInput input;
229
  TokenCheckResult ret =
230
1
      CheckToken(token->string_value, secret->string_value, &path);
231

1
  switch (ret) {
232
    case kExpired:
233
      // Expired token
234
      input.push_back(std::make_pair("status", "error"));
235
      input.push_back(std::make_pair("reason", "expired_token"));
236
      break;
237
    case kInvalid:
238
      // Invalid token
239
      input.push_back(std::make_pair("status", "error"));
240
      input.push_back(std::make_pair("reason", "invalid_token"));
241
      break;
242
    case kValid:
243
      // All ok
244
1
      input.push_back(std::make_pair("status", "ok"));
245
1
      input.push_back(std::make_pair("path", path.c_str()));
246
1
      break;
247
    default:
248
      // Should not be reached
249
      LogCvmfs(kLogReceiver, kLogSyslogErr,
250
               "HandleCheckToken: Unknown value received. Exiting.");
251
      abort();
252
  }
253
254
1
  ToJsonString(input, reply);
255
1
  return true;
256
}
257
258
// This is a special handler. We need to continue reading the payload from the
259
// fdin_
260
1
bool Reactor::HandleSubmitPayload(int fdin, const std::string& req,
261
                                  std::string* reply) {
262
1
  if (!reply) {
263
    LogCvmfs(kLogReceiver, kLogSyslogErr,
264
             "HandleSubmitPayload: Invalid reply pointer.");
265
    abort();
266
  }
267
268
  // Extract the Path (used for verification), Digest and DigestSize from the
269
  // request JSON.
270
1
  UniquePtr<JsonDocument> req_json(JsonDocument::Create(req));
271
1
  if (!req_json.IsValid()) {
272
    LogCvmfs(kLogReceiver, kLogSyslogErr,
273
             "HandleSubmitPayload: Invalid JSON request.");
274
    return false;
275
  }
276
277
  const JSON* path_json =
278
1
      JsonDocument::SearchInObject(req_json->root(), "path", JSON_STRING);
279
  const JSON* digest_json =
280
1
      JsonDocument::SearchInObject(req_json->root(), "digest", JSON_STRING);
281
  const JSON* header_size_json =
282
1
      JsonDocument::SearchInObject(req_json->root(), "header_size", JSON_INT);
283
284

1
  if (path_json == NULL || digest_json == NULL || header_size_json == NULL) {
285
    LogCvmfs(kLogReceiver, kLogSyslogErr,
286
             "HandleSubmitPayload: Missing fields in request.");
287
    return false;
288
  }
289
290
1
  UniquePtr<PayloadProcessor> proc(MakePayloadProcessor());
291
1
  JsonStringInput reply_input;
292
  PayloadProcessor::Result res =
293
      proc->Process(fdin, digest_json->string_value, path_json->string_value,
294
1
                    header_size_json->int_value);
295
296

1
  switch (res) {
297
    case PayloadProcessor::kPathViolation:
298
      reply_input.push_back(std::make_pair("status", "error"));
299
      reply_input.push_back(std::make_pair("reason", "path_violation"));
300
      break;
301
    case PayloadProcessor::kOtherError:
302
      reply_input.push_back(std::make_pair("status", "error"));
303
      reply_input.push_back(std::make_pair("reason", "other_error"));
304
      break;
305
    case PayloadProcessor::kSpoolerError:
306
      reply_input.push_back(std::make_pair("status", "error"));
307
      reply_input.push_back(std::make_pair("reason", "spooler_error"));
308
      break;
309
    case PayloadProcessor::kSuccess:
310
1
      reply_input.push_back(std::make_pair("status", "ok"));
311
1
      break;
312
    default:
313
      LogCvmfs(kLogReceiver, kLogSyslogErr,
314
               "HandleSubmitPayload: Unknown value of PayloadProcessor::Result "
315
               "encountered.");
316
      abort();
317
      break;
318
  }
319
320
1
  ToJsonString(reply_input, reply);
321
322
1
  return true;
323
}
324
325
bool Reactor::HandleCommit(const std::string& req, std::string* reply) {
326
  if (!reply) {
327
    LogCvmfs(kLogReceiver, kLogSyslogErr,
328
             "HandleCommit: Invalid reply pointer.");
329
    abort();
330
  }
331
332
  // Extract the Path from the request JSON.
333
  UniquePtr<JsonDocument> req_json(JsonDocument::Create(req));
334
  if (!req_json.IsValid()) {
335
    LogCvmfs(kLogReceiver, kLogSyslogErr,
336
             "HandleCommit: Invalid JSON request.");
337
    return false;
338
  }
339
340
  const JSON* lease_path_json =
341
      JsonDocument::SearchInObject(req_json->root(), "lease_path", JSON_STRING);
342
  const JSON* old_root_hash_json = JsonDocument::SearchInObject(
343
      req_json->root(), "old_root_hash", JSON_STRING);
344
  const JSON* new_root_hash_json = JsonDocument::SearchInObject(
345
      req_json->root(), "new_root_hash", JSON_STRING);
346
  const JSON* tag_name_json = JsonDocument::SearchInObject(
347
      req_json->root(), "tag_name", JSON_STRING);
348
  const JSON* tag_channel_json = JsonDocument::SearchInObject(
349
    req_json->root(), "tag_channel", JSON_STRING);
350
  const JSON* tag_description_json = JsonDocument::SearchInObject(
351
    req_json->root(), "tag_description", JSON_STRING);
352
353
  if (lease_path_json == NULL || old_root_hash_json == NULL ||
354
      new_root_hash_json == NULL) {
355
    LogCvmfs(kLogReceiver, kLogSyslogErr,
356
             "HandleCommit: Missing fields in request.");
357
    return false;
358
  }
359
360
  // Here we use the path to commit the changes!
361
  UniquePtr<CommitProcessor> proc(MakeCommitProcessor());
362
  shash::Any old_root_hash = shash::MkFromSuffixedHexPtr(
363
      shash::HexPtr(old_root_hash_json->string_value));
364
  shash::Any new_root_hash = shash::MkFromSuffixedHexPtr(
365
      shash::HexPtr(new_root_hash_json->string_value));
366
  RepositoryTag repo_tag(tag_name_json->string_value,
367
                         tag_channel_json->string_value,
368
                         tag_description_json->string_value);
369
  CommitProcessor::Result res = proc->Process(lease_path_json->string_value,
370
                                              old_root_hash, new_root_hash,
371
                                              repo_tag);
372
373
  JsonStringInput reply_input;
374
  switch (res) {
375
    case CommitProcessor::kSuccess:
376
      reply_input.push_back(std::make_pair("status", "ok"));
377
      break;
378
    case CommitProcessor::kError:
379
      reply_input.push_back(std::make_pair("status", "error"));
380
      reply_input.push_back(std::make_pair("reason", "miscellaneous"));
381
      break;
382
    case CommitProcessor::kMergeFailure:
383
      reply_input.push_back(std::make_pair("status", "error"));
384
      reply_input.push_back(std::make_pair("reason", "merge_error"));
385
      break;
386
    case CommitProcessor::kMissingReflog:
387
      reply_input.push_back(std::make_pair("status", "error"));
388
      reply_input.push_back(std::make_pair("reason", "missing_reflog"));
389
      break;
390
    default:
391
      LogCvmfs(kLogReceiver, kLogSyslogErr,
392
               "Unknown value of CommitProcessor::Result encountered.");
393
      abort();
394
      break;
395
  }
396
397
  ToJsonString(reply_input, reply);
398
399
  return true;
400
}
401
402
PayloadProcessor* Reactor::MakePayloadProcessor() {
403
  return new PayloadProcessor();
404
}
405
406
CommitProcessor* Reactor::MakeCommitProcessor() {
407
  return new CommitProcessor();
408
}
409
410
9
bool Reactor::HandleRequest(Request req, const std::string& data) {
411
9
  bool ok = true;
412
9
  std::string reply;
413


9
  switch (req) {
414
    case kQuit:
415
3
      ok = WriteReply(fdout_, "ok");
416
3
      break;
417
    case kEcho:
418
1
      ok = WriteReply(fdout_, std::string("PID: ") + StringifyUint(getpid()));
419
1
      break;
420
    case kGenerateToken:
421
2
      ok &= HandleGenerateToken(data, &reply);
422
2
      ok &= WriteReply(fdout_, reply);
423
2
      break;
424
    case kGetTokenId:
425
1
      ok &= HandleGetTokenId(data, &reply);
426
1
      ok &= WriteReply(fdout_, reply);
427
1
      break;
428
    case kCheckToken:
429
1
      ok &= HandleCheckToken(data, &reply);
430
1
      ok &= WriteReply(fdout_, reply);
431
1
      break;
432
    case kSubmitPayload:
433
1
      ok &= HandleSubmitPayload(fdin_, data, &reply);
434
1
      ok &= WriteReply(fdout_, reply);
435
1
      break;
436
    case kCommit:
437
      ok &= HandleCommit(data, &reply);
438
      ok &= WriteReply(fdout_, reply);
439
      break;
440
    case kError:
441
      LogCvmfs(kLogReceiver, kLogSyslogErr,
442
               "Reactor: unknown command received.");
443
      ok = false;
444
      break;
445
    default:
446
      break;
447
  }
448
449
9
  return ok;
450
}
451
452
}  // namespace receiver