GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/receiver/payload_processor.cc Lines: 23 114 20.2 %
Date: 2019-02-03 02:48:13 Branches: 8 54 14.8 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "payload_processor.h"
6
7
#include <fcntl.h>
8
#include <unistd.h>
9
#include <vector>
10
11
#include "logging.h"
12
#include "params.h"
13
#include "util/posix.h"
14
#include "util/string.h"
15
16
namespace {
17
18
const size_t kConsumerBuffer = 10 * 1024 * 1024;  // 10 MB
19
20
}
21
22
namespace receiver {
23
24
FileInfo::FileInfo()
25
  : total_size(0),
26
    current_size(0),
27
    hash_context(),
28
    hash_buffer(),
29
    skip(false)
30
{}
31
32
FileInfo::FileInfo(const ObjectPackBuild::Event& event)
33
  : temp_path(),
34
    total_size(event.size),
35
    current_size(0),
36
    hash_context(shash::ContextPtr(event.id.algorithm)),
37
    hash_buffer(hash_context.size, 0),
38
    skip(false)
39
{
40
  hash_context.buffer = &hash_buffer[0];
41
  shash::Init(hash_context);
42
}
43
44
FileInfo::FileInfo(const FileInfo& other)
45
  : temp_path(other.temp_path),
46
    total_size(other.total_size),
47
    current_size(other.current_size),
48
    hash_context(other.hash_context),
49
    hash_buffer(other.hash_buffer),
50
    skip(other.skip)
51
{
52
  hash_context.buffer = &hash_buffer[0];
53
}
54
55
FileInfo& FileInfo::operator=(const FileInfo& other) {
56
  temp_path = other.temp_path;
57
  total_size = other.total_size;
58
  current_size = other.current_size;
59
  hash_context = other.hash_context;
60
  hash_buffer = other.hash_buffer;
61
  hash_context.buffer = &hash_buffer[0];
62
  skip = other.skip;
63
64
  return *this;
65
}
66
67
2
PayloadProcessor::PayloadProcessor()
68
    : pending_files_(),
69
      current_repo_(),
70
      spooler_(),
71
      temp_dir_(),
72
2
      num_errors_(0) {}
73
74
2
PayloadProcessor::~PayloadProcessor() {}
75
76
2
PayloadProcessor::Result PayloadProcessor::Process(
77
    int fdin, const std::string& header_digest, const std::string& path,
78
    uint64_t header_size) {
79
  LogCvmfs(kLogReceiver, kLogSyslog,
80
           "PayloadProcessor - lease_path: %s, header digest: %s, header "
81
           "size: %ld",
82
2
           path.c_str(), header_digest.c_str(), header_size);
83
84
2
  const size_t first_slash_idx = path.find('/', 0);
85
86
2
  current_repo_ = path.substr(0, first_slash_idx);
87
88
2
  Result init_result = Initialize();
89
2
  if (init_result != kSuccess) {
90
    return init_result;
91
  }
92
93
  // Set up object pack deserialization
94
2
  shash::Any digest = shash::MkFromHexPtr(shash::HexPtr(header_digest));
95
96
2
  ObjectPackConsumer deserializer(digest, header_size);
97
2
  deserializer.RegisterListener(&PayloadProcessor::ConsumerEventCallback, this);
98
99
2
  int nb = 0;
100
2
  ObjectPackBuild::State consumer_state = ObjectPackBuild::kStateContinue;
101
2
  std::vector<unsigned char> buffer(kConsumerBuffer, 0);
102

2
  do {
103
2
    nb = read(fdin, &buffer[0], buffer.size());
104
2
    consumer_state = deserializer.ConsumeNext(nb, &buffer[0]);
105

2
    if (consumer_state != ObjectPackBuild::kStateContinue &&
106
        consumer_state != ObjectPackBuild::kStateDone) {
107
      LogCvmfs(kLogReceiver, kLogSyslogErr,
108
               "PayloadProcessor - error: %d encountered when consuming object "
109
               "pack.",
110
               consumer_state);
111
      break;
112
    }
113
  } while (nb > 0 && consumer_state != ObjectPackBuild::kStateDone);
114
115
2
  assert(pending_files_.empty());
116
117
2
  Result res = Finalize();
118
119
2
  deserializer.UnregisterListeners();
120
121
2
  return res;
122
}
123
124
void PayloadProcessor::ConsumerEventCallback(
125
    const ObjectPackBuild::Event& event) {
126
  std::string path("");
127
128
  if (event.object_type == ObjectPack::kCas) {
129
    path = event.id.MakePath();
130
  } else if (event.object_type == ObjectPack::kNamed) {
131
    path = event.object_name;
132
  } else {
133
    // kEmpty - this is an error.
134
    LogCvmfs(kLogReceiver, kLogSyslogErr,
135
             "PayloadProcessor - error: Event received with unknown object.");
136
    num_errors_++;
137
    return;
138
  }
139
140
  FileIterator it = pending_files_.find(event.id);
141
  if (it == pending_files_.end()) {
142
    FileInfo info(event);
143
144
    // If the file already exists in the repository, don't create a temp file,
145
    // mark it to be skipped in the FileInfo, but keep track of the number of
146
    // bytes currently written
147
    if (spooler_->Peek("data/" + path)) {
148
      LogCvmfs(
149
          kLogReceiver, kLogDebug,
150
          "PayloadProcessor - file %s already exists at destination. "
151
          "Marking it to be skipped.",
152
          path.c_str());
153
      info.skip = true;
154
    } else {
155
      // New file to unpack
156
      const std::string tmp_path =
157
          CreateTempPath(temp_dir_->dir() + "/payload", 0666);
158
      if (tmp_path.empty()) {
159
        LogCvmfs(kLogReceiver, kLogSyslogErr,
160
                "PayloadProcessor - error: Unable to create temporary path.");
161
        num_errors_++;
162
        return;
163
      }
164
      info.temp_path = tmp_path;
165
    }
166
167
    pending_files_[event.id] = info;
168
  }
169
170
  FileInfo& info = pending_files_[event.id];
171
172
  if (!info.skip) {
173
    int fdout =
174
        open(info.temp_path.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0600);
175
    if (fdout == -1) {
176
      LogCvmfs(
177
          kLogReceiver, kLogSyslogErr,
178
          "PayloadProcessor - error: Unable to open temporary output file: %s",
179
          info.temp_path.c_str());
180
      return;
181
    }
182
183
    if (!WriteFile(fdout, event.buf, event.buf_size)) {
184
      LogCvmfs(kLogReceiver, kLogSyslogErr,
185
               "PayloadProcessor - error: Unable to write %s",
186
               info.temp_path.c_str());
187
      num_errors_++;
188
      unlink(info.temp_path.c_str());
189
      close(fdout);
190
      return;
191
    }
192
    close(fdout);
193
194
    shash::Update(static_cast<const unsigned char*>(event.buf),
195
                  event.buf_size,
196
                  info.hash_context);
197
  }
198
199
  info.current_size += event.buf_size;
200
201
  if (info.current_size == info.total_size) {
202
    if (!info.skip) {
203
      shash::Any file_hash(event.id.algorithm);
204
      shash::Final(info.hash_context, &file_hash);
205
206
      if (file_hash != event.id) {
207
        LogCvmfs(
208
            kLogReceiver, kLogSyslogErr,
209
            "PayloadProcessor - error: Hash mismatch for unpacked file: event "
210
            "size: %ld, file size: %ld, event hash: %s, file hash: %s",
211
            event.size, GetFileSize(info.temp_path),
212
            event.id.ToString(true).c_str(), file_hash.ToString(true).c_str());
213
        num_errors_++;
214
        return;
215
      }
216
217
      Upload(info.temp_path, "data/" + path);
218
    }
219
220
    pending_files_.erase(event.id);
221
  }
222
}
223
224
PayloadProcessor::Result PayloadProcessor::Initialize() {
225
  Params params;
226
  if (!GetParamsFromFile(current_repo_, &params)) {
227
    LogCvmfs(
228
        kLogReceiver, kLogSyslogErr,
229
        "PayloadProcessor - error: Could not get configuration parameters.");
230
    return kOtherError;
231
  }
232
233
  const std::string spooler_temp_dir =
234
      GetSpoolerTempDir(params.spooler_configuration);
235
  assert(!spooler_temp_dir.empty());
236
  assert(MkdirDeep(spooler_temp_dir + "/receiver", 0770, true));
237
  temp_dir_ =
238
      RaiiTempDir::Create(spooler_temp_dir + "/receiver/payload_processor");
239
240
  upload::SpoolerDefinition definition(
241
      params.spooler_configuration, params.hash_alg, params.compression_alg,
242
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
243
      params.min_chunk_size, params.avg_chunk_size, params.max_chunk_size,
244
      "dummy_token", "dummy_key");
245
246
  spooler_.Destroy();
247
  spooler_ = upload::Spooler::Construct(definition);
248
249
  return kSuccess;
250
}
251
252
PayloadProcessor::Result PayloadProcessor::Finalize() {
253
  spooler_->WaitForUpload();
254
  temp_dir_.Destroy();
255
256
  const unsigned num_spooler_errors = spooler_->GetNumberOfErrors();
257
  if (num_spooler_errors > 0) {
258
    LogCvmfs(kLogReceiver, kLogSyslogErr,
259
             "PayloadProcessor - error: Spooler - %d upload(s) failed.",
260
             num_spooler_errors);
261
    return kSpoolerError;
262
  }
263
264
  if (GetNumErrors() > 0) {
265
    LogCvmfs(kLogReceiver, kLogSyslogErr,
266
             "PayloadProcessor - error: % unpacking error(s).", GetNumErrors());
267
    return kOtherError;
268
  }
269
270
  return kSuccess;
271
}
272
273
void PayloadProcessor::Upload(const std::string& source,
274
                              const std::string& dest) {
275
  spooler_->Upload(source, dest);
276
}
277
278
bool PayloadProcessor::WriteFile(int fd, const void* const buf,
279
                                 size_t buf_size) {
280
  return SafeWrite(fd, buf, buf_size);
281
}
282
283
}  // namespace receiver