CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
pack.cc
Go to the documentation of this file.
1 
5 #include "pack.h"
6 
7 #include <algorithm>
8 #include <cassert>
9 #include <cstring>
10 #include <map>
11 
12 #include "util/concurrency.h"
13 #include "util/exception.h"
14 #include "util/platform.h"
15 #include "util/smalloc.h"
16 #include "util/string.h"
17 
18 using namespace std; // NOLINT
19 
20 namespace { // some private utility functions used by ObjectPackProducer
21 
22 void InitializeHeader(const int version, const int num_objects,
23  const size_t pack_size, std::string *header) {
24  if (header) {
25  *header = "V" + StringifyInt(version) + "\n";
26  *header += "S" + StringifyInt(pack_size) + "\n";
27  *header += "N" + StringifyInt(num_objects) + "\n";
28  *header += "--\n";
29  }
30 }
31 
33  const std::string &hash_str, const size_t object_size,
34  const std::string &object_name, std::string *header) {
35  // If the object type is kNamed, the "object_name" parameter must not be empty
36  assert((object_type == ObjectPack::kCas)
37  || ((object_type == ObjectPack::kNamed) && (!object_name.empty())));
38  std::string line_prefix = "";
39  std::string line_suffix = "";
40  switch (object_type) {
41  case ObjectPack::kNamed:
42  line_prefix = "N ";
43  line_suffix = std::string(" ") + Base64Url(object_name);
44  break;
45  case ObjectPack::kCas:
46  line_prefix = "C ";
47  break;
48  default:
49  PANIC(kLogStderr, "Unknown object pack type to be added to header.");
50  }
51  if (header) {
52  *header += line_prefix + hash_str + " " + StringifyInt(object_size)
53  + line_suffix + "\n";
54  }
55 }
56 
57 } // namespace
58 
60  : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize)))
61  , size(0)
62  , capacity(kInitialSize)
63  , content_type(kEmpty)
64  , name() { }
65 
66 void ObjectPack::Bucket::Add(const void *buf, const uint64_t buf_size) {
67  if (buf_size == 0)
68  return;
69 
70  while (size + buf_size > capacity) {
71  capacity *= 2;
72  content = reinterpret_cast<unsigned char *>(srealloc(content, capacity));
73  }
74  memcpy(content + size, buf, buf_size);
75  size += buf_size;
76 }
77 
78 ObjectPack::Bucket::~Bucket() { free(content); }
79 
80 //------------------------------------------------------------------------------
81 
82 ObjectPack::ObjectPack(const uint64_t limit) : limit_(limit), size_(0) {
83  InitLock();
84 }
85 
87  for (std::set<BucketHandle>::const_iterator i = open_buckets_.begin(),
88  iEnd = open_buckets_.end();
89  i != iEnd;
90  ++i) {
91  delete *i;
92  }
93 
94  for (unsigned i = 0; i < buckets_.size(); ++i)
95  delete buckets_[i];
96  pthread_mutex_destroy(lock_);
97  free(lock_);
98 }
99 
100 void ObjectPack::AddToBucket(const void *buf, const uint64_t size,
101  const ObjectPack::BucketHandle handle) {
102  handle->Add(buf, size);
103 }
104 
106  BucketHandle handle = new Bucket();
107 
108  const MutexLockGuard mutex_guard(lock_);
109  open_buckets_.insert(handle);
110  return handle;
111 }
112 
117  const shash::Any &id,
118  const ObjectPack::BucketHandle handle,
119  const std::string &name) {
120  handle->id = id;
121 
122  handle->content_type = type;
123  if (type == kNamed) {
124  handle->name = name;
125  }
126 
127  const MutexLockGuard mutex_guard(lock_);
128  if (buckets_.size() >= kMaxObjects)
129  return false;
130  if (size_ + handle->size > limit_)
131  return false;
132  open_buckets_.erase(handle);
133  buckets_.push_back(handle);
134  size_ += handle->size;
135  return true;
136 }
137 
139  const MutexLockGuard mutex_guard(lock_);
140  open_buckets_.erase(handle);
141  delete handle;
142 }
143 
145  lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t)));
146  const int retval = pthread_mutex_init(lock_, NULL);
147  assert(retval == 0);
148 }
149 
155  ObjectPack *other) {
156  const MutexLockGuard mutex_guard(lock_);
157  open_buckets_.erase(handle);
158  other->open_buckets_.insert(handle);
159 }
160 
161 unsigned char *ObjectPack::BucketContent(size_t idx) const {
162  assert(idx < buckets_.size());
163  return buckets_[idx]->content;
164 }
165 
166 uint64_t ObjectPack::BucketSize(size_t idx) const {
167  assert(idx < buckets_.size());
168  return buckets_[idx]->size;
169 }
170 
171 const shash::Any &ObjectPack::BucketId(size_t idx) const {
172  assert(idx < buckets_.size());
173  return buckets_[idx]->id;
174 }
175 
176 //------------------------------------------------------------------------------
177 
182  assert(hash);
183  shash::HashString(header_, hash);
184 }
185 
187  : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) {
188  const unsigned N = pack->GetNoObjects();
189  // rough guess, most likely a little too much
190  header_.reserve(30 + N * (2 * shash::kMaxDigestSize + 5));
191 
192  InitializeHeader(2, N, pack->size(), &header_);
193 
194  for (unsigned i = 0; i < N; ++i) {
196  pack->BucketSize(i), "", &header_);
197  }
198 }
199 
201  const std::string &file_name)
202  : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) {
203  const int fd = fileno(big_file_);
204  assert(fd >= 0);
205  platform_stat64 info;
206  const int retval = platform_fstat(fd, &info);
207  assert(retval == 0);
208 
209  InitializeHeader(2, 1, info.st_size, &header_);
210 
211  AppendItemToHeader(ObjectPack::kNamed, id.ToString(true), info.st_size,
212  file_name, &header_);
213 
214  rewind(big_file);
215 }
216 
221 unsigned ObjectPackProducer::ProduceNext(const unsigned buf_size,
222  unsigned char *buf) {
223  const unsigned remaining_in_header = (pos_ < header_.size())
224  ? (header_.size() - pos_)
225  : 0;
226  const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
227  if (nbytes_header) {
228  memcpy(buf, header_.data() + pos_, nbytes_header);
229  pos_ += nbytes_header;
230  }
231 
232  unsigned remaining_in_buf = buf_size - nbytes_header;
233  if (remaining_in_buf == 0)
234  return nbytes_header;
235  unsigned nbytes_payload = 0;
236 
237  if (big_file_) {
238  const size_t nbytes =
239  fread(buf + nbytes_header, 1, remaining_in_buf, big_file_);
240  nbytes_payload = nbytes;
241  pos_ += nbytes_payload;
242  } else if (idx_ < pack_->GetNoObjects()) {
243  // Copy a few buckets more
244  while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) {
245  const unsigned remaining_in_bucket = pack_->BucketSize(idx_)
246  - pos_in_bucket_;
247  const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket);
248  memcpy(buf + nbytes_header + nbytes_payload,
249  pack_->BucketContent(idx_) + pos_in_bucket_, nbytes);
250 
251  pos_in_bucket_ += nbytes;
252  nbytes_payload += nbytes;
253  remaining_in_buf -= nbytes;
254  if (nbytes == remaining_in_bucket) {
255  pos_in_bucket_ = 0;
256  idx_++;
257  }
258  }
259  }
260 
261  return nbytes_header + nbytes_payload;
262 }
263 
264 //------------------------------------------------------------------------------
265 
267  const unsigned expected_header_size)
268  : expected_digest_(expected_digest)
269  , expected_header_size_(expected_header_size)
270  , pos_(0)
271  , idx_(0)
272  , pos_in_object_(0)
273  , pos_in_accu_(0)
274  , state_(ObjectPackBuild::kStateContinue)
275  , size_(0) {
276  // Upper limit of 100B per entry
277  if (expected_header_size > (100 * ObjectPack::kMaxObjects)) {
279  return;
280  }
281 
282  raw_header_.reserve(expected_header_size);
283 }
284 
290  const unsigned buf_size, const unsigned char *buf) {
291  if (buf_size == 0)
292  return state_;
295  return state_;
296  }
298  return state_;
299 
300  const unsigned remaining_in_header = (pos_ < expected_header_size_)
302  : 0;
303  const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
304  if (nbytes_header) {
305  raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header);
306  pos_ += nbytes_header;
307  }
308 
311 
312  // This condition can only be true once through the lifetime of the
313  // Consumer.
314  if (nbytes_header && (pos_ == expected_header_size_)) {
316  shash::HashString(raw_header_, &digest);
317  if (digest != expected_digest_) {
319  return state_;
320  } else {
321  const bool retval = ParseHeader();
322  if (!retval) {
324  return state_;
325  }
326  // We don't need the raw string anymore
327  raw_header_.clear();
328  }
329 
330  // Empty pack?
331  if ((buf_size == nbytes_header) && (index_.size() == 0)) {
333  return state_;
334  }
335  }
336 
337  const unsigned remaining_in_buf = buf_size - nbytes_header;
338  const unsigned char *payload = buf + nbytes_header;
339  return ConsumePayload(remaining_in_buf, payload);
340 }
341 
351  const unsigned buf_size, const unsigned char *buf) {
352  uint64_t pos_in_buf = 0;
353  while ((idx_ < index_.size())
354  && ((pos_in_buf < buf_size) || (index_[idx_].size == 0))) {
355  // Fill the accumulator or process next small object
356  uint64_t nbytes; // How many bytes are consumed in this iteration
357  const uint64_t remaining_in_buf = buf_size - pos_in_buf;
358  const uint64_t remaining_in_object = index_[idx_].size - pos_in_object_;
359  const bool is_small_rest = remaining_in_buf < kAccuSize;
360 
361  // We use the accumulator if there is already something in or if we have a
362  // small piece of data of a larger object.
363  nbytes = std::min(remaining_in_object, remaining_in_buf);
364  if ((pos_in_accu_ > 0)
365  || ((remaining_in_buf < remaining_in_object) && is_small_rest)) {
366  const uint64_t remaining_in_accu = kAccuSize - pos_in_accu_;
367  nbytes = std::min(remaining_in_accu, nbytes);
368  memcpy(accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes);
369  pos_in_accu_ += nbytes;
370  if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) {
372  index_[idx_].id, index_[idx_].size, pos_in_accu_, accumulator_,
373  index_[idx_].entry_type, index_[idx_].entry_name));
374  pos_in_accu_ = 0;
375  }
376  } else { // directly trigger listeners using buf
378  index_[idx_].id, index_[idx_].size, nbytes, buf + pos_in_buf,
379  index_[idx_].entry_type, index_[idx_].entry_name));
380  }
381 
382  pos_in_buf += nbytes;
383  pos_in_object_ += nbytes;
384  if (nbytes == remaining_in_object) {
385  idx_++;
386  pos_in_object_ = 0;
387  }
388  }
389 
390  pos_ += buf_size;
391 
392  if (idx_ == index_.size())
393  state_ = (pos_in_buf == buf_size) ? ObjectPackBuild::kStateDone
395  else
397  return state_;
398 }
399 
401  map<char, string> header;
402  const unsigned char *data = reinterpret_cast<const unsigned char *>(
403  raw_header_.data());
404  ParseKeyvalMem(data, raw_header_.size(), &header);
405  if (header.find('V') == header.end())
406  return false;
407  if (header['V'] != "2")
408  return false;
409  size_ = String2Uint64(header['S']);
410  const unsigned nobjects = String2Uint64(header['N']);
411 
412  if (nobjects == 0)
413  return true;
414 
415  // Build the object index
416  const size_t separator_idx = raw_header_.find("--\n");
417  if (separator_idx == string::npos)
418  return false;
419  unsigned index_idx = separator_idx + 3;
420  if (index_idx >= raw_header_.size())
421  return false;
422 
423  uint64_t sum_size = 0;
424  do {
425  const unsigned remaining_in_header = raw_header_.size() - index_idx;
426  const string line =
427  GetLineMem(raw_header_.data() + index_idx, remaining_in_header);
428  if (line == "")
429  break;
430 
431  IndexEntry entry;
432  if (!ParseItem(line, &entry, &sum_size)) {
433  break;
434  }
435 
436  index_.push_back(entry);
437  index_idx += line.size() + 1;
438  } while (index_idx < raw_header_.size());
439 
440  return (nobjects == index_.size()) && (size_ == sum_size);
441 }
442 
443 bool ObjectPackConsumer::ParseItem(const std::string &line,
445  uint64_t *sum_size) {
446  if (!entry || !sum_size) {
447  return false;
448  }
449 
450  if (line[0] == 'C') { // CAS blob
452 
453  // We could use SplitString but we can have many lines so we do something
454  // more efficient here
455  const size_t separator = line.find(' ', 2);
456  if ((separator == string::npos) || (separator == (line.size() - 1))) {
457  return false;
458  }
459 
460  const uint64_t size = String2Uint64(line.substr(separator + 1));
461  *sum_size += size;
462 
463  // Warning do not construct a HexPtr with an rvalue!
464  // The constructor takes the address of its argument.
465  const std::string hash_string = line.substr(2, separator - 2);
466  const shash::HexPtr hex_ptr(hash_string);
467 
468  entry->id = shash::MkFromSuffixedHexPtr(hex_ptr);
469  entry->size = size;
470  entry->entry_type = entry_type;
471  entry->entry_name = "";
472  } else if (line[0] == 'N') { // Named file
474 
475  // First separator, before the size field
476  const size_t separator1 = line.find(' ', 2);
477  if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) {
478  return false;
479  }
480 
481  // Second separator, before the name field
482  const size_t separator2 = line.find(' ', separator1 + 1);
483  if ((separator1 == 0) || (separator1 == string::npos)
484  || (separator1 == (line.size() - 1))) {
485  return false;
486  }
487 
488  const uint64_t size =
489  String2Uint64(line.substr(separator1 + 1, separator2 - separator1 - 1));
490 
491  std::string name;
492  if (!Debase64(line.substr(separator2 + 1), &name)) {
493  return false;
494  }
495 
496  *sum_size += size;
497 
498  // Warning do not construct a HexPtr with an rvalue!
499  // The constructor takes the address of its argument.
500  const std::string hash_string = line.substr(2, separator1 - 2);
501  const shash::HexPtr hex_ptr(hash_string);
502 
503  entry->id = shash::MkFromSuffixedHexPtr(hex_ptr);
504  entry->size = size;
505  entry->entry_type = entry_type;
506  entry->entry_name = name;
507  } else { // Error
508  return false;
509  }
510 
511  return true;
512 }
void AppendItemToHeader(ObjectPack::BucketContentType object_type, const std::string &hash_str, const size_t object_size, const std::string &object_name, std::string *header)
Definition: pack.cc:32
void HashString(const std::string &content, Any *any_digest)
Definition: hash.cc:267
struct stat64 platform_stat64
unsigned char accumulator_[kAccuSize]
Definition: pack.h:287
string GetLineMem(const char *text, const int text_size)
Definition: string.cc:415
BucketContentType
Definition: pack.h:53
ObjectPackConsumer(const shash::Any &expected_digest, const unsigned expected_header_size)
Definition: pack.cc:266
std::string header_
Definition: pack.h:224
unsigned idx_
Definition: pack.h:276
#define PANIC(...)
Definition: exception.h:29
void TransferBucket(const BucketHandle handle, ObjectPack *other)
Definition: pack.cc:154
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
void Add(const void *buf, const uint64_t buf_size)
Definition: pack.cc:66
uint64_t size_
Definition: pack.h:126
static const uint64_t kMaxObjects
Definition: pack.h:66
std::string entry_name
Definition: pack.h:255
void NotifyListeners(const ObjectPackBuild::Event &parameter)
FILE * big_file_
Definition: pack.h:204
const shash::Any & BucketId(size_t idx) const
Definition: pack.cc:171
pthread_mutex_t * lock_
Definition: pack.h:117
assert((mem||(size==0))&&"Out Of Memory")
ObjectPackProducer(ObjectPack *pack)
Definition: pack.cc:186
Algorithms algorithm
Definition: hash.h:122
unsigned char * BucketContent(size_t idx) const
Definition: pack.cc:161
size_t idx_
Definition: pack.h:214
bool Debase64(const string &data, string *decoded)
Definition: string.cc:598
uint64_t size() const
Definition: pack.h:86
unsigned pos_in_object_
Definition: pack.h:281
uint64_t size
Definition: pack.h:253
string Base64Url(const string &data)
Definition: string.cc:568
BucketHandle NewBucket()
Definition: pack.cc:105
unsigned expected_header_size_
Definition: pack.h:266
static const unsigned kAccuSize
Definition: pack.h:245
size_t pos_in_bucket_
Definition: pack.h:219
unsigned pos_in_accu_
Definition: pack.h:292
std::string name
Definition: pack.h:109
ObjectPackBuild::State ConsumePayload(const unsigned buf_size, const unsigned char *buf)
Definition: pack.cc:350
uint64_t BucketSize(size_t idx) const
Definition: pack.cc:166
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
Definition: pack.cc:289
Definition: pack.h:247
void InitLock()
Definition: pack.cc:144
bool ParseHeader()
Definition: pack.cc:400
BucketContentType content_type
Definition: pack.h:108
std::vector< IndexEntry > index_
Definition: pack.h:314
std::vector< BucketHandle > buckets_
Definition: pack.h:134
void GetDigest(shash::Any *hash)
Definition: pack.cc:181
unsigned char digest[20]
string StringifyInt(const int64_t value)
Definition: string.cc:77
void DiscardBucket(const BucketHandle handle)
Definition: pack.cc:138
bool ParseItem(const std::string &line, IndexEntry *entry, uint64_t *sum_size)
Definition: pack.cc:443
uint64_t size
Definition: pack.h:105
std::set< BucketHandle > open_buckets_
Definition: pack.h:130
ObjectPack::BucketContentType entry_type
Definition: pack.h:254
shash::Any id
Definition: pack.h:252
~ObjectPack()
Definition: pack.cc:86
uint64_t String2Uint64(const string &value)
Definition: string.cc:240
Any MkFromSuffixedHexPtr(const HexPtr hex)
Definition: hash.cc:104
Definition: mutex.h:42
ObjectPack(const uint64_t limit=kDefaultLimit)
Definition: pack.cc:82
uint64_t limit_
Definition: pack.h:122
shash::Any id
Definition: pack.h:107
uint64_t size_
Definition: pack.h:309
const unsigned kMaxDigestSize
Definition: hash.h:71
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
Definition: pack.cc:116
ObjectPack * pack_
Definition: pack.h:199
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
Definition: pack.cc:100
uint64_t pos_
Definition: pack.h:209
size_t GetNoObjects() const
Definition: pack.h:90
std::string raw_header_
Definition: pack.h:304
int platform_fstat(int filedes, platform_stat64 *buf)
ObjectPackBuild::State state_
Definition: pack.h:298
static void size_t size
Definition: smalloc.h:54
void InitializeHeader(const int version, const int num_objects, const size_t pack_size, std::string *header)
Definition: pack.cc:22
void ParseKeyvalMem(const unsigned char *buffer, const unsigned buffer_size, map< char, string > *content)
Definition: string.cc:369
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
Definition: pack.cc:221
shash::Any expected_digest_
Definition: pack.h:265
uint64_t pos_
Definition: pack.h:271