GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/pack.cc Lines: 236 253 93.3 %
Date: 2019-02-03 02:48:13 Branches: 126 185 68.1 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "pack.h"
6
7
#include <algorithm>
8
#include <cassert>
9
#include <cstring>
10
#include <map>
11
12
#include "platform.h"
13
#include "smalloc.h"
14
#include "util/string.h"
15
#include "util_concurrency.h"
16
17
using namespace std;  // NOLINT
18
19
namespace {  // some private utility functions used by ObjectPackProducer
20
21
80
void InitializeHeader(const int version, const int num_objects,
22
                      const size_t pack_size, std::string *header) {
23
80
  if (header) {
24
80
    *header = "V" + StringifyInt(version) + "\n";
25
80
    *header += "S" + StringifyInt(pack_size) + "\n";
26
80
    *header += "N" + StringifyInt(num_objects) + "\n";
27
80
    *header += "--\n";
28
  }
29
80
}
30
31
6658
void AppendItemToHeader(ObjectPack::BucketContentType object_type,
32
                        const std::string &hash_str, const size_t object_size,
33
                        const std::string &object_name, std::string *header) {
34
  // If the item type is kName, the "item_name" parameter should not be empty
35
  assert((object_type == ObjectPack::kCas) ||
36

6658
         ((object_type == ObjectPack::kNamed) && (!object_name.empty())));
37
6658
  std::string line_prefix = "";
38
6658
  std::string line_suffix = "";
39
6658
  switch (object_type) {
40
    case ObjectPack::kNamed:
41
3
      line_prefix = "N ";
42
3
      line_suffix = std::string(" ") + Base64Url(object_name);
43
3
      break;
44
    case ObjectPack::kCas:
45
6655
      line_prefix = "C ";
46
6655
      break;
47
    default:
48
      LogCvmfs(kLogCvmfs, kLogStderr,
49
               "Unknown object pack type to be added to header.");
50
      abort();
51
  }
52
6658
  if (header) {
53
    *header += line_prefix + hash_str + " " + StringifyInt(object_size) +
54
6658
               line_suffix + "\n";
55
  }
56
6658
}
57
58
}  // namespace
59
60
100382
ObjectPack::Bucket::Bucket()
61
    : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize))),
62
      size(0),
63
      capacity(kInitialSize),
64
      content_type(kEmpty),
65
100382
      name() {}
66
67
371
void ObjectPack::Bucket::Add(const void *buf, const uint64_t buf_size) {
68
371
  if (buf_size == 0) return;
69
70
4919
  while (size + buf_size > capacity) {
71
4177
    capacity *= 2;
72
4177
    content = reinterpret_cast<unsigned char *>(srealloc(content, capacity));
73
  }
74
371
  memcpy(content + size, buf, buf_size);
75
371
  size += buf_size;
76
}
77
78
100382
ObjectPack::Bucket::~Bucket() { free(content); }
79
80
//------------------------------------------------------------------------------
81
82
67
ObjectPack::ObjectPack(const uint64_t limit) : limit_(limit), size_(0) {
83
67
  InitLock();
84
67
}
85
86
66
ObjectPack::~ObjectPack() {
87
198
  for (std::set<BucketHandle>::const_iterator i = open_buckets_.begin(),
88
66
                                              iEnd = open_buckets_.end();
89
       i != iEnd; ++i) {
90
66
    delete *i;
91
  }
92
93

66
  for (unsigned i = 0; i < buckets_.size(); ++i) delete buckets_[i];
94
66
  pthread_mutex_destroy(lock_);
95
66
  free(lock_);
96
66
}
97
98
93
void ObjectPack::AddToBucket(const void *buf, const uint64_t size,
99
                             const ObjectPack::BucketHandle handle) {
100
93
  handle->Add(buf, size);
101
93
}
102
103
100381
ObjectPack::BucketHandle ObjectPack::NewBucket() {
104
100381
  BucketHandle handle = new Bucket();
105
106
100381
  MutexLockGuard mutex_guard(lock_);
107
100381
  open_buckets_.insert(handle);
108
100381
  return handle;
109
}
110
111
/**
112
 * Can only fail due to insufficient remaining space in the ObjectPack.
113
 */
