CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pack.cc
Go to the documentation of this file.
1 
5 #include "pack.h"
6 
7 #include <algorithm>
8 #include <cassert>
9 #include <cstring>
10 #include <map>
11 
12 #include "util/concurrency.h"
13 #include "util/exception.h"
14 #include "util/platform.h"
15 #include "util/smalloc.h"
16 #include "util/string.h"
17 
18 using namespace std; // NOLINT
19 
20 namespace { // some private utility functions used by ObjectPackProducer
21 
22 void InitializeHeader(const int version, const int num_objects,
23  const size_t pack_size, std::string *header) {
24  if (header) {
25  *header = "V" + StringifyInt(version) + "\n";
26  *header += "S" + StringifyInt(pack_size) + "\n";
27  *header += "N" + StringifyInt(num_objects) + "\n";
28  *header += "--\n";
29  }
30 }
31 
33  const std::string &hash_str, const size_t object_size,
34  const std::string &object_name, std::string *header) {
35 // If the object type is kNamed, the "object_name" parameter must not be empty
36  assert((object_type == ObjectPack::kCas)
37  || ((object_type == ObjectPack::kNamed) && (!object_name.empty())));
38  std::string line_prefix = "";
39  std::string line_suffix = "";
40  switch (object_type) {
41  case ObjectPack::kNamed:
42  line_prefix = "N ";
43  line_suffix = std::string(" ") + Base64Url(object_name);
44  break;
45  case ObjectPack::kCas:
46  line_prefix = "C ";
47  break;
48  default:
49  PANIC(kLogStderr, "Unknown object pack type to be added to header.");
50  }
51  if (header) {
52  *header += line_prefix + hash_str + " " + StringifyInt(object_size)
53  + line_suffix + "\n";
54  }
55 }
56 
57 } // namespace
58 
60  : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize)))
61  , size(0)
62  , capacity(kInitialSize)
63  , content_type(kEmpty)
64  , name() { }
65 
66 void ObjectPack::Bucket::Add(const void *buf, const uint64_t buf_size) {
67  if (buf_size == 0)
68  return;
69 
70  while (size + buf_size > capacity) {
71  capacity *= 2;
72  content = reinterpret_cast<unsigned char *>(srealloc(content, capacity));
73  }
74  memcpy(content + size, buf, buf_size);
75  size += buf_size;
76 }
77 
78 ObjectPack::Bucket::~Bucket() { free(content); }
79 
80 //------------------------------------------------------------------------------
81 
82 ObjectPack::ObjectPack(const uint64_t limit) : limit_(limit), size_(0) {
83  InitLock();
84 }
85 
87  for (std::set<BucketHandle>::const_iterator i = open_buckets_.begin(),
88  iEnd = open_buckets_.end();
89  i != iEnd;
90  ++i) {
91  delete *i;
92  }
93 
94  for (unsigned i = 0; i < buckets_.size(); ++i)
95  delete buckets_[i];
96  pthread_mutex_destroy(lock_);
97  free(lock_);
98 }
99 
100 void ObjectPack::AddToBucket(const void *buf, const uint64_t size,
101  const ObjectPack::BucketHandle handle) {
102  handle->Add(buf, size);
103 }
104 
106  BucketHandle handle = new Bucket();
107 
108  MutexLockGuard mutex_guard(lock_);
109  open_buckets_.insert(handle);
110  return handle;
111 }
112 
117  const shash::Any &id,
118  const ObjectPack::BucketHandle handle,
119  const std::string &name) {
120  handle->id = id;
121 
122  handle->content_type = type;
123  if (type == kNamed) {
124  handle->name = name;
125  }
126 
127  MutexLockGuard mutex_guard(lock_);
128  if (buckets_.size() >= kMaxObjects)
129  return false;
130  if (size_ + handle->size > limit_)
131  return false;
132  open_buckets_.erase(handle);
133  buckets_.push_back(handle);
134  size_ += handle->size;
135  return true;
136 }
137 
139  MutexLockGuard mutex_guard(lock_);
140  open_buckets_.erase(handle);
141  delete handle;
142 }
143 
145  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
146  int retval = pthread_mutex_init(lock_, NULL);
147  assert(retval == 0);
148 }
149 
155  ObjectPack *other) {
156  MutexLockGuard mutex_guard(lock_);
157  open_buckets_.erase(handle);
158  other->open_buckets_.insert(handle);
159 }
160 
161 unsigned char *ObjectPack::BucketContent(size_t idx) const {
162  assert(idx < buckets_.size());
163  return buckets_[idx]->content;
164 }
165 
166 uint64_t ObjectPack::BucketSize(size_t idx) const {
167  assert(idx < buckets_.size());
168  return buckets_[idx]->size;
169 }
170 
171 const shash::Any &ObjectPack::BucketId(size_t idx) const {
172  assert(idx < buckets_.size());
173  return buckets_[idx]->id;
174 }
175 
176 //------------------------------------------------------------------------------
177 
182  assert(hash);
183  shash::HashString(header_, hash);
184 }
185 
187  : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) {
188  unsigned N = pack->GetNoObjects();
189  // rough guess, most likely a little too much
190  header_.reserve(30 + N * (2 * shash::kMaxDigestSize + 5));
191 
192  InitializeHeader(2, N, pack->size(), &header_);
193 
194  for (unsigned i = 0; i < N; ++i) {
196  pack->BucketSize(i), "", &header_);
197  }
198 }
199 
201  const std::string &file_name)
202  : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) {
203  int fd = fileno(big_file_);
204  assert(fd >= 0);
205  platform_stat64 info;
206  int retval = platform_fstat(fd, &info);
207  assert(retval == 0);
208 
209  InitializeHeader(2, 1, info.st_size, &header_);
210 
211  AppendItemToHeader(ObjectPack::kNamed, id.ToString(true), info.st_size,
212  file_name, &header_);
213 
214  rewind(big_file);
215 }
216 
221 unsigned ObjectPackProducer::ProduceNext(const unsigned buf_size,
222  unsigned char *buf) {
223  const unsigned remaining_in_header = (pos_ < header_.size())
224  ? (header_.size() - pos_)
225  : 0;
226  const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
227  if (nbytes_header) {
228  memcpy(buf, header_.data() + pos_, nbytes_header);
229  pos_ += nbytes_header;
230  }
231 
232  unsigned remaining_in_buf = buf_size - nbytes_header;
233  if (remaining_in_buf == 0)
234  return nbytes_header;
235  unsigned nbytes_payload = 0;
236 
237  if (big_file_) {
238  size_t nbytes = fread(buf + nbytes_header, 1, remaining_in_buf, big_file_);
239  nbytes_payload = nbytes;
240  pos_ += nbytes_payload;
241  } else if (idx_ < pack_->GetNoObjects()) {
242  // Copy a few buckets more
243  while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) {
244  const unsigned remaining_in_bucket = pack_->BucketSize(idx_)
245  - pos_in_bucket_;
246  const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket);
247  memcpy(buf + nbytes_header + nbytes_payload,
248  pack_->BucketContent(idx_) + pos_in_bucket_, nbytes);
249 
250  pos_in_bucket_ += nbytes;
251  nbytes_payload += nbytes;
252  remaining_in_buf -= nbytes;
253  if (nbytes == remaining_in_bucket) {
254  pos_in_bucket_ = 0;
255  idx_++;
256  }
257  }
258  }
259 
260  return nbytes_header + nbytes_payload;
261 }
262 
263 //------------------------------------------------------------------------------
264 
266  const unsigned expected_header_size)
267  : expected_digest_(expected_digest)
268  , expected_header_size_(expected_header_size)
269  , pos_(0)
270  , idx_(0)
271  , pos_in_object_(0)
272  , pos_in_accu_(0)
273  , state_(ObjectPackBuild::kStateContinue)
274  , size_(0) {
275  // Upper limit of 100B per entry
276  if (expected_header_size > (100 * ObjectPack::kMaxObjects)) {
278  return;
279  }
280 
281  raw_header_.reserve(expected_header_size);
282 }
283 
289  const unsigned buf_size, const unsigned char *buf) {
290  if (buf_size == 0)
291  return state_;
294  return state_;
295  }
297  return state_;
298 
299  const unsigned remaining_in_header = (pos_ < expected_header_size_)
301  : 0;
302  const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
303  if (nbytes_header) {
304  raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header);
305  pos_ += nbytes_header;
306  }
307 
310 
311  // This condition can only be true once through the lifetime of the
312  // Consumer.
313  if (nbytes_header && (pos_ == expected_header_size_)) {
315  shash::HashString(raw_header_, &digest);
316  if (digest != expected_digest_) {
318  return state_;
319  } else {
320  bool retval = ParseHeader();
321  if (!retval) {
323  return state_;
324  }
325  // We don't need the raw string anymore
326  raw_header_.clear();
327  }
328 
329  // Empty pack?
330  if ((buf_size == nbytes_header) && (index_.size() == 0)) {
332  return state_;
333  }
334  }
335 
336  unsigned remaining_in_buf = buf_size - nbytes_header;
337  const unsigned char *payload = buf + nbytes_header;
338  return ConsumePayload(remaining_in_buf, payload);
339 }
340 
350  const unsigned buf_size, const unsigned char *buf) {
351  uint64_t pos_in_buf = 0;
352  while ((idx_ < index_.size())
353  && ((pos_in_buf < buf_size) || (index_[idx_].size == 0))) {
354  // Fill the accumulator or process next small object
355  uint64_t nbytes; // How many bytes are consumed in this iteration
356  const uint64_t remaining_in_buf = buf_size - pos_in_buf;
357  const uint64_t remaining_in_object = index_[idx_].size - pos_in_object_;
358  const bool is_small_rest = remaining_in_buf < kAccuSize;
359 
360  // We use the accumulator if there is already something in or if we have a
361  // small piece of data of a larger object.
362  nbytes = std::min(remaining_in_object, remaining_in_buf);
363  if ((pos_in_accu_ > 0)
364  || ((remaining_in_buf < remaining_in_object) && is_small_rest)) {
365  const uint64_t remaining_in_accu = kAccuSize - pos_in_accu_;
366  nbytes = std::min(remaining_in_accu, nbytes);
367  memcpy(accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes);
368  pos_in_accu_ += nbytes;
369  if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) {
371  index_[idx_].id, index_[idx_].size, pos_in_accu_, accumulator_,
372  index_[idx_].entry_type, index_[idx_].entry_name));
373  pos_in_accu_ = 0;
374  }
375  } else { // directly trigger listeners using buf
377  index_[idx_].id, index_[idx_].size, nbytes, buf + pos_in_buf,
378  index_[idx_].entry_type, index_[idx_].entry_name));
379  }
380 
381  pos_in_buf += nbytes;
382  pos_in_object_ += nbytes;
383  if (nbytes == remaining_in_object) {
384  idx_++;
385  pos_in_object_ = 0;
386  }
387  }
388 
389  pos_ += buf_size;
390 
391  if (idx_ == index_.size())
392  state_ = (pos_in_buf == buf_size) ? ObjectPackBuild::kStateDone
394  else
396  return state_;
397 }
398 
400  map<char, string> header;
401  const unsigned char *data = reinterpret_cast<const unsigned char *>(
402  raw_header_.data());
403  ParseKeyvalMem(data, raw_header_.size(), &header);
404  if (header.find('V') == header.end())
405  return false;
406  if (header['V'] != "2")
407  return false;
408  size_ = String2Uint64(header['S']);
409  unsigned nobjects = String2Uint64(header['N']);
410 
411  if (nobjects == 0)
412  return true;
413 
414  // Build the object index
415  const size_t separator_idx = raw_header_.find("--\n");
416  if (separator_idx == string::npos)
417  return false;
418  unsigned index_idx = separator_idx + 3;
419  if (index_idx >= raw_header_.size())
420  return false;
421 
422  uint64_t sum_size = 0;
423  do {
424  const unsigned remaining_in_header = raw_header_.size() - index_idx;
425  string line = GetLineMem(raw_header_.data() + index_idx,
426  remaining_in_header);
427  if (line == "")
428  break;
429 
430  IndexEntry entry;
431  if (!ParseItem(line, &entry, &sum_size)) {
432  break;
433  }
434 
435  index_.push_back(entry);
436  index_idx += line.size() + 1;
437  } while (index_idx < raw_header_.size());
438 
439  return (nobjects == index_.size()) && (size_ == sum_size);
440 }
441 
442 bool ObjectPackConsumer::ParseItem(const std::string &line,
444  uint64_t *sum_size) {
445  if (!entry || !sum_size) {
446  return false;
447  }
448 
449  if (line[0] == 'C') { // CAS blob
451 
452  // We could use SplitString but we can have many lines so we do something
453  // more efficient here
454  const size_t separator = line.find(' ', 2);
455  if ((separator == string::npos) || (separator == (line.size() - 1))) {
456  return false;
457  }
458 
459  uint64_t size = String2Uint64(line.substr(separator + 1));
460  *sum_size += size;
461 
462  // Warning do not construct a HexPtr with an rvalue!
463  // The constructor takes the address of its argument.
464  const std::string hash_string = line.substr(2, separator - 2);
465  shash::HexPtr hex_ptr(hash_string);
466 
467  entry->id = shash::MkFromSuffixedHexPtr(hex_ptr);
468  entry->size = size;
469  entry->entry_type = entry_type;
470  entry->entry_name = "";
471  } else if (line[0] == 'N') { // Named file
473 
474  // First separator, before the size field
475  const size_t separator1 = line.find(' ', 2);
476  if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) {
477  return false;
478  }
479 
480  // Second separator, before the name field
481  const size_t separator2 = line.find(' ', separator1 + 1);
482  if ((separator1 == 0) || (separator1 == string::npos)
483  || (separator1 == (line.size() - 1))) {
484  return false;
485  }
486 
487  uint64_t size = String2Uint64(
488  line.substr(separator1 + 1, separator2 - separator1 - 1));
489 
490  std::string name;
491  if (!Debase64(line.substr(separator2 + 1), &name)) {
492  return false;
493  }
494 
495  *sum_size += size;
496 
497  // Warning do not construct a HexPtr with an rvalue!
498  // The constructor takes the address of its argument.
499  const std::string hash_string = line.substr(2, separator1 - 2);
500  shash::HexPtr hex_ptr(hash_string);
501 
502  entry->id = shash::MkFromSuffixedHexPtr(hex_ptr);
503  entry->size = size;
504  entry->entry_type = entry_type;
505  entry->entry_name = name;
506  } else { // Error
507  return false;
508  }
509 
510  return true;
511 }
void AppendItemToHeader(ObjectPack::BucketContentType object_type, const std::string &hash_str, const size_t object_size, const std::string &object_name, std::string *header)
Definition: pack.cc:32
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:269
struct stat64 platform_stat64
unsigned char accumulator_[kAccuSize]
Definition: pack.h:287
string GetLineMem(const char *text, const int text_size)
Definition: string.cc:415
BucketContentType
Definition: pack.h:53
ObjectPackConsumer(const shash::Any &expected_digest, const unsigned expected_header_size)
Definition: pack.cc:265
std::string header_
Definition: pack.h:224
unsigned idx_
Definition: pack.h:276
#define PANIC(...)
Definition: exception.h:29
void TransferBucket(const BucketHandle handle, ObjectPack *other)
Definition: pack.cc:154
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
void Add(const void *buf, const uint64_t buf_size)
Definition: pack.cc:66
uint64_t size_
Definition: pack.h:126
static const uint64_t kMaxObjects
Definition: pack.h:66
std::string entry_name
Definition: pack.h:255
void NotifyListeners(const ObjectPackBuild::Event &parameter)
FILE * big_file_
Definition: pack.h:204
const shash::Any & BucketId(size_t idx) const
Definition: pack.cc:171
pthread_mutex_t * lock_
Definition: pack.h:117
assert((mem||(size==0))&&"Out Of Memory")
ObjectPackProducer(ObjectPack *pack)
Definition: pack.cc:186
Algorithms algorithm
Definition: hash.h:122
unsigned char * BucketContent(size_t idx) const
Definition: pack.cc:161
size_t idx_
Definition: pack.h:214
bool Debase64(const string &data, string *decoded)
Definition: string.cc:598
uint64_t size() const
Definition: pack.h:86
unsigned pos_in_object_
Definition: pack.h:281
uint64_t size
Definition: pack.h:253
string Base64Url(const string &data)
Definition: string.cc:568
BucketHandle NewBucket()
Definition: pack.cc:105
unsigned expected_header_size_
Definition: pack.h:266
static const unsigned kAccuSize
Definition: pack.h:245
size_t pos_in_bucket_
Definition: pack.h:219
unsigned pos_in_accu_
Definition: pack.h:292
std::string name
Definition: pack.h:109
ObjectPackBuild::State ConsumePayload(const unsigned buf_size, const unsigned char *buf)
Definition: pack.cc:349
uint64_t BucketSize(size_t idx) const
Definition: pack.cc:166
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
Definition: pack.cc:288
Definition: pack.h:247
void InitLock()
Definition: pack.cc:144
bool ParseHeader()
Definition: pack.cc:399
BucketContentType content_type
Definition: pack.h:108
std::vector< IndexEntry > index_
Definition: pack.h:314
std::vector< BucketHandle > buckets_
Definition: pack.h:134
void GetDigest(shash::Any *hash)
Definition: pack.cc:181
unsigned char digest[20]
string StringifyInt(const int64_t value)
Definition: string.cc:77
void DiscardBucket(const BucketHandle handle)
Definition: pack.cc:138
bool ParseItem(const std::string &line, IndexEntry *entry, uint64_t *sum_size)
Definition: pack.cc:442
uint64_t size
Definition: pack.h:105
std::set< BucketHandle > open_buckets_
Definition: pack.h:130
ObjectPack::BucketContentType entry_type
Definition: pack.h:254
shash::Any id
Definition: pack.h:252
~ObjectPack()
Definition: pack.cc:86
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
Any MkFromSuffixedHexPtr(const HexPtr hex)
Definition: hash.cc:104
Definition: mutex.h:42
ObjectPack(const uint64_t limit=kDefaultLimit)
Definition: pack.cc:82
uint64_t limit_
Definition: pack.h:122
shash::Any id
Definition: pack.h:107
uint64_t size_
Definition: pack.h:309
const unsigned kMaxDigestSize
Definition: hash.h:71
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
Definition: pack.cc:116
ObjectPack * pack_
Definition: pack.h:199
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
Definition: pack.cc:100
uint64_t pos_
Definition: pack.h:209
size_t GetNoObjects() const
Definition: pack.h:90
std::string raw_header_
Definition: pack.h:304
int platform_fstat(int filedes, platform_stat64 *buf)
ObjectPackBuild::State state_
Definition: pack.h:298
static void size_t size
Definition: smalloc.h:54
void InitializeHeader(const int version, const int num_objects, const size_t pack_size, std::string *header)
Definition: pack.cc:22
void ParseKeyvalMem(const unsigned char *buffer, const unsigned buffer_size, map< char, string > *content)
Definition: string.cc:369
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
Definition: pack.cc:221
shash::Any expected_digest_
Definition: pack.h:265
uint64_t pos_
Definition: pack.h:271