23 const size_t pack_size, std::string *header) {
33 const std::string &hash_str,
const size_t object_size,
34 const std::string &object_name, std::string *header) {
38 std::string line_prefix =
"";
39 std::string line_suffix =
"";
40 switch (object_type) {
43 line_suffix = std::string(
" ") +
Base64Url(object_name);
52 *header += line_prefix + hash_str +
" " +
StringifyInt(object_size)
60 : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize)))
62 , capacity(kInitialSize)
63 , content_type(kEmpty)
70 while (
size + buf_size > capacity) {
72 content =
reinterpret_cast<unsigned char *
>(srealloc(content, capacity));
74 memcpy(content +
size, buf, buf_size);
87 for (std::set<BucketHandle>::const_iterator i =
open_buckets_.begin(),
94 for (
unsigned i = 0; i <
buckets_.size(); ++i)
96 pthread_mutex_destroy(
lock_);
102 handle->
Add(buf, size);
119 const std::string &name) {
145 lock_ =
reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
146 int retval = pthread_mutex_init(
lock_, NULL);
187 : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) {
194 for (
unsigned i = 0; i < N; ++i) {
201 const std::string &file_name)
202 : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) {
222 unsigned char *buf) {
223 const unsigned remaining_in_header = (
pos_ <
header_.size())
226 const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
229 pos_ += nbytes_header;
232 unsigned remaining_in_buf = buf_size - nbytes_header;
233 if (remaining_in_buf == 0)
234 return nbytes_header;
235 unsigned nbytes_payload = 0;
238 size_t nbytes = fread(buf + nbytes_header, 1, remaining_in_buf,
big_file_);
239 nbytes_payload = nbytes;
240 pos_ += nbytes_payload;
241 }
else if (idx_ < pack_->GetNoObjects()) {
243 while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) {
246 const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket);
247 memcpy(buf + nbytes_header + nbytes_payload,
251 nbytes_payload += nbytes;
252 remaining_in_buf -= nbytes;
253 if (nbytes == remaining_in_bucket) {
260 return nbytes_header + nbytes_payload;
266 const unsigned expected_header_size)
267 : expected_digest_(expected_digest)
268 , expected_header_size_(expected_header_size)
289 const unsigned buf_size,
const unsigned char *buf) {
302 const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
304 raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header);
305 pos_ += nbytes_header;
330 if ((buf_size == nbytes_header) && (
index_.size() == 0)) {
336 unsigned remaining_in_buf = buf_size - nbytes_header;
337 const unsigned char *payload = buf + nbytes_header;
350 const unsigned buf_size,
const unsigned char *buf) {
351 uint64_t pos_in_buf = 0;
353 && ((pos_in_buf < buf_size) || (
index_[
idx_].size == 0))) {
356 const uint64_t remaining_in_buf = buf_size - pos_in_buf;
358 const bool is_small_rest = remaining_in_buf <
kAccuSize;
362 nbytes = std::min(remaining_in_object, remaining_in_buf);
364 || ((remaining_in_buf < remaining_in_object) && is_small_rest)) {
365 const uint64_t remaining_in_accu = kAccuSize -
pos_in_accu_;
366 nbytes = std::min(remaining_in_accu, nbytes);
367 memcpy(
accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes);
368 pos_in_accu_ += nbytes;
369 if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) {
381 pos_in_buf += nbytes;
383 if (nbytes == remaining_in_object) {
400 map<char, string> header;
401 const unsigned char *data =
reinterpret_cast<const unsigned char *
>(
404 if (header.find(
'V') == header.end())
406 if (header[
'V'] !=
"2")
415 const size_t separator_idx =
raw_header_.find(
"--\n");
416 if (separator_idx == string::npos)
418 unsigned index_idx = separator_idx + 3;
422 uint64_t sum_size = 0;
424 const unsigned remaining_in_header =
raw_header_.size() - index_idx;
426 remaining_in_header);
431 if (!
ParseItem(line, &entry, &sum_size)) {
436 index_idx += line.size() + 1;
439 return (nobjects ==
index_.size()) && (
size_ == sum_size);
444 uint64_t *sum_size) {
445 if (!entry || !sum_size) {
449 if (line[0] ==
'C') {
454 const size_t separator = line.find(
' ', 2);
455 if ((separator == string::npos) || (separator == (line.size() - 1))) {
464 const std::string hash_string = line.substr(2, separator - 2);
471 }
else if (line[0] ==
'N') {
475 const size_t separator1 = line.find(
' ', 2);
476 if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) {
481 const size_t separator2 = line.find(
' ', separator1 + 1);
482 if ((separator1 == 0) || (separator1 == string::npos)
483 || (separator1 == (line.size() - 1))) {
488 line.substr(separator1 + 1, separator2 - separator1 - 1));
491 if (!
Debase64(line.substr(separator2 + 1), &name)) {
499 const std::string hash_string = line.substr(2, separator1 - 2);
void AppendItemToHeader(ObjectPack::BucketContentType object_type, const std::string &hash_str, const size_t object_size, const std::string &object_name, std::string *header)
void HashString(const std::string &content, Any *any_digest)
unsigned char accumulator_[kAccuSize]
string GetLineMem(const char *text, const int text_size)
ObjectPackConsumer(const shash::Any &expected_digest, const unsigned expected_header_size)
void TransferBucket(const BucketHandle handle, ObjectPack *other)
std::string ToString(const bool with_suffix=false) const
void Add(const void *buf, const uint64_t buf_size)
static const uint64_t kMaxObjects
void NotifyListeners(const ObjectPackBuild::Event ¶meter)
const shash::Any & BucketId(size_t idx) const
assert((mem||(size==0))&&"Out Of Memory")
ObjectPackProducer(ObjectPack *pack)
unsigned char * BucketContent(size_t idx) const
bool Debase64(const string &data, string *decoded)
string Base64Url(const string &data)
unsigned expected_header_size_
static const unsigned kAccuSize
ObjectPackBuild::State ConsumePayload(const unsigned buf_size, const unsigned char *buf)
uint64_t BucketSize(size_t idx) const
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
BucketContentType content_type
std::vector< IndexEntry > index_
std::vector< BucketHandle > buckets_
void GetDigest(shash::Any *hash)
string StringifyInt(const int64_t value)
void DiscardBucket(const BucketHandle handle)
bool ParseItem(const std::string &line, IndexEntry *entry, uint64_t *sum_size)
std::set< BucketHandle > open_buckets_
ObjectPack::BucketContentType entry_type
uint64_t String2Uint64(const string &value)
Any MkFromSuffixedHexPtr(const HexPtr hex)
ObjectPack(const uint64_t limit=kDefaultLimit)
const unsigned kMaxDigestSize
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
size_t GetNoObjects() const
ObjectPackBuild::State state_
void InitializeHeader(const int version, const int num_objects, const size_t pack_size, std::string *header)
void ParseKeyvalMem(const unsigned char *buffer, const unsigned buffer_size, map< char, string > *content)
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
shash::Any expected_digest_