114
100395
bool ObjectPack::CommitBucket(const BucketContentType type,
115
                              const shash::Any &id,
116
                              const ObjectPack::BucketHandle handle,
117
                              const std::string &name) {
118
100395
  handle->id = id;
119
120
100395
  handle->content_type = type;
121
100395
  if (type == kNamed) {
122
    handle->name = name;
123
  }
124
125
100395
  MutexLockGuard mutex_guard(lock_);
126
100395
  if (buckets_.size() >= kMaxObjects) return false;
127
100394
  if (size_ + handle->size > limit_) return false;
128
100312
  open_buckets_.erase(handle);
129
100312
  buckets_.push_back(handle);
130
100312
  size_ += handle->size;
131
100312
  return true;
132
}
133
134
3
void ObjectPack::DiscardBucket(const BucketHandle handle) {
135
3
  MutexLockGuard mutex_guard(lock_);
136
3
  open_buckets_.erase(handle);
137
3
  delete handle;
138
3
}
139
140
67
void ObjectPack::InitLock() {
141
67
  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
142
67
  int retval = pthread_mutex_init(lock_, NULL);
143
67
  assert(retval == 0);
144
67
}
145
146
/**
147
 * If a commit failed, an open Bucket can be transferred to another ObjectPack
148
 * with more space.
149
 */
150
23
void ObjectPack::TransferBucket(const ObjectPack::BucketHandle handle,
151
                                ObjectPack *other) {
152
23
  MutexLockGuard mutex_guard(lock_);
153
23
  open_buckets_.erase(handle);
154
23
  other->open_buckets_.insert(handle);
155
23
}
156
157
37728
unsigned char *ObjectPack::BucketContent(size_t idx) const {
158
37728
  assert(idx < buckets_.size());
159
37728
  return buckets_[idx]->content;
160
}
161
162
44383
uint64_t ObjectPack::BucketSize(size_t idx) const {
163
44383
  assert(idx < buckets_.size());
164
44383
  return buckets_[idx]->size;
165
}
166
167
6655
const shash::Any &ObjectPack::BucketId(size_t idx) const {
168
6655
  assert(idx < buckets_.size());
169
6655
  return buckets_[idx]->id;
170
}
171
172
//------------------------------------------------------------------------------
173
174
/**
175
 * Hash over the header.  The hash algorithm needs to be provided by hash.
176
 */
177
73
void ObjectPackProducer::GetDigest(shash::Any *hash) {
178
73
  assert(hash);
179
73
  shash::HashString(header_, hash);
180
73
}
181
182
77
ObjectPackProducer::ObjectPackProducer(ObjectPack *pack)
183
77
    : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) {
184
77
  unsigned N = pack->GetNoObjects();
185
  // rough guess, most likely a little too much
186
77
  header_.reserve(30 + N * (2 * shash::kMaxDigestSize + 5));
187
188
77
  InitializeHeader(2, N, pack->size(), &header_);
189
190
6732
  for (unsigned i = 0; i < N; ++i) {
191
    AppendItemToHeader(ObjectPack::kCas, pack->BucketId(i).ToString(true),
192
6655
                       pack->BucketSize(i), "", &header_);
193
  }
194
}
195
196
3
ObjectPackProducer::ObjectPackProducer(const shash::Any &id, FILE *big_file,
197
                                       const std::string &file_name)
198
3
    : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) {
199
3
  int fd = fileno(big_file_);
200
3
  assert(fd >= 0);
201
  platform_stat64 info;
202
3
  int retval = platform_fstat(fd, &info);
203
3
  assert(retval == 0);
204
205
3
  InitializeHeader(2, 1, info.st_size, &header_);
206
207
  AppendItemToHeader(ObjectPack::kNamed, id.ToString(true), info.st_size,
208
3
                     file_name, &header_);
209
210
3
  rewind(big_file);
211
}
212
213
/**
214
 * Copies as many bytes as possible into buf.  If the returned number of bytes
215
 * is shorter than buf_size, everything has been produced.
216
 */
