23 const size_t pack_size, std::string *header) {
33 const std::string &hash_str,
const size_t object_size,
34 const std::string &object_name, std::string *header) {
38 std::string line_prefix =
"";
39 std::string line_suffix =
"";
40 switch (object_type) {
43 line_suffix = std::string(
" ") +
Base64Url(object_name);
52 *header += line_prefix + hash_str +
" " +
StringifyInt(object_size) +
60 : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize))),
62 capacity(kInitialSize),
67 if (buf_size == 0)
return;
69 while (
size + buf_size > capacity) {
71 content =
reinterpret_cast<unsigned char *
>(srealloc(content, capacity));
73 memcpy(content +
size, buf, buf_size);
86 for (std::set<BucketHandle>::const_iterator i =
open_buckets_.begin(),
93 pthread_mutex_destroy(
lock_);
99 handle->
Add(buf, size);
116 const std::string &name) {
140 lock_ =
reinterpret_cast<pthread_mutex_t *
>(smalloc(
sizeof(pthread_mutex_t)));
141 int retval = pthread_mutex_init(
lock_, NULL);
182 : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) {
189 for (
unsigned i = 0; i < N; ++i) {
196 const std::string &file_name)
197 : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) {
217 unsigned char *buf) {
218 const unsigned remaining_in_header =
220 const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
223 pos_ += nbytes_header;
226 unsigned remaining_in_buf = buf_size - nbytes_header;
227 if (remaining_in_buf == 0)
return nbytes_header;
228 unsigned nbytes_payload = 0;
231 size_t nbytes = fread(buf + nbytes_header, 1, remaining_in_buf,
big_file_);
232 nbytes_payload = nbytes;
233 pos_ += nbytes_payload;
234 }
else if (idx_ < pack_->GetNoObjects()) {
236 while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) {
237 const unsigned remaining_in_bucket =
239 const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket);
240 memcpy(buf + nbytes_header + nbytes_payload,
244 nbytes_payload += nbytes;
245 remaining_in_buf -= nbytes;
246 if (nbytes == remaining_in_bucket) {
253 return nbytes_header + nbytes_payload;
259 const unsigned expected_header_size)
260 : expected_digest_(expected_digest),
261 expected_header_size_(expected_header_size),
282 const unsigned buf_size,
const unsigned char *buf) {
283 if (buf_size == 0)
return state_;
290 const unsigned remaining_in_header =
292 const unsigned nbytes_header = std::min(remaining_in_header, buf_size);
294 raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header);
295 pos_ += nbytes_header;
319 if ((buf_size == nbytes_header) && (
index_.size() == 0)) {
325 unsigned remaining_in_buf = buf_size - nbytes_header;
326 const unsigned char *payload = buf + nbytes_header;
339 const unsigned buf_size,
const unsigned char *buf) {
340 uint64_t pos_in_buf = 0;
342 ((pos_in_buf < buf_size) || (
index_[
idx_].size == 0))) {
345 const uint64_t remaining_in_buf = buf_size - pos_in_buf;
347 const bool is_small_rest = remaining_in_buf <
kAccuSize;
351 nbytes = std::min(remaining_in_object, remaining_in_buf);
353 ((remaining_in_buf < remaining_in_object) && is_small_rest)) {
354 const uint64_t remaining_in_accu = kAccuSize -
pos_in_accu_;
355 nbytes = std::min(remaining_in_accu, nbytes);
356 memcpy(
accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes);
357 pos_in_accu_ += nbytes;
358 if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) {
370 pos_in_buf += nbytes;
372 if (nbytes == remaining_in_object) {
389 map<char, string> header;
390 const unsigned char *data =
391 reinterpret_cast<const unsigned char *
>(
raw_header_.data());
393 if (header.find(
'V') == header.end())
return false;
394 if (header[
'V'] !=
"2")
return false;
398 if (nobjects == 0)
return true;
401 const size_t separator_idx =
raw_header_.find(
"--\n");
402 if (separator_idx == string::npos)
return false;
403 unsigned index_idx = separator_idx + 3;
406 uint64_t sum_size = 0;
408 const unsigned remaining_in_header =
raw_header_.size() - index_idx;
411 if (line ==
"")
break;
414 if (!
ParseItem(line, &entry, &sum_size)) {
419 index_idx += line.size() + 1;
422 return (nobjects ==
index_.size()) && (
size_ == sum_size);
427 uint64_t *sum_size) {
428 if (!entry || !sum_size) {
432 if (line[0] ==
'C') {
437 const size_t separator = line.find(
' ', 2);
438 if ((separator == string::npos) || (separator == (line.size() - 1))) {
447 const std::string hash_string = line.substr(2, separator - 2);
454 }
else if (line[0] ==
'N') {
458 const size_t separator1 = line.find(
' ', 2);
459 if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) {
464 const size_t separator2 = line.find(
' ', separator1 + 1);
465 if ((separator1 == 0) || (separator1 == string::npos) ||
466 (separator1 == (line.size() - 1))) {
471 String2Uint64(line.substr(separator1 + 1, separator2 - separator1 - 1));
474 if (!
Debase64(line.substr(separator2 + 1), &name)) {
482 const std::string hash_string = line.substr(2, separator1 - 2);
void AppendItemToHeader(ObjectPack::BucketContentType object_type, const std::string &hash_str, const size_t object_size, const std::string &object_name, std::string *header)
void HashString(const std::string &content, Any *any_digest)
unsigned char accumulator_[kAccuSize]
string GetLineMem(const char *text, const int text_size)
ObjectPackConsumer(const shash::Any &expected_digest, const unsigned expected_header_size)
void TransferBucket(const BucketHandle handle, ObjectPack *other)
std::string ToString(const bool with_suffix=false) const
void Add(const void *buf, const uint64_t buf_size)
static const uint64_t kMaxObjects
void NotifyListeners(const ObjectPackBuild::Event ¶meter)
const shash::Any & BucketId(size_t idx) const
assert((mem||(size==0))&&"Out Of Memory")
ObjectPackProducer(ObjectPack *pack)
unsigned char * BucketContent(size_t idx) const
bool Debase64(const string &data, string *decoded)
string Base64Url(const string &data)
unsigned expected_header_size_
static const unsigned kAccuSize
ObjectPackBuild::State ConsumePayload(const unsigned buf_size, const unsigned char *buf)
uint64_t BucketSize(size_t idx) const
ObjectPackBuild::State ConsumeNext(const unsigned buf_size, const unsigned char *buf)
BucketContentType content_type
std::vector< IndexEntry > index_
std::vector< BucketHandle > buckets_
void GetDigest(shash::Any *hash)
string StringifyInt(const int64_t value)
void DiscardBucket(const BucketHandle handle)
bool ParseItem(const std::string &line, IndexEntry *entry, uint64_t *sum_size)
std::set< BucketHandle > open_buckets_
ObjectPack::BucketContentType entry_type
uint64_t String2Uint64(const string &value)
Any MkFromSuffixedHexPtr(const HexPtr hex)
ObjectPack(const uint64_t limit=kDefaultLimit)
const unsigned kMaxDigestSize
bool CommitBucket(const BucketContentType type, const shash::Any &id, const BucketHandle handle, const std::string &name="")
static void AddToBucket(const void *buf, const uint64_t size, const BucketHandle handle)
size_t GetNoObjects() const
ObjectPackBuild::State state_
void InitializeHeader(const int version, const int num_objects, const size_t pack_size, std::string *header)
void ParseKeyvalMem(const unsigned char *buffer, const unsigned buffer_size, map< char, string > *content)
unsigned ProduceNext(const unsigned buf_size, unsigned char *buf)
shash::Any expected_digest_