Directory: | cvmfs/ |
---|---|
File: | cvmfs/pack.cc |
Date: | 2025-07-06 02:35:01 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 283 | 307 | 92.2% |
Branches: | 202 | 332 | 60.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "pack.h" | ||
6 | |||
7 | #include <algorithm> | ||
8 | #include <cassert> | ||
9 | #include <cstring> | ||
10 | #include <map> | ||
11 | |||
12 | #include "util/concurrency.h" | ||
13 | #include "util/exception.h" | ||
14 | #include "util/platform.h" | ||
15 | #include "util/smalloc.h" | ||
16 | #include "util/string.h" | ||
17 | |||
18 | using namespace std; // NOLINT | ||
19 | |||
20 | namespace { // some private utility functions used by ObjectPackProducer | ||
21 | |||
22 | 2436 | void InitializeHeader(const int version, const int num_objects, | |
23 | const size_t pack_size, std::string *header) { | ||
24 |
1/2✓ Branch 0 taken 2436 times.
✗ Branch 1 not taken.
|
2436 | if (header) { |
25 |
2/4✓ Branch 2 taken 2436 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2436 times.
✗ Branch 6 not taken.
|
2436 | *header = "V" + StringifyInt(version) + "\n"; |
26 |
3/6✓ Branch 2 taken 2436 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2436 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2436 times.
✗ Branch 9 not taken.
|
2436 | *header += "S" + StringifyInt(pack_size) + "\n"; |
27 |
3/6✓ Branch 2 taken 2436 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2436 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 2436 times.
✗ Branch 9 not taken.
|
2436 | *header += "N" + StringifyInt(num_objects) + "\n"; |
28 | 2436 | *header += "--\n"; | |
29 | } | ||
30 | 2436 | } | |
31 | |||
32 | 220596 | void AppendItemToHeader(ObjectPack::BucketContentType object_type, | |
33 | const std::string &hash_str, const size_t object_size, | ||
34 | const std::string &object_name, std::string *header) { | ||
35 | // If the object type is kNamed, the "object_name" parameter must not be empty | ||
36 |
4/6✓ Branch 0 taken 90 times.
✓ Branch 1 taken 220506 times.
✓ Branch 2 taken 90 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 90 times.
✗ Branch 6 not taken.
|
220596 | assert((object_type == ObjectPack::kCas) |
37 | || ((object_type == ObjectPack::kNamed) && (!object_name.empty()))); | ||
38 |
1/2✓ Branch 2 taken 220596 times.
✗ Branch 3 not taken.
|
220596 | std::string line_prefix = ""; |
39 |
1/2✓ Branch 2 taken 220596 times.
✗ Branch 3 not taken.
|
220596 | std::string line_suffix = ""; |
40 |
2/3✓ Branch 0 taken 90 times.
✓ Branch 1 taken 220506 times.
✗ Branch 2 not taken.
|
220596 | switch (object_type) { |
41 | 90 | case ObjectPack::kNamed: | |
42 |
1/2✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
|
90 | line_prefix = "N "; |
43 |
3/6✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 90 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 90 times.
✗ Branch 9 not taken.
|
90 | line_suffix = std::string(" ") + Base64Url(object_name); |
44 | 90 | break; | |
45 | 220506 | case ObjectPack::kCas: | |
46 |
1/2✓ Branch 1 taken 220506 times.
✗ Branch 2 not taken.
|
220506 | line_prefix = "C "; |
47 | 220506 | break; | |
48 | ✗ | default: | |
49 | ✗ | PANIC(kLogStderr, "Unknown object pack type to be added to header."); | |
50 | } | ||
51 |
1/2✓ Branch 0 taken 220596 times.
✗ Branch 1 not taken.
|
220596 | if (header) { |
52 |
4/8✓ Branch 1 taken 220596 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 220596 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 220596 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 220596 times.
✗ Branch 11 not taken.
|
441192 | *header += line_prefix + hash_str + " " + StringifyInt(object_size) |
53 |
3/6✓ Branch 1 taken 220596 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 220596 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 220596 times.
✗ Branch 8 not taken.
|
220596 | + line_suffix + "\n"; |
54 | } | ||
55 | 220596 | } | |
56 | |||
57 | } // namespace | ||
58 | |||
59 | 3012504 | ObjectPack::Bucket::Bucket() | |
60 | 3012504 | : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize))) | |
61 | 3012504 | , size(0) | |
62 | 3012504 | , capacity(kInitialSize) | |
63 | 3012504 | , content_type(kEmpty) | |
64 | 6025008 | , name() { } | |
65 | |||
66 | 12174 | void ObjectPack::Bucket::Add(const void *buf, const uint64_t buf_size) { | |
67 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12174 times.
|
12174 | if (buf_size == 0) |
68 | ✗ | return; | |
69 | |||
70 |
2/2✓ Branch 0 taken 131130 times.
✓ Branch 1 taken 12174 times.
|
143304 | while (size + buf_size > capacity) { |
71 | 131130 | capacity *= 2; | |
72 | 131130 | content = reinterpret_cast<unsigned char *>(srealloc(content, capacity)); | |
73 | } | ||
74 | 12174 | memcpy(content + size, buf, buf_size); | |
75 | 12174 | size += buf_size; | |
76 | } | ||
77 | |||
78 | 3012504 | ObjectPack::Bucket::~Bucket() { free(content); } | |
79 | |||
80 | //------------------------------------------------------------------------------ | ||
81 | |||
82 | 2604 | ObjectPack::ObjectPack(const uint64_t limit) : limit_(limit), size_(0) { | |
83 | 2604 | InitLock(); | |
84 | 2604 | } | |
85 | |||
86 | 2556 | ObjectPack::~ObjectPack() { | |
87 | 5112 | for (std::set<BucketHandle>::const_iterator i = open_buckets_.begin(), | |
88 | 2556 | iEnd = open_buckets_.end(); | |
89 |
2/2✓ Branch 1 taken 1980 times.
✓ Branch 2 taken 2556 times.
|
4536 | i != iEnd; |
90 | 1980 | ++i) { | |
91 |
1/2✓ Branch 1 taken 1980 times.
✗ Branch 2 not taken.
|
1980 | delete *i; |
92 | } | ||
93 | |||
94 |
2/2✓ Branch 1 taken 3010404 times.
✓ Branch 2 taken 2556 times.
|
3012960 | for (unsigned i = 0; i < buckets_.size(); ++i) |
95 |
1/2✓ Branch 1 taken 3010404 times.
✗ Branch 2 not taken.
|
3010404 | delete buckets_[i]; |
96 | 2556 | pthread_mutex_destroy(lock_); | |
97 | 2556 | free(lock_); | |
98 | 2556 | } | |
99 | |||
100 | 3744 | void ObjectPack::AddToBucket(const void *buf, const uint64_t size, | |
101 | const ObjectPack::BucketHandle handle) { | ||
102 | 3744 | handle->Add(buf, size); | |
103 | 3744 | } | |
104 | |||
105 | 3012474 | ObjectPack::BucketHandle ObjectPack::NewBucket() { | |
106 |
2/4✓ Branch 1 taken 3012474 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3012474 times.
✗ Branch 5 not taken.
|
3012474 | BucketHandle handle = new Bucket(); |
107 | |||
108 | 3012474 | const MutexLockGuard mutex_guard(lock_); | |
109 |
1/2✓ Branch 1 taken 3012474 times.
✗ Branch 2 not taken.
|
3012474 | open_buckets_.insert(handle); |
110 | 3012474 | return handle; | |
111 | 3012474 | } | |
112 | |||
113 | /** | ||
114 | * Can only fail if the ObjectPack is full: it already holds kMaxObjects buckets or the bucket would exceed the remaining space. | ||
115 | */ | ||
116 | 3013182 | bool ObjectPack::CommitBucket(const BucketContentType type, | |
117 | const shash::Any &id, | ||
118 | const ObjectPack::BucketHandle handle, | ||
119 | const std::string &name) { | ||
120 | 3013182 | handle->id = id; | |
121 | |||
122 | 3013182 | handle->content_type = type; | |
123 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3013182 times.
|
3013182 | if (type == kNamed) { |
124 | ✗ | handle->name = name; | |
125 | } | ||
126 | |||
127 | 3013182 | const MutexLockGuard mutex_guard(lock_); | |
128 |
2/2✓ Branch 1 taken 30 times.
✓ Branch 2 taken 3013152 times.
|
3013182 | if (buckets_.size() >= kMaxObjects) |
129 | 30 | return false; | |
130 |
2/2✓ Branch 0 taken 2748 times.
✓ Branch 1 taken 3010404 times.
|
3013152 | if (size_ + handle->size > limit_) |
131 | 2748 | return false; | |
132 |
1/2✓ Branch 1 taken 3010404 times.
✗ Branch 2 not taken.
|
3010404 | open_buckets_.erase(handle); |
133 |
1/2✓ Branch 1 taken 3010404 times.
✗ Branch 2 not taken.
|
3010404 | buckets_.push_back(handle); |
134 | 3010404 | size_ += handle->size; | |
135 | 3010404 | return true; | |
136 | 3013182 | } | |
137 | |||
138 | 90 | void ObjectPack::DiscardBucket(const BucketHandle handle) { | |
139 | 90 | const MutexLockGuard mutex_guard(lock_); | |
140 |
1/2✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
|
90 | open_buckets_.erase(handle); |
141 |
1/2✓ Branch 0 taken 90 times.
✗ Branch 1 not taken.
|
90 | delete handle; |
142 | 90 | } | |
143 | |||
144 | 2604 | void ObjectPack::InitLock() { | |
145 | 2604 | lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t))); | |
146 | 2604 | const int retval = pthread_mutex_init(lock_, NULL); | |
147 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2604 times.
|
2604 | assert(retval == 0); |
148 | 2604 | } | |
149 | |||
150 | /** | ||
151 | * If a commit failed, an open Bucket can be transferred to another ObjectPack | ||
152 | * with more space. | ||
153 | */ | ||
154 | 1086 | void ObjectPack::TransferBucket(const ObjectPack::BucketHandle handle, | |
155 | ObjectPack *other) { | ||
156 | 1086 | const MutexLockGuard mutex_guard(lock_); | |
157 |
1/2✓ Branch 1 taken 1086 times.
✗ Branch 2 not taken.
|
1086 | open_buckets_.erase(handle); |
158 |
1/2✓ Branch 1 taken 1086 times.
✗ Branch 2 not taken.
|
1086 | other->open_buckets_.insert(handle); |
159 | 1086 | } | |
160 | |||
161 | 1151136 | unsigned char *ObjectPack::BucketContent(size_t idx) const { | |
162 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1151136 times.
|
1151136 | assert(idx < buckets_.size()); |
163 | 1151136 | return buckets_[idx]->content; | |
164 | } | ||
165 | |||
166 | 1371642 | uint64_t ObjectPack::BucketSize(size_t idx) const { | |
167 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1371642 times.
|
1371642 | assert(idx < buckets_.size()); |
168 | 1371642 | return buckets_[idx]->size; | |
169 | } | ||
170 | |||
171 | 220506 | const shash::Any &ObjectPack::BucketId(size_t idx) const { | |
172 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 220506 times.
|
220506 | assert(idx < buckets_.size()); |
173 | 220506 | return buckets_[idx]->id; | |
174 | } | ||
175 | |||
176 | //------------------------------------------------------------------------------ | ||
177 | |||
178 | /** | ||
179 | * Hash over the header. The hash algorithm needs to be provided by hash. | ||
180 | */ | ||
181 | 2190 | void ObjectPackProducer::GetDigest(shash::Any *hash) { | |
182 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2190 times.
|
2190 | assert(hash); |
183 | 2190 | shash::HashString(header_, hash); | |
184 | 2190 | } | |
185 | |||
186 | 2346 | ObjectPackProducer::ObjectPackProducer(ObjectPack *pack) | |
187 | 2346 | : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) { | |
188 | 2346 | const unsigned N = pack->GetNoObjects(); | |
189 | // rough guess, most likely a little too much | ||
190 |
1/2✓ Branch 1 taken 2346 times.
✗ Branch 2 not taken.
|
2346 | header_.reserve(30 + N * (2 * shash::kMaxDigestSize + 5)); |
191 | |||
192 |
1/2✓ Branch 2 taken 2346 times.
✗ Branch 3 not taken.
|
2346 | InitializeHeader(2, N, pack->size(), &header_); |
193 | |||
194 |
2/2✓ Branch 0 taken 220506 times.
✓ Branch 1 taken 2346 times.
|
222852 | for (unsigned i = 0; i < N; ++i) { |
195 |
3/6✓ Branch 2 taken 220506 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 220506 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 220506 times.
✗ Branch 11 not taken.
|
220506 | AppendItemToHeader(ObjectPack::kCas, pack->BucketId(i).ToString(true), |
196 | pack->BucketSize(i), "", &header_); | ||
197 | } | ||
198 | 2346 | } | |
199 | |||
200 | 90 | ObjectPackProducer::ObjectPackProducer(const shash::Any &id, FILE *big_file, | |
201 | 90 | const std::string &file_name) | |
202 | 90 | : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) { | |
203 | 90 | const int fd = fileno(big_file_); | |
204 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
|
90 | assert(fd >= 0); |
205 | platform_stat64 info; | ||
206 | 90 | const int retval = platform_fstat(fd, &info); | |
207 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 90 times.
|
90 | assert(retval == 0); |
208 | |||
209 |
1/2✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
|
90 | InitializeHeader(2, 1, info.st_size, &header_); |
210 | |||
211 |
2/4✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 90 times.
✗ Branch 5 not taken.
|
90 | AppendItemToHeader(ObjectPack::kNamed, id.ToString(true), info.st_size, |
212 | file_name, &header_); | ||
213 | |||
214 |
1/2✓ Branch 1 taken 90 times.
✗ Branch 2 not taken.
|
90 | rewind(big_file); |
215 | 90 | } | |
216 | |||
217 | /** | ||
218 | * Copies as many bytes as possible into buf. If the returned number of bytes | ||
219 | * is smaller than buf_size, everything has been produced. | ||
220 | */ | ||
221 | 941400 | unsigned ObjectPackProducer::ProduceNext(const unsigned buf_size, | |
222 | unsigned char *buf) { | ||
223 | 941400 | const unsigned remaining_in_header = (pos_ < header_.size()) | |
224 | 9936 | ? (header_.size() - pos_) | |
225 |
2/2✓ Branch 0 taken 9936 times.
✓ Branch 1 taken 931464 times.
|
951336 | : 0; |
226 | 941400 | const unsigned nbytes_header = std::min(remaining_in_header, buf_size); | |
227 |
2/2✓ Branch 0 taken 9936 times.
✓ Branch 1 taken 931464 times.
|
941400 | if (nbytes_header) { |
228 | 9936 | memcpy(buf, header_.data() + pos_, nbytes_header); | |
229 | 9936 | pos_ += nbytes_header; | |
230 | } | ||
231 | |||
232 | 941400 | unsigned remaining_in_buf = buf_size - nbytes_header; | |
233 |
2/2✓ Branch 0 taken 7590 times.
✓ Branch 1 taken 933810 times.
|
941400 | if (remaining_in_buf == 0) |
234 | 7590 | return nbytes_header; | |
235 | 933810 | unsigned nbytes_payload = 0; | |
236 | |||
237 |
2/2✓ Branch 0 taken 570 times.
✓ Branch 1 taken 933240 times.
|
933810 | if (big_file_) { |
238 |
1/2✓ Branch 1 taken 570 times.
✗ Branch 2 not taken.
|
570 | const size_t nbytes = fread(buf + nbytes_header, 1, remaining_in_buf, |
239 | big_file_); | ||
240 | 570 | nbytes_payload = nbytes; | |
241 | 570 | pos_ += nbytes_payload; | |
242 |
2/2✓ Branch 1 taken 932916 times.
✓ Branch 2 taken 324 times.
|
933240 | } else if (idx_ < pack_->GetNoObjects()) { |
243 | // Copy a few buckets more | ||
244 |
6/6✓ Branch 0 taken 1153362 times.
✓ Branch 1 taken 930690 times.
✓ Branch 3 taken 1151136 times.
✓ Branch 4 taken 2226 times.
✓ Branch 5 taken 1151136 times.
✓ Branch 6 taken 932916 times.
|
2084052 | while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) { |
245 | 1151136 | const unsigned remaining_in_bucket = pack_->BucketSize(idx_) | |
246 | 1151136 | - pos_in_bucket_; | |
247 | 1151136 | const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket); | |
248 | 2302272 | memcpy(buf + nbytes_header + nbytes_payload, | |
249 | 1151136 | pack_->BucketContent(idx_) + pos_in_bucket_, nbytes); | |
250 | |||
251 | 1151136 | pos_in_bucket_ += nbytes; | |
252 | 1151136 | nbytes_payload += nbytes; | |
253 | 1151136 | remaining_in_buf -= nbytes; | |
254 |
2/2✓ Branch 0 taken 220506 times.
✓ Branch 1 taken 930630 times.
|
1151136 | if (nbytes == remaining_in_bucket) { |
255 | 220506 | pos_in_bucket_ = 0; | |
256 | 220506 | idx_++; | |
257 | } | ||
258 | } | ||
259 | } | ||
260 | |||
261 | 933810 | return nbytes_header + nbytes_payload; | |
262 | } | ||
263 | |||
264 | //------------------------------------------------------------------------------ | ||
265 | |||
266 | 2220 | ObjectPackConsumer::ObjectPackConsumer(const shash::Any &expected_digest, | |
267 | 2220 | const unsigned expected_header_size) | |
268 | 2220 | : expected_digest_(expected_digest) | |
269 | 2220 | , expected_header_size_(expected_header_size) | |
270 | 2220 | , pos_(0) | |
271 | 2220 | , idx_(0) | |
272 | 2220 | , pos_in_object_(0) | |
273 | 2220 | , pos_in_accu_(0) | |
274 | 2220 | , state_(ObjectPackBuild::kStateContinue) | |
275 | 2220 | , size_(0) { | |
276 | // Upper limit of 100B per entry | ||
277 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2220 times.
|
2220 | if (expected_header_size > (100 * ObjectPack::kMaxObjects)) { |
278 | ✗ | state_ = ObjectPackBuild::kStateHeaderTooBig; | |
279 | ✗ | return; | |
280 | } | ||
281 | |||
282 |
1/2✓ Branch 1 taken 2220 times.
✗ Branch 2 not taken.
|
2220 | raw_header_.reserve(expected_header_size); |
283 | } | ||
284 | |||
285 | /** | ||
286 | * At the end of the function, pos_ will have progressed by buf_size (unless | ||
287 | * the buffer contains trailing garbage bytes. | ||
288 | */ | ||
289 | 939750 | ObjectPackBuild::State ObjectPackConsumer::ConsumeNext( | |
290 | const unsigned buf_size, const unsigned char *buf) { | ||
291 |
2/2✓ Branch 0 taken 2010 times.
✓ Branch 1 taken 937740 times.
|
939750 | if (buf_size == 0) |
292 | 2010 | return state_; | |
293 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 937740 times.
|
937740 | if (state_ == ObjectPackBuild::kStateDone) { |
294 | ✗ | state_ = ObjectPackBuild::kStateTrailingBytes; | |
295 | ✗ | return state_; | |
296 | } | ||
297 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 937740 times.
|
937740 | if (state_ != ObjectPackBuild::kStateContinue) |
298 | ✗ | return state_; | |
299 | |||
300 |
2/2✓ Branch 0 taken 7350 times.
✓ Branch 1 taken 930390 times.
|
937740 | const unsigned remaining_in_header = (pos_ < expected_header_size_) |
301 | 7350 | ? (expected_header_size_ - pos_) | |
302 | : 0; | ||
303 | 937740 | const unsigned nbytes_header = std::min(remaining_in_header, buf_size); | |
304 |
2/2✓ Branch 0 taken 7350 times.
✓ Branch 1 taken 930390 times.
|
937740 | if (nbytes_header) { |
305 |
2/4✓ Branch 2 taken 7350 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 7350 times.
✗ Branch 6 not taken.
|
7350 | raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header); |
306 | 7350 | pos_ += nbytes_header; | |
307 | } | ||
308 | |||
309 |
2/2✓ Branch 0 taken 5130 times.
✓ Branch 1 taken 932610 times.
|
937740 | if (pos_ < expected_header_size_) |
310 | 5130 | return ObjectPackBuild::kStateContinue; | |
311 | |||
312 | // This condition can only be true once during the lifetime of the | ||
313 | // Consumer. | ||
314 |
3/4✓ Branch 0 taken 2220 times.
✓ Branch 1 taken 930390 times.
✓ Branch 2 taken 2220 times.
✗ Branch 3 not taken.
|
932610 | if (nbytes_header && (pos_ == expected_header_size_)) { |
315 |
1/2✓ Branch 1 taken 2220 times.
✗ Branch 2 not taken.
|
2220 | shash::Any digest(expected_digest_.algorithm); |
316 |
1/2✓ Branch 1 taken 2220 times.
✗ Branch 2 not taken.
|
2220 | shash::HashString(raw_header_, &digest); |
317 |
2/4✓ Branch 1 taken 2220 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2220 times.
|
2220 | if (digest != expected_digest_) { |
318 | ✗ | state_ = ObjectPackBuild::kStateCorrupt; | |
319 | 60 | return state_; | |
320 | } else { | ||
321 |
1/2✓ Branch 1 taken 2220 times.
✗ Branch 2 not taken.
|
2220 | const bool retval = ParseHeader(); |
322 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2220 times.
|
2220 | if (!retval) { |
323 | ✗ | state_ = ObjectPackBuild::kStateBadFormat; | |
324 | ✗ | return state_; | |
325 | } | ||
326 | // We don't need the raw string anymore | ||
327 | 2220 | raw_header_.clear(); | |
328 | } | ||
329 | |||
330 | // Empty pack? | ||
331 |
6/6✓ Branch 0 taken 90 times.
✓ Branch 1 taken 2130 times.
✓ Branch 3 taken 60 times.
✓ Branch 4 taken 30 times.
✓ Branch 5 taken 60 times.
✓ Branch 6 taken 2160 times.
|
2220 | if ((buf_size == nbytes_header) && (index_.size() == 0)) { |
332 | 60 | state_ = ObjectPackBuild::kStateDone; | |
333 | 60 | return state_; | |
334 | } | ||
335 | } | ||
336 | |||
337 | 932550 | const unsigned remaining_in_buf = buf_size - nbytes_header; | |
338 | 932550 | const unsigned char *payload = buf + nbytes_header; | |
339 |
1/2✓ Branch 1 taken 932550 times.
✗ Branch 2 not taken.
|
932550 | return ConsumePayload(remaining_in_buf, payload); |
340 | } | ||
341 | |||
342 | /** | ||
343 | * Informs listeners for small complete objects. For large objects, buffers | ||
344 | * the | ||
345 | * input into reasonably sized chunks. buf can contain both a chunk of data | ||
346 | * that needs to be added to the consumer's accumulator and a bunch of | ||
347 | * complete small objects. We use the accumulator only if necessary to avoid | ||
348 | * unnecessary memory copies. | ||
349 | */ | ||
350 | 932550 | ObjectPackBuild::State ObjectPackConsumer::ConsumePayload( | |
351 | const unsigned buf_size, const unsigned char *buf) { | ||
352 | 932550 | uint64_t pos_in_buf = 0; | |
353 | 2220240 | while ((idx_ < index_.size()) | |
354 |
8/8✓ Branch 0 taken 2218080 times.
✓ Branch 1 taken 2160 times.
✓ Branch 2 taken 930420 times.
✓ Branch 3 taken 1287660 times.
✓ Branch 5 taken 30 times.
✓ Branch 6 taken 930390 times.
✓ Branch 7 taken 1287690 times.
✓ Branch 8 taken 932550 times.
|
2220240 | && ((pos_in_buf < buf_size) || (index_[idx_].size == 0))) { |
355 | // Fill the accumulator or process next small object | ||
356 | uint64_t nbytes; // How many bytes are consumed in this iteration | ||
357 | 1287690 | const uint64_t remaining_in_buf = buf_size - pos_in_buf; | |
358 | 1287690 | const uint64_t remaining_in_object = index_[idx_].size - pos_in_object_; | |
359 | 1287690 | const bool is_small_rest = remaining_in_buf < kAccuSize; | |
360 | |||
361 | // We use the accumulator if there is already something in it or if we have | ||
362 | // a small piece of data of a larger object. | ||
363 | 1287690 | nbytes = std::min(remaining_in_object, remaining_in_buf); | |
364 |
2/2✓ Branch 0 taken 984510 times.
✓ Branch 1 taken 303180 times.
|
1287690 | if ((pos_in_accu_ > 0) |
365 |
4/4✓ Branch 0 taken 768420 times.
✓ Branch 1 taken 216090 times.
✓ Branch 2 taken 141270 times.
✓ Branch 3 taken 627150 times.
|
984510 | || ((remaining_in_buf < remaining_in_object) && is_small_rest)) { |
366 | 444450 | const uint64_t remaining_in_accu = kAccuSize - pos_in_accu_; | |
367 | 444450 | nbytes = std::min(remaining_in_accu, nbytes); | |
368 | 444450 | memcpy(accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes); | |
369 | 444450 | pos_in_accu_ += nbytes; | |
370 |
4/4✓ Branch 0 taken 307410 times.
✓ Branch 1 taken 137040 times.
✓ Branch 2 taken 4230 times.
✓ Branch 3 taken 303180 times.
|
444450 | if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) { |
371 |
1/2✓ Branch 1 taken 141270 times.
✗ Branch 2 not taken.
|
141270 | NotifyListeners(ObjectPackBuild::Event( |
372 |
1/2✓ Branch 3 taken 141270 times.
✗ Branch 4 not taken.
|
141270 | index_[idx_].id, index_[idx_].size, pos_in_accu_, accumulator_, |
373 | 141270 | index_[idx_].entry_type, index_[idx_].entry_name)); | |
374 | 141270 | pos_in_accu_ = 0; | |
375 | } | ||
376 | 444450 | } else { // directly trigger listeners using buf | |
377 |
1/2✓ Branch 1 taken 843240 times.
✗ Branch 2 not taken.
|
843240 | NotifyListeners(ObjectPackBuild::Event( |
378 |
1/2✓ Branch 3 taken 843240 times.
✗ Branch 4 not taken.
|
843240 | index_[idx_].id, index_[idx_].size, nbytes, buf + pos_in_buf, |
379 | 843240 | index_[idx_].entry_type, index_[idx_].entry_name)); | |
380 | } | ||
381 | |||
382 | 1287690 | pos_in_buf += nbytes; | |
383 | 1287690 | pos_in_object_ += nbytes; | |
384 |
2/2✓ Branch 0 taken 220350 times.
✓ Branch 1 taken 1067340 times.
|
1287690 | if (nbytes == remaining_in_object) { |
385 | 220350 | idx_++; | |
386 | 220350 | pos_in_object_ = 0; | |
387 | } | ||
388 | } | ||
389 | |||
390 | 932550 | pos_ += buf_size; | |
391 | |||
392 |
2/2✓ Branch 1 taken 2160 times.
✓ Branch 2 taken 930390 times.
|
932550 | if (idx_ == index_.size()) |
393 |
1/2✓ Branch 0 taken 2160 times.
✗ Branch 1 not taken.
|
2160 | state_ = (pos_in_buf == buf_size) ? ObjectPackBuild::kStateDone |
394 | : ObjectPackBuild::kStateTrailingBytes; | ||
395 | else | ||
396 | 930390 | state_ = ObjectPackBuild::kStateContinue; | |
397 | 932550 | return state_; | |
398 | } | ||
399 | |||
400 | 2220 | bool ObjectPackConsumer::ParseHeader() { | |
401 | 2220 | map<char, string> header; | |
402 | const unsigned char *data = reinterpret_cast<const unsigned char *>( | ||
403 | 2220 | raw_header_.data()); | |
404 |
1/2✓ Branch 2 taken 2220 times.
✗ Branch 3 not taken.
|
2220 | ParseKeyvalMem(data, raw_header_.size(), &header); |
405 |
2/5✓ Branch 2 taken 2220 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 2220 times.
|
2220 | if (header.find('V') == header.end()) |
406 | ✗ | return false; | |
407 |
3/7✓ Branch 1 taken 2220 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 2220 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 2220 times.
|
2220 | if (header['V'] != "2") |
408 | ✗ | return false; | |
409 |
2/4✓ Branch 1 taken 2220 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2220 times.
✗ Branch 5 not taken.
|
2220 | size_ = String2Uint64(header['S']); |
410 |
2/4✓ Branch 1 taken 2220 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2220 times.
✗ Branch 5 not taken.
|
2220 | const unsigned nobjects = String2Uint64(header['N']); |
411 | |||
412 |
2/2✓ Branch 0 taken 60 times.
✓ Branch 1 taken 2160 times.
|
2220 | if (nobjects == 0) |
413 | 60 | return true; | |
414 | |||
415 | // Build the object index | ||
416 | 2160 | const size_t separator_idx = raw_header_.find("--\n"); | |
417 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2160 times.
|
2160 | if (separator_idx == string::npos) |
418 | ✗ | return false; | |
419 | 2160 | unsigned index_idx = separator_idx + 3; | |
420 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 2160 times.
|
2160 | if (index_idx >= raw_header_.size()) |
421 | ✗ | return false; | |
422 | |||
423 | 2160 | uint64_t sum_size = 0; | |
424 | do { | ||
425 | 220350 | const unsigned remaining_in_header = raw_header_.size() - index_idx; | |
426 | 440700 | const string line = GetLineMem(raw_header_.data() + index_idx, | |
427 |
1/2✓ Branch 2 taken 220350 times.
✗ Branch 3 not taken.
|
220350 | remaining_in_header); |
428 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 220350 times.
|
220350 | if (line == "") |
429 | ✗ | break; | |
430 | |||
431 |
1/2✓ Branch 1 taken 220350 times.
✗ Branch 2 not taken.
|
220350 | IndexEntry entry; |
432 |
2/4✓ Branch 1 taken 220350 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 220350 times.
|
220350 | if (!ParseItem(line, &entry, &sum_size)) { |
433 | ✗ | break; | |
434 | } | ||
435 | |||
436 |
1/2✓ Branch 1 taken 220350 times.
✗ Branch 2 not taken.
|
220350 | index_.push_back(entry); |
437 | 220350 | index_idx += line.size() + 1; | |
438 |
4/6✓ Branch 1 taken 220350 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 220350 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 218190 times.
✓ Branch 8 taken 2160 times.
|
440700 | } while (index_idx < raw_header_.size()); |
439 | |||
440 |
2/4✓ Branch 1 taken 2160 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2160 times.
✗ Branch 4 not taken.
|
2160 | return (nobjects == index_.size()) && (size_ == sum_size); |
441 | 2220 | } | |
442 | |||
443 | 220350 | bool ObjectPackConsumer::ParseItem(const std::string &line, | |
444 | ObjectPackConsumer::IndexEntry *entry, | ||
445 | uint64_t *sum_size) { | ||
446 |
2/4✓ Branch 0 taken 220350 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 220350 times.
|
220350 | if (!entry || !sum_size) { |
447 | ✗ | return false; | |
448 | } | ||
449 | |||
450 |
2/2✓ Branch 1 taken 220320 times.
✓ Branch 2 taken 30 times.
|
220350 | if (line[0] == 'C') { // CAS blob |
451 | 220320 | const ObjectPack::BucketContentType entry_type = ObjectPack::kCas; | |
452 | |||
453 | // We could use SplitString but we can have many lines so we do something | ||
454 | // more efficient here | ||
455 | 220320 | const size_t separator = line.find(' ', 2); | |
456 |
3/6✓ Branch 0 taken 220320 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 220320 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 220320 times.
|
220320 | if ((separator == string::npos) || (separator == (line.size() - 1))) { |
457 | ✗ | return false; | |
458 | } | ||
459 | |||
460 |
2/4✓ Branch 1 taken 220320 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 220320 times.
✗ Branch 5 not taken.
|
220320 | const uint64_t size = String2Uint64(line.substr(separator + 1)); |
461 | 220320 | *sum_size += size; | |
462 | |||
463 | // Warning do not construct a HexPtr with an rvalue! | ||
464 | // The constructor takes the address of its argument. | ||
465 |
1/2✓ Branch 1 taken 220320 times.
✗ Branch 2 not taken.
|
220320 | const std::string hash_string = line.substr(2, separator - 2); |
466 | 220320 | const shash::HexPtr hex_ptr(hash_string); | |
467 | |||
468 |
1/2✓ Branch 1 taken 220320 times.
✗ Branch 2 not taken.
|
220320 | entry->id = shash::MkFromSuffixedHexPtr(hex_ptr); |
469 | 220320 | entry->size = size; | |
470 | 220320 | entry->entry_type = entry_type; | |
471 |
1/2✓ Branch 1 taken 220320 times.
✗ Branch 2 not taken.
|
220320 | entry->entry_name = ""; |
472 |
1/2✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
|
220350 | } else if (line[0] == 'N') { // Named file |
473 | 30 | const ObjectPack::BucketContentType entry_type = ObjectPack::kNamed; | |
474 | |||
475 | // First separator, before the size field | ||
476 | 30 | const size_t separator1 = line.find(' ', 2); | |
477 |
3/6✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 30 times.
|
30 | if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) { |
478 | ✗ | return false; | |
479 | } | ||
480 | |||
481 | // Second separator, before the name field | ||
482 | 30 | const size_t separator2 = line.find(' ', separator1 + 1); | |
483 |
1/2✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
|
30 | if ((separator1 == 0) || (separator1 == string::npos) |
484 |
3/6✓ Branch 0 taken 30 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 30 times.
|
60 | || (separator1 == (line.size() - 1))) { |
485 | ✗ | return false; | |
486 | } | ||
487 | |||
488 |
1/2✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
|
30 | const uint64_t size = String2Uint64( |
489 |
1/2✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
|
60 | line.substr(separator1 + 1, separator2 - separator1 - 1)); |
490 | |||
491 | 30 | std::string name; | |
492 |
3/7✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 30 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 30 times.
|
30 | if (!Debase64(line.substr(separator2 + 1), &name)) { |
493 | ✗ | return false; | |
494 | } | ||
495 | |||
496 | 30 | *sum_size += size; | |
497 | |||
498 | // Warning do not construct a HexPtr with an rvalue! | ||
499 | // The constructor takes the address of its argument. | ||
500 |
1/2✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
|
30 | const std::string hash_string = line.substr(2, separator1 - 2); |
501 | 30 | const shash::HexPtr hex_ptr(hash_string); | |
502 | |||
503 |
1/2✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
|
30 | entry->id = shash::MkFromSuffixedHexPtr(hex_ptr); |
504 | 30 | entry->size = size; | |
505 | 30 | entry->entry_type = entry_type; | |
506 |
1/2✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
|
30 | entry->entry_name = name; |
507 |
1/2✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
|
30 | } else { // Error |
508 | ✗ | return false; | |
509 | } | ||
510 | |||
511 | 220350 | return true; | |
512 | } | ||
513 |