217
31429
unsigned ObjectPackProducer::ProduceNext(const unsigned buf_size,
218
                                         unsigned char *buf) {
219
  const unsigned remaining_in_header =
220
31429
      (pos_ < header_.size()) ? (header_.size() - pos_) : 0;
221
31429
  const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
222
31429
  if (nbytes_header) {
223
330
    memcpy(buf, header_.data() + pos_, nbytes_header);
224
330
    pos_ += nbytes_header;
225
  }
226
227
31429
  unsigned remaining_in_buf = buf_size - nbytes_header;
228
31429
  if (remaining_in_buf == 0) return nbytes_header;
229
31176
  unsigned nbytes_payload = 0;
230
231
31176
  if (big_file_) {
232
19
    size_t nbytes = fread(buf + nbytes_header, 1, remaining_in_buf, big_file_);
233
19
    nbytes_payload = nbytes;
234
19
    pos_ += nbytes_payload;
235
31157
  } else if (idx_ < pack_->GetNoObjects()) {
236
    // Copy a few buckets more
237

100024
    while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) {
238
      const unsigned remaining_in_bucket =
239
37728
          pack_->BucketSize(idx_) - pos_in_bucket_;
240
37728
      const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket);
241
      memcpy(buf + nbytes_header + nbytes_payload,
242
37728
             pack_->BucketContent(idx_) + pos_in_bucket_, nbytes);
243
244
37728
      pos_in_bucket_ += nbytes;
245
37728
      nbytes_payload += nbytes;
246
37728
      remaining_in_buf -= nbytes;
247
37728
      if (nbytes == remaining_in_bucket) {
248
6655
        pos_in_bucket_ = 0;
249
6655
        idx_++;
250
      }
251
    }
252
  }
253
254
31176
  return nbytes_header + nbytes_payload;
255
}
256
257
//------------------------------------------------------------------------------
258
259
74
ObjectPackConsumer::ObjectPackConsumer(const shash::Any &expected_digest,
260
                                       const unsigned expected_header_size)
261
    : expected_digest_(expected_digest),
262
      expected_header_size_(expected_header_size),
263
      pos_(0),
264
      idx_(0),
265
      pos_in_object_(0),
266
      pos_in_accu_(0),
267
      state_(ObjectPackBuild::kStateContinue),
268
74
      size_(0) {
269
  // Upper limit of 100B per entry
270
74
  if (expected_header_size > (100 * ObjectPack::kMaxObjects)) {
271
    state_ = ObjectPackBuild::kStateHeaderTooBig;
272
    return;
273
  }
274
275
74
  raw_header_.reserve(expected_header_size);
276
}
277
278
/**
279
 * At the end of the function, pos_ will have progressed by buf_size (unless
280
 * the buffer contains trailing garbage bytes.
281
 */
282
31380
ObjectPackBuild::State ObjectPackConsumer::ConsumeNext(
283
    const unsigned buf_size, const unsigned char *buf) {
284
31380
  if (buf_size == 0) return state_;
285
31313
  if (state_ == ObjectPackBuild::kStateDone) {
286
    state_ = ObjectPackBuild::kStateTrailingBytes;
287
    return state_;
288
  }
289
31313
  if (state_ != ObjectPackBuild::kStateContinue) return state_;
290
291
  const unsigned remaining_in_header =
292
31313
      (pos_ < expected_header_size_) ? (expected_header_size_ - pos_) : 0;
293
31313
  const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
294
31313
  if (nbytes_header) {
295
245
    raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header);
296
245
    pos_ += nbytes_header;
297
  }
298
299
31313
  if (pos_ < expected_header_size_) return ObjectPackBuild::kStateContinue;
300
301
  // This condition can only be true once through the lifetime of the
302
  // Consumer.
303

