CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pack.cc
Go to the documentation of this file.
1 
5 #include "pack.h"
6 
7 #include <algorithm>
8 #include <cassert>
9 #include <cstring>
10 #include <map>
11 
12 #include "util/concurrency.h"
13 #include "util/exception.h"
14 #include "util/platform.h"
15 #include "util/smalloc.h"
16 #include "util/string.h"
17 
18 using namespace std; // NOLINT
19 
20 namespace { // some private utility functions used by ObjectPackProducer
21 
22 void InitializeHeader(const int version, const int num_objects,
23  const size_t pack_size, std::string *header) {
24  if (header) {
25  *header = "V" + StringifyInt(version) + "\n";
26  *header += "S" + StringifyInt(pack_size) + "\n";
27  *header += "N" + StringifyInt(num_objects) + "\n";
28  *header += "--\n";
29  }
30 }
31 
33  const std::string &hash_str, const size_t object_size,
34  const std::string &object_name, std::string *header) {
// Appends one index line describing a pack object to *header:
//   kCas   -> "C <hash_str> <object_size>\n"
//   kNamed -> "N <hash_str> <object_size> <Base64Url(object_name)>\n"
// Any other object type triggers PANIC. If header is NULL the line is built
// but not written anywhere.
// (The first signature line, pack.cc:32, is not part of this listing.)
35  // If the object type is kNamed, the "object_name" parameter must not be empty
36  assert((object_type == ObjectPack::kCas) ||
37  ((object_type == ObjectPack::kNamed) && (!object_name.empty())));
38  std::string line_prefix = "";
39  std::string line_suffix = "";
40  switch (object_type) {
41  case ObjectPack::kNamed:
42  line_prefix = "N ";
// The name is Base64Url-encoded so that it cannot contain the separator
// characters of the index line.
43  line_suffix = std::string(" ") + Base64Url(object_name);
44  break;
45  case ObjectPack::kCas:
46  line_prefix = "C ";
47  break;
48  default:
49  PANIC(kLogStderr, "Unknown object pack type to be added to header.");
50  }
51  if (header) {
52  *header += line_prefix + hash_str + " " + StringifyInt(object_size) +
53  line_suffix + "\n";
54  }
55 }
56 
57 } // namespace
58 
// Bucket constructor initializer list. The declarator line (pack.cc:59) is
// not part of this listing.
// Allocates an initial buffer of kInitialSize bytes with smalloc. The bucket
// starts with no used bytes, capacity kInitialSize, content type kEmpty and
// an empty name.
60  : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize))),
61  size(0),
62  capacity(kInitialSize),
63  content_type(kEmpty),
64  name() {}
65 
66 void ObjectPack::Bucket::Add(const void *buf, const uint64_t buf_size) {
67  if (buf_size == 0) return;
68 
69  while (size + buf_size > capacity) {
70  capacity *= 2;
71  content = reinterpret_cast<unsigned char *>(srealloc(content, capacity));
72  }
73  memcpy(content + size, buf, buf_size);
74  size += buf_size;
75 }
76 
77 ObjectPack::Bucket::~Bucket() { free(content); }
78 
79 //------------------------------------------------------------------------------
80 
81 ObjectPack::ObjectPack(const uint64_t limit) : limit_(limit), size_(0) {
82  InitLock();
83 }
84 
// ObjectPack destructor body. The signature line, pack.cc:85, is not part of
// this listing.
// Deletes every bucket that is still open (never committed, discarded or
// transferred away), then every committed bucket. Finally it destroys and
// frees the mutex allocated by InitLock.
86  for (std::set<BucketHandle>::const_iterator i = open_buckets_.begin(),
87  iEnd = open_buckets_.end();
88  i != iEnd; ++i) {
89  delete *i;
90  }
91 
92  for (unsigned i = 0; i < buckets_.size(); ++i) delete buckets_[i];
93  pthread_mutex_destroy(lock_);
94  free(lock_);
95 }
96 
97 void ObjectPack::AddToBucket(const void *buf, const uint64_t size,
98  const ObjectPack::BucketHandle handle) {
99  handle->Add(buf, size);
100 }
101 
// Creates a new, empty bucket and registers it as open in this pack.
// The signature line, pack.cc:102, is not part of this listing.
// The allocation happens outside the lock; only the insertion into
// open_buckets_ is guarded. An open bucket is deleted by the pack's destructor
// unless it is committed, discarded or transferred first.
103  BucketHandle handle = new Bucket();
104 
105  MutexLockGuard mutex_guard(lock_);
106  open_buckets_.insert(handle);
107  return handle;
108 }
109 
114  const shash::Any &id,
115  const ObjectPack::BucketHandle handle,
116  const std::string &name) {
// Tries to move the open bucket `handle` into the list of committed buckets.
// The start of the signature (pack.cc:110-113) is not part of this listing.
// The bucket's id and content type are always recorded. The name is stored
// only for kNamed buckets.
// Returns false, leaving the bucket open, if the pack already holds
// kMaxObjects buckets or if committing would push the accumulated size beyond
// limit_. On success the bucket moves from open_buckets_ to the end of
// buckets_, and its size is added to size_.
// NOTE(review): id/type/name are written before the lock is taken and before
// the limit checks, so a rejected bucket keeps these values.
117  handle->id = id;
118 
119  handle->content_type = type;
120  if (type == kNamed) {
121  handle->name = name;
122  }
123 
124  MutexLockGuard mutex_guard(lock_);
125  if (buckets_.size() >= kMaxObjects) return false;
126  if (size_ + handle->size > limit_) return false;
127  open_buckets_.erase(handle);
128  buckets_.push_back(handle);
129  size_ += handle->size;
130  return true;
131 }
132 
// Unregisters the open bucket `handle` (under the lock) and deletes it. The
// handle must not be used afterwards.
// The signature line, pack.cc:133, is not part of this listing.
134  MutexLockGuard mutex_guard(lock_);
135  open_buckets_.erase(handle);
136  delete handle;
137 }
138 
// Allocates (smalloc) and initializes the pthread mutex stored in lock_.
// The signature line, pack.cc:139, is not part of this listing.
// NOTE(review): the result of pthread_mutex_init is only checked by assert(),
// so a failure would go unnoticed in builds that define NDEBUG.
140  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
141  int retval = pthread_mutex_init(lock_, NULL);
142  assert(retval == 0);
143 }
144 
150  ObjectPack *other) {
151  MutexLockGuard mutex_guard(lock_);
152  open_buckets_.erase(handle);
153  other->open_buckets_.insert(handle);
154 }
155 
156 unsigned char *ObjectPack::BucketContent(size_t idx) const {
157  assert(idx < buckets_.size());
158  return buckets_[idx]->content;
159 }
160 
161 uint64_t ObjectPack::BucketSize(size_t idx) const {
162  assert(idx < buckets_.size());
163  return buckets_[idx]->size;
164 }
165 
166 const shash::Any &ObjectPack::BucketId(size_t idx) const {
167  assert(idx < buckets_.size());
168  return buckets_[idx]->id;
169 }
170 
171 //------------------------------------------------------------------------------
172 
// Hashes the serialized header string (header_) into *hash. hash must not be
// NULL (asserted).
// The signature, around pack.cc:176, is not part of this listing.
// NOTE(review): presumably the hash algorithm is taken from *hash as preset by
// the caller; confirm against shash::HashString.
177  assert(hash);
178  shash::HashString(header_, hash);
179 }
180 
// Producer over the committed buckets of `pack`. The header (version 2) is
// built up front: the preamble, then one index line per committed bucket.
// The declarator line (pack.cc:181) and pack.cc:190 are not part of this
// listing.
182  : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) {
183  unsigned N = pack->GetNoObjects();
184  // Rough capacity guess (preamble + hex digest and separators per object);
// reserve() is only a hint, so an inaccurate estimate costs allocations, not
// correctness.
185  header_.reserve(30 + N * (2 * shash::kMaxDigestSize + 5));
186 
187  InitializeHeader(2, N, pack->size(), &header_);
188 
189  for (unsigned i = 0; i < N; ++i) {
// NOTE(review): the start of this call (pack.cc:190) is missing here. The
// visible arguments match AppendItemToHeader's (size, name, header).
191  pack->BucketSize(i), "", &header_);
192  }
193 }
194 
196  const std::string &file_name)
197  : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) {
// Producer that streams a single named object directly from an open FILE.
// The header announces exactly one object whose size is the file size
// reported by fstat. The file is rewound so that ProduceNext reads it from
// the beginning. The first signature line (pack.cc:195) is not part of this
// listing.
// NOTE(review): the fileno/fstat results are only checked by assert(); with
// NDEBUG a failure would leave `info` unset.
198  int fd = fileno(big_file_);
199  assert(fd >= 0);
200  platform_stat64 info;
201  int retval = platform_fstat(fd, &info);
202  assert(retval == 0);
203 
204  InitializeHeader(2, 1, info.st_size, &header_);
205 
206  AppendItemToHeader(ObjectPack::kNamed, id.ToString(true), info.st_size,
207  file_name, &header_);
208 
209  rewind(big_file);
210 }
211 
// Fills buf with up to buf_size bytes of the serialized pack stream and
// returns the number of bytes written.
// Any remaining header bytes come first. The rest of the buffer is filled
// with payload: in big-file mode from big_file_ via fread, otherwise from the
// committed buckets of pack_ in order. Returns 0 once everything has been
// produced.
// pos_ counts header bytes plus, in big-file mode, file bytes. It is not
// advanced for bucket payload (idx_/pos_in_bucket_ track that instead).
// NOTE(review): a fread error is indistinguishable from EOF here (no ferror
// check).
216 unsigned ObjectPackProducer::ProduceNext(const unsigned buf_size,
217  unsigned char *buf) {
218  const unsigned remaining_in_header =
219  (pos_ < header_.size()) ? (header_.size() - pos_) : 0;
220  const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
221  if (nbytes_header) {
222  memcpy(buf, header_.data() + pos_, nbytes_header);
223  pos_ += nbytes_header;
224  }
225 
226  unsigned remaining_in_buf = buf_size - nbytes_header;
227  if (remaining_in_buf == 0) return nbytes_header;
228  unsigned nbytes_payload = 0;
229 
230  if (big_file_) {
231  size_t nbytes = fread(buf + nbytes_header, 1, remaining_in_buf, big_file_);
232  nbytes_payload = nbytes;
233  pos_ += nbytes_payload;
234  } else if (idx_ < pack_->GetNoObjects()) {
235  // Copy a few buckets more
// Each iteration copies as much of the current bucket as fits; a fully
// copied bucket advances idx_ and resets pos_in_bucket_.
// (The initializer of remaining_in_bucket, pack.cc:238, is missing here.)
236  while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) {
237  const unsigned remaining_in_bucket =
239  const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket);
240  memcpy(buf + nbytes_header + nbytes_payload,
241  pack_->BucketContent(idx_) + pos_in_bucket_, nbytes);
242 
243  pos_in_bucket_ += nbytes;
244  nbytes_payload += nbytes;
245  remaining_in_buf -= nbytes;
246  if (nbytes == remaining_in_bucket) {
247  pos_in_bucket_ = 0;
248  idx_++;
249  }
250  }
251  }
252 
253  return nbytes_header + nbytes_payload;
254 }
255 
256 //------------------------------------------------------------------------------
257 
259  const unsigned expected_header_size)
260  : expected_digest_(expected_digest),
261  expected_header_size_(expected_header_size),
262  pos_(0),
263  idx_(0),
264  pos_in_object_(0),
265  pos_in_accu_(0),
266  state_(ObjectPackBuild::kStateContinue),
267  size_(0) {
// Consumer for a pack whose header is expected_header_size bytes long and
// must hash to expected_digest. The declarator line (pack.cc:258) is not part
// of this listing.
268  // Reject implausibly large headers (assumed upper bound of 100 bytes per
// index entry) before reserving memory for them.
// NOTE(review): pack.cc:270, presumably setting an error state, is missing
// from this listing.
269  if (expected_header_size > (100 * ObjectPack::kMaxObjects)) {
271  return;
272  }
273 
274  raw_header_.reserve(expected_header_size);
275 }
276 
282  const unsigned buf_size, const unsigned char *buf) {
// Feeds the next buf_size bytes of the pack stream to the consumer and
// returns the resulting state.
// Header bytes are accumulated in raw_header_ and counted in pos_. Once
// pos_ reaches expected_header_size_, the header's digest is compared with
// expected_digest_ and the header is parsed. The bytes after the header are
// handed to ConsumePayload.
// NOTE(review): pack.cc:281, 284-285, 288, 291, 298, 303, 306, 311 and 320 are
// missing from this listing. They include the early-exit condition, the
// remaining_in_header initializer and, presumably, the error-state
// assignments.
283  if (buf_size == 0) return state_;
286  return state_;
287  }
289 
290  const unsigned remaining_in_header =
292  const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
293  if (nbytes_header) {
294  raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header);
295  pos_ += nbytes_header;
296  }
297 
299 
300  // This condition can only be true once through the lifetime of the
301  // Consumer.
302  if (nbytes_header && (pos_ == expected_header_size_)) {
304  shash::HashString(raw_header_, &digest);
305  if (digest != expected_digest_) {
307  return state_;
308  } else {
309  bool retval = ParseHeader();
310  if (!retval) {
312  return state_;
313  }
314  // We don't need the raw string anymore
315  raw_header_.clear();
316  }
317 
318  // Empty pack?
319  if ((buf_size == nbytes_header) && (index_.size() == 0)) {
321  return state_;
322  }
323  }
324 
325  unsigned remaining_in_buf = buf_size - nbytes_header;
326  const unsigned char *payload = buf + nbytes_header;
327  return ConsumePayload(remaining_in_buf, payload);
328 }
329 
339  const unsigned buf_size, const unsigned char *buf) {
// Distributes payload bytes to the objects listed in index_, in order,
// starting at object idx_ / offset pos_in_object_. Pieces are either passed
// straight from buf or collected in accumulator_ first. The accumulator is
// used when it already holds data, or when the rest of buf is smaller than
// kAccuSize and does not finish the current object. It is flushed when full
// or when the object is complete.
// The loop also runs, without consuming bytes, for zero-size objects.
// Afterwards pos_ is advanced by buf_size and state_ is updated.
// NOTE(review): pack.cc:338, 359, 365, 382 and 384 are missing from this
// listing. They include the listener calls and the non-"done" state values.
// NOTE(review): pos_in_object_ is declared unsigned while object sizes are
// uint64_t, so objects larger than UINT_MAX bytes could overflow it.
340  uint64_t pos_in_buf = 0;
341  while ((idx_ < index_.size()) &&
342  ((pos_in_buf < buf_size) || (index_[idx_].size == 0))) {
343  // Fill the accumulator or process next small object
344  uint64_t nbytes; // How many bytes are consumed in this iteration
345  const uint64_t remaining_in_buf = buf_size - pos_in_buf;
346  const uint64_t remaining_in_object = index_[idx_].size - pos_in_object_;
347  const bool is_small_rest = remaining_in_buf < kAccuSize;
348 
349  // We use the accumulator if there is already something in or if we have a
350  // small piece of data of a larger object.
351  nbytes = std::min(remaining_in_object, remaining_in_buf);
352  if ((pos_in_accu_ > 0) ||
353  ((remaining_in_buf < remaining_in_object) && is_small_rest)) {
354  const uint64_t remaining_in_accu = kAccuSize - pos_in_accu_;
355  nbytes = std::min(remaining_in_accu, nbytes);
356  memcpy(accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes);
357  pos_in_accu_ += nbytes;
358  if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) {
360  index_[idx_].id, index_[idx_].size, pos_in_accu_, accumulator_,
361  index_[idx_].entry_type, index_[idx_].entry_name));
362  pos_in_accu_ = 0;
363  }
364  } else { // directly trigger listeners using buf
366  index_[idx_].id, index_[idx_].size, nbytes, buf + pos_in_buf,
367  index_[idx_].entry_type, index_[idx_].entry_name));
368  }
369 
370  pos_in_buf += nbytes;
371  pos_in_object_ += nbytes;
372  if (nbytes == remaining_in_object) {
373  idx_++;
374  pos_in_object_ = 0;
375  }
376  }
377 
378  pos_ += buf_size;
379 
// All objects complete: done if buf was fully consumed, otherwise the state
// chosen on the missing line 382 (bytes left over). Objects still pending:
// the state chosen on the missing line 384.
380  if (idx_ == index_.size())
381  state_ = (pos_in_buf == buf_size) ? ObjectPackBuild::kStateDone
383  else
385  return state_;
386 }
387 
// Parses raw_header_ into size_ and the object index index_.
// The signature line, pack.cc:388, is not part of this listing.
// The preamble is read with ParseKeyvalMem into a map keyed by a single
// character (presumably each line's first character; confirm against
// ParseKeyvalMem). Only version "2" is accepted. After the "--" separator,
// index lines are parsed with ParseItem until an empty line, a malformed
// line or the end of the header. Returns true only if the number of parsed
// entries equals "N" and their sizes add up to "S".
// NOTE(review): a pack announcing zero objects is accepted without checking
// that "S" is 0, and "N" is narrowed from uint64_t to unsigned.
389  map<char, string> header;
390  const unsigned char *data =
391  reinterpret_cast<const unsigned char *>(raw_header_.data());
392  ParseKeyvalMem(data, raw_header_.size(), &header);
393  if (header.find('V') == header.end()) return false;
394  if (header['V'] != "2") return false;
395  size_ = String2Uint64(header['S']);
396  unsigned nobjects = String2Uint64(header['N']);
397 
398  if (nobjects == 0) return true;
399 
400  // Build the object index
401  const size_t separator_idx = raw_header_.find("--\n");
402  if (separator_idx == string::npos) return false;
403  unsigned index_idx = separator_idx + 3;
404  if (index_idx >= raw_header_.size()) return false;
405 
406  uint64_t sum_size = 0;
407  do {
408  const unsigned remaining_in_header = raw_header_.size() - index_idx;
409  string line =
410  GetLineMem(raw_header_.data() + index_idx, remaining_in_header);
411  if (line == "") break;
412 
413  IndexEntry entry;
414  if (!ParseItem(line, &entry, &sum_size)) {
415  break;
416  }
417 
418  index_.push_back(entry);
// Skip the line and its terminating newline.
419  index_idx += line.size() + 1;
420  } while (index_idx < raw_header_.size());
421 
422  return (nobjects == index_.size()) && (size_ == sum_size);
423 }
424 
425 bool ObjectPackConsumer::ParseItem(const std::string &line,
427  uint64_t *sum_size) {
428  if (!entry || !sum_size) {
429  return false;
430  }
431 
432  if (line[0] == 'C') { // CAS blob
434 
435  // We could use SplitString but we can have many lines so we do something
436  // more efficient here
437  const size_t separator = line.find(' ', 2);
438  if ((separator == string::npos) || (separator == (line.size() - 1))) {
439  return false;
440  }
441 
442  uint64_t size = String2Uint64(line.substr(separator + 1));
443  *sum_size += size;
444 
445  // Warning do not construct a HexPtr with an rvalue!
446  // The constructor takes the address of its argument.
447  const std::string hash_string = line.substr(2, separator - 2);
448  shash::HexPtr hex_ptr(hash_string);
449 
450  entry->id = shash::MkFromSuffixedHexPtr(hex_ptr);
451  entry->size = size;
452  entry->entry_type = entry_type;
453  entry->entry_name = "";
454  } else if (line[0] == 'N') { // Named file
456 
457  // First separator, before the size field
458  const size_t separator1 = line.find(' ', 2);
459  if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) {
460  return false;
461  }
462 
463  // Second separator, before the name field
464  const size_t separator2 = line.find(' ', separator1 + 1);
465  if ((separator1 == 0) || (separator1 == string::npos) ||
466  (separator1 == (line.size() - 1))) {
467  return false;
468  }
469 
470  uint64_t size =
471  String2Uint64(line.substr(separator1 + 1, separator2 - separator1 - 1));
472 
473  std::string name;
474  if (!Debase64(line.substr(separator2 + 1), &name)) {
475  return false;
476  }
477 
478  *sum_size += size;
479 
480  // Warning do not construct a HexPtr with an rvalue!
481  // The constructor takes the address of its argument.
482  const std::string hash_string = line.substr(2, separator1 - 2);
483  shash::HexPtr hex_ptr(hash_string);
484 
485  entry->id = shash::MkFromSuffixedHexPtr(hex_ptr);
486  entry->size = size;
487  entry->entry_type = entry_type;
488  entry->entry_name = name;
489  } else { // Error
490  return false;
491  }
492 
493  return true;
494 }
void AppendItemToHeader(ObjectPack::BucketContentType object_type, const std::string &hash_str, const size_t object_size, const std::string &object_name, std::string *header)
Definition: pack.cc:32
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:268
struct stat64 platform_stat64
unsigned char accumulator_[kAccuSize]
Definition: pack.h:283
string GetLineMem(const char *text, const int text_size)
Definition: string.cc:398
BucketContentType
Definition: pack.h:53
ObjectPackConsumer(const shash::Any &expected_digest, const unsigned expected_header_size)
Definition: pack.cc:258
std::string header_
Definition: pack.h:220
unsigned idx_
Definition: pack.h:272
#define PANIC(...)
Definition: exception.h:29
void TransferBucket(const BucketHandle handle, ObjectPack *other)
Definition: pack.cc:149
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:249
void Add(const void *buf, const uint64_t buf_size)
Definition: pack.cc:66
uint64_t size_
Definition: pack.h:122
static const uint64_t kMaxObjects
Definition: pack.h:62
std::string entry_name
Definition: pack.h:251
void NotifyListeners(const ObjectPackBuild::Event &parameter)
FILE * big_file_
Definition: pack.h:200
const shash::Any & BucketId(size_t idx) const
Definition: pack.cc:166
pthread_mutex_t * lock_
Definition: pack.h:113
assert((mem||(size==0))&&"Out Of Memory")
ObjectPackProducer(ObjectPack *pack)
Definition: pack.cc:181
Algorithms algorithm
Definition: hash.h:125
unsigned char * BucketContent(size_t idx) const
Definition: pack.cc:156
size_t idx_
Definition: pack.h:210
bool Debase64(const string &data, string *decoded)
Definition: string.cc:582
uint64_t size() const
Definition: pack.h:82
unsigned pos_in_object_
Definition: pack.h:277
uint64_t size
Definition: pack.h:249
string Base64Url(const string &data)
Definition: string.cc:553
BucketHandle NewBucket()
Definition: pack.cc:102
unsigned expected_header_size_
Definition: pack.h:262
static const unsigned kAccuSize
Definition: pack.h:241
size_t pos_in_bucket_
Definition: pack.h:215
unsigned pos_in_accu_
Definition: pack.h:288
std::string name
Definition: pack.h:105
ObjectPackBuild::State ConsumePayload(const unsigned buf_size, const unsigned char *buf)
Definition: pack.cc:338
uint64_t BucketSize(size_t idx) const
Definition: pack.cc:161
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
Definition: pack.cc:281
Definition: pack.h:243
void InitLock()
Definition: pack.cc:139
bool ParseHeader()
Definition: pack.cc:388
BucketContentType content_type
Definition: pack.h:104
std::vector< IndexEntry > index_
Definition: pack.h:310
std::vector< BucketHandle > buckets_
Definition: pack.h:130
void GetDigest(shash::Any *hash)
Definition: pack.cc:176
unsigned char digest[20]
string StringifyInt(const int64_t value)
Definition: string.cc:78
void DiscardBucket(const BucketHandle handle)
Definition: pack.cc:133
bool ParseItem(const std::string &line, IndexEntry *entry, uint64_t *sum_size)
Definition: pack.cc:425
uint64_t size
Definition: pack.h:101
std::set< BucketHandle > open_buckets_
Definition: pack.h:126
ObjectPack::BucketContentType entry_type
Definition: pack.h:250
shash::Any id
Definition: pack.h:248
~ObjectPack()
Definition: pack.cc:85
uint64_t String2Uint64(const string &value)
Definition: string.cc:246
Any MkFromSuffixedHexPtr(const HexPtr hex)
Definition: hash.cc:105
Definition: mutex.h:42
ObjectPack(const uint64_t limit=kDefaultLimit)
Definition: pack.cc:81
uint64_t limit_
Definition: pack.h:118
shash::Any id
Definition: pack.h:103
uint64_t size_
Definition: pack.h:305
const unsigned kMaxDigestSize
Definition: hash.h:72
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
Definition: pack.cc:113
ObjectPack * pack_
Definition: pack.h:195
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
Definition: pack.cc:97
uint64_t pos_
Definition: pack.h:205
size_t GetNoObjects() const
Definition: pack.h:86
std::string raw_header_
Definition: pack.h:300
int platform_fstat(int filedes, platform_stat64 *buf)
ObjectPackBuild::State state_
Definition: pack.h:294
static void size_t size
Definition: smalloc.h:54
void InitializeHeader(const int version, const int num_objects, const size_t pack_size, std::string *header)
Definition: pack.cc:22
void ParseKeyvalMem(const unsigned char *buffer, const unsigned buffer_size, map< char, string > *content)
Definition: string.cc:355
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
Definition: pack.cc:216
shash::Any expected_digest_
Definition: pack.h:261
uint64_t pos_
Definition: pack.h:267