23 const size_t pack_size, std::string *header) {
33 const std::string &hash_str,
const size_t object_size,
34 const std::string &object_name, std::string *header) {
38 std::string line_prefix =
"";
39 std::string line_suffix =
"";
40 switch (object_type) {
43 line_suffix = std::string(
" ") +
Base64Url(object_name);
52 *header += line_prefix + hash_str +
" " +
StringifyInt(object_size)
60 : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize)))
62 , capacity(kInitialSize)
63 , content_type(kEmpty)
70 while (
size + buf_size > capacity) {
72 content =
reinterpret_cast<unsigned char *
>(srealloc(content, capacity));
74 memcpy(content +
size, buf, buf_size);
87 for (std::set<BucketHandle>::const_iterator i =
open_buckets_.begin(),
94 for (
unsigned i = 0; i <
buckets_.size(); ++i)
96 pthread_mutex_destroy(
lock_);
102 handle->
Add(buf, size);
119 const std::string &name) {
145 lock_ =
reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
146 const int retval = pthread_mutex_init(
lock_, NULL);
187 : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) {
194 for (
unsigned i = 0; i < N; ++i) {
201 const std::string &file_name)
202 : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) {
222 unsigned char *buf) {
223 const unsigned remaining_in_header = (
pos_ <
header_.size())
226 const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
229 pos_ += nbytes_header;
232 unsigned remaining_in_buf = buf_size - nbytes_header;
233 if (remaining_in_buf == 0)
234 return nbytes_header;
235 unsigned nbytes_payload = 0;
238 const size_t nbytes =
239 fread(buf + nbytes_header, 1, remaining_in_buf,
big_file_);
240 nbytes_payload = nbytes;
241 pos_ += nbytes_payload;
242 }
else if (idx_ < pack_->GetNoObjects()) {
244 while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) {
247 const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket);
248 memcpy(buf + nbytes_header + nbytes_payload,
252 nbytes_payload += nbytes;
253 remaining_in_buf -= nbytes;
254 if (nbytes == remaining_in_bucket) {
261 return nbytes_header + nbytes_payload;
267 const unsigned expected_header_size)
268 : expected_digest_(expected_digest)
269 , expected_header_size_(expected_header_size)
290 const unsigned buf_size,
const unsigned char *buf) {
303 const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
305 raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header);
306 pos_ += nbytes_header;
331 if ((buf_size == nbytes_header) && (
index_.size() == 0)) {
337 const unsigned remaining_in_buf = buf_size - nbytes_header;
338 const unsigned char *payload = buf + nbytes_header;
351 const unsigned buf_size,
const unsigned char *buf) {
352 uint64_t pos_in_buf = 0;
354 && ((pos_in_buf < buf_size) || (
index_[
idx_].size == 0))) {
357 const uint64_t remaining_in_buf = buf_size - pos_in_buf;
359 const bool is_small_rest = remaining_in_buf <
kAccuSize;
363 nbytes = std::min(remaining_in_object, remaining_in_buf);
365 || ((remaining_in_buf < remaining_in_object) && is_small_rest)) {
366 const uint64_t remaining_in_accu = kAccuSize -
pos_in_accu_;
367 nbytes = std::min(remaining_in_accu, nbytes);
368 memcpy(
accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes);
369 pos_in_accu_ += nbytes;
370 if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) {
382 pos_in_buf += nbytes;
384 if (nbytes == remaining_in_object) {
401 map<char, string> header;
402 const unsigned char *data =
reinterpret_cast<const unsigned char *
>(
405 if (header.find(
'V') == header.end())
407 if (header[
'V'] !=
"2")
416 const size_t separator_idx =
raw_header_.find(
"--\n");
417 if (separator_idx == string::npos)
419 unsigned index_idx = separator_idx + 3;
423 uint64_t sum_size = 0;
425 const unsigned remaining_in_header =
raw_header_.size() - index_idx;
432 if (!
ParseItem(line, &entry, &sum_size)) {
437 index_idx += line.size() + 1;
440 return (nobjects ==
index_.size()) && (
size_ == sum_size);
445 uint64_t *sum_size) {
446 if (!entry || !sum_size) {
450 if (line[0] ==
'C') {
455 const size_t separator = line.find(
' ', 2);
456 if ((separator == string::npos) || (separator == (line.size() - 1))) {
465 const std::string hash_string = line.substr(2, separator - 2);
472 }
else if (line[0] ==
'N') {
476 const size_t separator1 = line.find(
' ', 2);
477 if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) {
482 const size_t separator2 = line.find(
' ', separator1 + 1);
483 if ((separator1 == 0) || (separator1 == string::npos)
484 || (separator1 == (line.size() - 1))) {
488 const uint64_t
size =
489 String2Uint64(line.substr(separator1 + 1, separator2 - separator1 - 1));
492 if (!
Debase64(line.substr(separator2 + 1), &name)) {
500 const std::string hash_string = line.substr(2, separator1 - 2);
void AppendItemToHeader(ObjectPack::BucketContentType object_type, const std::string &hash_str, const size_t object_size, const std::string &object_name, std::string *header)
void HashString(const std::string &content, Any *any_digest)
unsigned char accumulator_[kAccuSize]
string GetLineMem(const char *text, const int text_size)
ObjectPackConsumer(const shash::Any &expected_digest, const unsigned expected_header_size)
void TransferBucket(const BucketHandle handle, ObjectPack *other)
std::string ToString(const bool with_suffix=false) const
void Add(const void *buf, const uint64_t buf_size)
static const uint64_t kMaxObjects
void NotifyListeners(const ObjectPackBuild::Event ¶meter)
const shash::Any & BucketId(size_t idx) const
assert((mem||(size==0))&&"Out Of Memory")
ObjectPackProducer(ObjectPack *pack)
unsigned char * BucketContent(size_t idx) const
bool Debase64(const string &data, string *decoded)
string Base64Url(const string &data)
unsigned expected_header_size_
static const unsigned kAccuSize
ObjectPackBuild::State ConsumePayload(const unsigned buf_size, const unsigned char *buf)
uint64_t BucketSize(size_t idx) const
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
BucketContentType content_type
std::vector< IndexEntry > index_
std::vector< BucketHandle > buckets_
void GetDigest(shash::Any *hash)
string StringifyInt(const int64_t value)
void DiscardBucket(const BucketHandle handle)
bool ParseItem(const std::string &line, IndexEntry *entry, uint64_t *sum_size)
std::set< BucketHandle > open_buckets_
ObjectPack::BucketContentType entry_type
uint64_t String2Uint64(const string &value)
Any MkFromSuffixedHexPtr(const HexPtr hex)
ObjectPack(const uint64_t limit=kDefaultLimit)
const unsigned kMaxDigestSize
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
size_t GetNoObjects() const
ObjectPackBuild::State state_
void InitializeHeader(const int version, const int num_objects, const size_t pack_size, std::string *header)
void ParseKeyvalMem(const unsigned char *buffer, const unsigned buffer_size, map< char, string > *content)
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
shash::Any expected_digest_