31142
  if (nbytes_header && (pos_ == expected_header_size_)) {
304
74
    shash::Any digest(expected_digest_.algorithm);
305
74
    shash::HashString(raw_header_, &digest);
306
74
    if (digest != expected_digest_) {
307
      state_ = ObjectPackBuild::kStateCorrupt;
308
      return state_;
309
    } else {
310
74
      bool retval = ParseHeader();
311
74
      if (!retval) {
312
        state_ = ObjectPackBuild::kStateBadFormat;
313
        return state_;
314
      }
315
      // We don't need the raw string anymore
316
74
      raw_header_.clear();
317
    }
318
319
    // Empty pack?
320

74
    if ((buf_size == nbytes_header) && (index_.size() == 0)) {
321
2
      state_ = ObjectPackBuild::kStateDone;
322
2
      return state_;
323
    }
324
  }
325
326
31140
  unsigned remaining_in_buf = buf_size - nbytes_header;
327
31140
  const unsigned char *payload = buf + nbytes_header;
328
31140
  return ConsumePayload(remaining_in_buf, payload);
329
}
330
331
/**
332
 * Informs listeners for small complete objects.  For large objects, buffers
333
 * the
334
 * input into reasonably sized chunks.  buf can contain both a chunk of data
335
 * that needs to be added to the consumer's accumulator and a bunch of
336
 * complete small objects.  We use the accumulator only if necessary to avoid
337
 * unnecessary memory copies.
338
 */
339
31140
ObjectPackBuild::State ObjectPackConsumer::ConsumePayload(
340
    const unsigned buf_size, const unsigned char *buf) {
341
31140
  uint64_t pos_in_buf = 0;
342

104637
  while ((pos_in_buf < buf_size) && (idx_ < index_.size())) {
343
    // Fill the accumulator or process next small object
344
    uint64_t nbytes;  // How many bytes are consumed in this iteration
345
42357
    const uint64_t remaining_in_buf = buf_size - pos_in_buf;
346
42357
    const uint64_t remaining_in_object = index_[idx_].size - pos_in_object_;
347
42357
    const bool is_small_rest = remaining_in_buf < kAccuSize;
348
349
    // We use the accumulator if there is already something in or if we have a
350
    // small piece of data of a larger object.
351
42357
    nbytes = std::min(remaining_in_object, remaining_in_buf);
352

57288
    if ((pos_in_accu_ > 0) ||
353
        ((remaining_in_buf < remaining_in_object) && is_small_rest)) {
354
14931
      const uint64_t remaining_in_accu = kAccuSize - pos_in_accu_;
355
14931
      nbytes = std::min(remaining_in_accu, nbytes);
356
14931
      memcpy(accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes);
357
14931
      pos_in_accu_ += nbytes;
358

14931
      if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) {
359
        NotifyListeners(ObjectPackBuild::Event(
360
            index_[idx_].id, index_[idx_].size, pos_in_accu_, accumulator_,
361
4774
            index_[idx_].entry_type, index_[idx_].entry_name));
362
4774
        pos_in_accu_ = 0;
363
      }
364
    } else {  // directly trigger listeners using buf
365
      NotifyListeners(ObjectPackBuild::Event(
366
          index_[idx_].id, index_[idx_].size, nbytes, buf + pos_in_buf,
367
27426
          index_[idx_].entry_type, index_[idx_].entry_name));
368
    }
369
370
42357
    pos_in_buf += nbytes;
371
42357
    pos_in_object_ += nbytes;
372
42357
    if (nbytes == remaining_in_object) {
373
6651
      idx_++;
374
6651
      pos_in_object_ = 0;
375
    }
376
  }
377
378
31140
  pos_ += buf_size;
379
380
31140
  if (idx_ == index_.size())
381
    state_ = (pos_in_buf == buf_size) ? ObjectPackBuild::kStateDone
382
72
                                      : ObjectPackBuild::kStateTrailingBytes;
383
  else
384
31068
    state_ = ObjectPackBuild::kStateContinue;
385
31140
  return state_;
386
}
387
388
74
bool ObjectPackConsumer::ParseHeader() {
389
74
  map<char, string> header;
390
  const unsigned char *data =
391
74
      reinterpret_cast<const unsigned char *>(raw_header_.data());
392
74
  ParseKeyvalMem(data, raw_header_.size(), &header);
393
74
  if (header.find('V') == header.end()) return false;
394
74
  if (header['V'] != "2") return false;
395
74
  size_ = String2Uint64(header['S']);
396
74
  unsigned nobjects = String2Uint64(header['N']);
397
398
74
  if (nobjects == 0) return true;
399
400
  // Build the object index
401
72
  const size_t separator_idx = raw_header_.find("--\n");
402
72
  if (separator_idx == string::npos) return false;
403
72
  unsigned index_idx = separator_idx + 3;
404
72
  if (index_idx >= raw_header_.size()) return false;
405
406
72
  uint64_t sum_size = 0;
407
6651
  do {
408
6651
    const unsigned remaining_in_header = raw_header_.size() - index_idx;
409
    string line =
410
6651
        GetLineMem(raw_header_.data() + index_idx, remaining_in_header);
411
6651
    if (line == "") break;
412
413
6651
    IndexEntry entry;
414
6651
    if (!ParseItem(line, &entry, &sum_size)) {
415
      break;
416
    }
417
418
6651
    index_.push_back(entry);
419

6651
    index_idx += line.size() + 1;
420
  } while (index_idx < raw_header_.size());
421
422

72
  return (nobjects == index_.size()) && (size_ == sum_size);
423
}
424
425
6651
bool ObjectPackConsumer::ParseItem(const std::string &line,
426
                                   ObjectPackConsumer::IndexEntry *entry,
427
                                   uint64_t *sum_size) {
428

6651
  if (!entry || !sum_size) {
429
    return false;
430
  }
431
432
6651
  if (line[0] == 'C') {  // CAS blob
433
6650
    const ObjectPack::BucketContentType entry_type = ObjectPack::kCas;
434
435
    // We could use SplitString but we can have many lines so we do something
436
    // more efficient here
437
6650
    const size_t separator = line.find(' ', 2);
438

6650
    if ((separator == string::npos) || (separator == (line.size() - 1))) {
439
      return false;
440
    }
441
442
6650
    uint64_t size = String2Uint64(line.substr(separator + 1));
443
6650
    *sum_size += size;
444
445
    // Warning do not construct a HexPtr with an rvalue!
446
    // The constructor takes the address of its argument.
447
6650
    const std::string hash_string = line.substr(2, separator - 2);
448
6650
    shash::HexPtr hex_ptr(hash_string);
449
450
6650
    entry->id = shash::MkFromSuffixedHexPtr(hex_ptr);
451
6650
    entry->size = size;
452
6650
    entry->entry_type = entry_type;
453
6650
    entry->entry_name = "";
454
1
  } else if (line[0] == 'N') {  // Named file
455
1
    const ObjectPack::BucketContentType entry_type = ObjectPack::kNamed;
456
457
    // First separator, before the size field
458
1
    const size_t separator1 = line.find(' ', 2);
459

1
    if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) {
460
      return false;
461
    }
462
463
    // Second separator, before the name field
464
1
    const size_t separator2 = line.find(' ', separator1 + 1);
465


1
    if ((separator1 == 0) || (separator1 == string::npos) ||
466
        (separator1 == (line.size() - 1))) {
467
      return false;
468
    }
469
470
    uint64_t size =
471
1
        String2Uint64(line.substr(separator1 + 1, separator2 - separator1 - 1));
472
473
1
    std::string name;
474
2
    if (!Debase64(line.substr(separator2 + 1), &name)) {
475
      return false;
476
    }
477
478
1
    *sum_size += size;
479
480
    // Warning do not construct a HexPtr with an rvalue!
481
    // The constructor takes the address of its argument.
482
1
    const std::string hash_string = line.substr(2, separator1 - 2);
483
1
    shash::HexPtr hex_ptr(hash_string);
484
485
1
    entry->id = shash::MkFromSuffixedHexPtr(hex_ptr);
486
1
    entry->size = size;
487
1
    entry->entry_type = entry_type;
488

1
    entry->entry_name = name;
489
  } else {  // Error
490
    return false;
491
  }
492
493
6651
  return true;
494
}