Directory: | cvmfs/ |
---|---|
File: | cvmfs/pack.cc |
Date: | 2025-02-09 02:34:19 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 270 | 287 | 94.1% |
Branches: | 202 | 332 | 60.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "pack.h" | ||
6 | |||
7 | #include <algorithm> | ||
8 | #include <cassert> | ||
9 | #include <cstring> | ||
10 | #include <map> | ||
11 | |||
12 | #include "util/concurrency.h" | ||
13 | #include "util/exception.h" | ||
14 | #include "util/platform.h" | ||
15 | #include "util/smalloc.h" | ||
16 | #include "util/string.h" | ||
17 | |||
18 | using namespace std; // NOLINT | ||
19 | |||
20 | namespace { // some private utility functions used by ObjectPackProducer | ||
21 | |||
22 | 80 | void InitializeHeader(const int version, const int num_objects, | |
23 | const size_t pack_size, std::string *header) { | ||
24 |
1/2✓ Branch 0 taken 80 times.
✗ Branch 1 not taken.
|
80 | if (header) { |
25 |
2/4✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 80 times.
✗ Branch 6 not taken.
|
80 | *header = "V" + StringifyInt(version) + "\n"; |
26 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 80 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 80 times.
✗ Branch 9 not taken.
|
80 | *header += "S" + StringifyInt(pack_size) + "\n"; |
27 |
3/6✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 80 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 80 times.
✗ Branch 9 not taken.
|
80 | *header += "N" + StringifyInt(num_objects) + "\n"; |
28 | 80 | *header += "--\n"; | |
29 | } | ||
30 | 80 | } | |
31 | |||
32 | 6545 | void AppendItemToHeader(ObjectPack::BucketContentType object_type, | |
33 | const std::string &hash_str, const size_t object_size, | ||
34 | const std::string &object_name, std::string *header) { | ||
35 | // If the item type is kNamed, the "object_name" parameter should not be empty | ||
36 |
4/6✓ Branch 0 taken 3 times.
✓ Branch 1 taken 6542 times.
✓ Branch 2 taken 3 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
|
6545 | assert((object_type == ObjectPack::kCas) || |
37 | ((object_type == ObjectPack::kNamed) && (!object_name.empty()))); | ||
38 |
1/2✓ Branch 2 taken 6545 times.
✗ Branch 3 not taken.
|
6545 | std::string line_prefix = ""; |
39 |
1/2✓ Branch 2 taken 6545 times.
✗ Branch 3 not taken.
|
6545 | std::string line_suffix = ""; |
40 |
2/3✓ Branch 0 taken 3 times.
✓ Branch 1 taken 6542 times.
✗ Branch 2 not taken.
|
6545 | switch (object_type) { |
41 | 3 | case ObjectPack::kNamed: | |
42 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | line_prefix = "N "; |
43 |
3/6✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 3 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 3 times.
✗ Branch 9 not taken.
|
3 | line_suffix = std::string(" ") + Base64Url(object_name); |
44 | 3 | break; | |
45 | 6542 | case ObjectPack::kCas: | |
46 |
1/2✓ Branch 1 taken 6542 times.
✗ Branch 2 not taken.
|
6542 | line_prefix = "C "; |
47 | 6542 | break; | |
48 | ✗ | default: | |
49 | ✗ | PANIC(kLogStderr, "Unknown object pack type to be added to header."); | |
50 | } | ||
51 |
1/2✓ Branch 0 taken 6545 times.
✗ Branch 1 not taken.
|
6545 | if (header) { |
52 |
5/10✓ Branch 1 taken 6545 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6545 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6545 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 6545 times.
✗ Branch 11 not taken.
✓ Branch 13 taken 6545 times.
✗ Branch 14 not taken.
|
13090 | *header += line_prefix + hash_str + " " + StringifyInt(object_size) + |
53 |
2/4✓ Branch 1 taken 6545 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6545 times.
✗ Branch 5 not taken.
|
6545 | line_suffix + "\n"; |
54 | } | ||
55 | 6545 | } | |
56 | |||
57 | } // namespace | ||
58 | |||
59 | 100369 | ObjectPack::Bucket::Bucket() | |
60 | 100369 | : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize))), | |
61 | 100369 | size(0), | |
62 | 100369 | capacity(kInitialSize), | |
63 | 100369 | content_type(kEmpty), | |
64 | 200738 | name() {} | |
65 | |||
66 | 358 | void ObjectPack::Bucket::Add(const void *buf, const uint64_t buf_size) { | |
67 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 358 times.
|
358 | if (buf_size == 0) return; |
68 | |||
69 |
2/2✓ Branch 0 taken 4055 times.
✓ Branch 1 taken 358 times.
|
4413 | while (size + buf_size > capacity) { |
70 | 4055 | capacity *= 2; | |
71 | 4055 | content = reinterpret_cast<unsigned char *>(srealloc(content, capacity)); | |
72 | } | ||
73 | 358 | memcpy(content + size, buf, buf_size); | |
74 | 358 | size += buf_size; | |
75 | } | ||
76 | |||
77 | 100369 | ObjectPack::Bucket::~Bucket() { free(content); } | |
78 | |||
79 | //------------------------------------------------------------------------------ | ||
80 | |||
81 | 67 | ObjectPack::ObjectPack(const uint64_t limit) : limit_(limit), size_(0) { | |
82 | 67 | InitLock(); | |
83 | 67 | } | |
84 | |||
85 | 66 | ObjectPack::~ObjectPack() { | |
86 | 132 | for (std::set<BucketHandle>::const_iterator i = open_buckets_.begin(), | |
87 | 66 | iEnd = open_buckets_.end(); | |
88 |
2/2✓ Branch 2 taken 66 times.
✓ Branch 3 taken 66 times.
|
132 | i != iEnd; ++i) { |
89 |
1/2✓ Branch 1 taken 66 times.
✗ Branch 2 not taken.
|
66 | delete *i; |
90 | } | ||
91 | |||
92 |
3/4✓ Branch 1 taken 100299 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 100299 times.
✓ Branch 7 taken 66 times.
|
100365 | for (unsigned i = 0; i < buckets_.size(); ++i) delete buckets_[i]; |
93 | 66 | pthread_mutex_destroy(lock_); | |
94 | 66 | free(lock_); | |
95 | 66 | } | |
96 | |||
97 | 93 | void ObjectPack::AddToBucket(const void *buf, const uint64_t size, | |
98 | const ObjectPack::BucketHandle handle) { | ||
99 | 93 | handle->Add(buf, size); | |
100 | 93 | } | |
101 | |||
102 | 100368 | ObjectPack::BucketHandle ObjectPack::NewBucket() { | |
103 |
2/4✓ Branch 1 taken 100368 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 100368 times.
✗ Branch 5 not taken.
|
100368 | BucketHandle handle = new Bucket(); |
104 | |||
105 | 100368 | MutexLockGuard mutex_guard(lock_); | |
106 |
1/2✓ Branch 1 taken 100368 times.
✗ Branch 2 not taken.
|
100368 | open_buckets_.insert(handle); |
107 | 100368 | return handle; | |
108 | 100368 | } | |
109 | |||
110 | /** | ||
111 | * Can only fail if the ObjectPack already holds kMaxObjects objects or has insufficient remaining space. | ||
112 | */ | ||
113 | 100382 | bool ObjectPack::CommitBucket(const BucketContentType type, | |
114 | const shash::Any &id, | ||
115 | const ObjectPack::BucketHandle handle, | ||
116 | const std::string &name) { | ||
117 | 100382 | handle->id = id; | |
118 | |||
119 | 100382 | handle->content_type = type; | |
120 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 100382 times.
|
100382 | if (type == kNamed) { |
121 | ✗ | handle->name = name; | |
122 | } | ||
123 | |||
124 | 100382 | MutexLockGuard mutex_guard(lock_); | |
125 |
2/2✓ Branch 1 taken 1 times.
✓ Branch 2 taken 100381 times.
|
100382 | if (buckets_.size() >= kMaxObjects) return false; |
126 |
2/2✓ Branch 0 taken 82 times.
✓ Branch 1 taken 100299 times.
|
100381 | if (size_ + handle->size > limit_) return false; |
127 |
1/2✓ Branch 1 taken 100299 times.
✗ Branch 2 not taken.
|
100299 | open_buckets_.erase(handle); |
128 |
1/2✓ Branch 1 taken 100299 times.
✗ Branch 2 not taken.
|
100299 | buckets_.push_back(handle); |
129 | 100299 | size_ += handle->size; | |
130 | 100299 | return true; | |
131 | 100382 | } | |
132 | |||
133 | 3 | void ObjectPack::DiscardBucket(const BucketHandle handle) { | |
134 | 3 | MutexLockGuard mutex_guard(lock_); | |
135 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | open_buckets_.erase(handle); |
136 |
1/2✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
|
3 | delete handle; |
137 | 3 | } | |
138 | |||
139 | 67 | void ObjectPack::InitLock() { | |
140 | 67 | lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t))); | |
141 | 67 | int retval = pthread_mutex_init(lock_, NULL); | |
142 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 67 times.
|
67 | assert(retval == 0); |
143 | 67 | } | |
144 | |||
145 | /** | ||
146 | * If a commit failed, an open Bucket can be transferred to another ObjectPack | ||
147 | * with more space. | ||
148 | */ | ||
149 | 23 | void ObjectPack::TransferBucket(const ObjectPack::BucketHandle handle, | |
150 | ObjectPack *other) { | ||
151 | 23 | MutexLockGuard mutex_guard(lock_); | |
152 |
1/2✓ Branch 1 taken 23 times.
✗ Branch 2 not taken.
|
23 | open_buckets_.erase(handle); |
153 |
1/2✓ Branch 1 taken 23 times.
✗ Branch 2 not taken.
|
23 | other->open_buckets_.insert(handle); |
154 | 23 | } | |
155 | |||
156 | 37641 | unsigned char *ObjectPack::BucketContent(size_t idx) const { | |
157 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 37641 times.
|
37641 | assert(idx < buckets_.size()); |
158 | 37641 | return buckets_[idx]->content; | |
159 | } | ||
160 | |||
161 | 44183 | uint64_t ObjectPack::BucketSize(size_t idx) const { | |
162 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 44183 times.
|
44183 | assert(idx < buckets_.size()); |
163 | 44183 | return buckets_[idx]->size; | |
164 | } | ||
165 | |||
166 | 6542 | const shash::Any &ObjectPack::BucketId(size_t idx) const { | |
167 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 6542 times.
|
6542 | assert(idx < buckets_.size()); |
168 | 6542 | return buckets_[idx]->id; | |
169 | } | ||
170 | |||
171 | //------------------------------------------------------------------------------ | ||
172 | |||
173 | /** | ||
174 | * Hash over the header. The hash algorithm needs to be provided by hash. | ||
175 | */ | ||
176 | 73 | void ObjectPackProducer::GetDigest(shash::Any *hash) { | |
177 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 73 times.
|
73 | assert(hash); |
178 | 73 | shash::HashString(header_, hash); | |
179 | 73 | } | |
180 | |||
181 | 77 | ObjectPackProducer::ObjectPackProducer(ObjectPack *pack) | |
182 | 77 | : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) { | |
183 | 77 | unsigned N = pack->GetNoObjects(); | |
184 | // rough guess, most likely a little too much | ||
185 |
1/2✓ Branch 1 taken 77 times.
✗ Branch 2 not taken.
|
77 | header_.reserve(30 + N * (2 * shash::kMaxDigestSize + 5)); |
186 | |||
187 |
1/2✓ Branch 2 taken 77 times.
✗ Branch 3 not taken.
|
77 | InitializeHeader(2, N, pack->size(), &header_); |
188 | |||
189 |
2/2✓ Branch 0 taken 6542 times.
✓ Branch 1 taken 77 times.
|
6619 | for (unsigned i = 0; i < N; ++i) { |
190 |
3/6✓ Branch 2 taken 6542 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 6542 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 6542 times.
✗ Branch 11 not taken.
|
6542 | AppendItemToHeader(ObjectPack::kCas, pack->BucketId(i).ToString(true), |
191 | pack->BucketSize(i), "", &header_); | ||
192 | } | ||
193 | 77 | } | |
194 | |||
195 | 3 | ObjectPackProducer::ObjectPackProducer(const shash::Any &id, FILE *big_file, | |
196 | 3 | const std::string &file_name) | |
197 | 3 | : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) { | |
198 | 3 | int fd = fileno(big_file_); | |
199 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | assert(fd >= 0); |
200 | platform_stat64 info; | ||
201 | 3 | int retval = platform_fstat(fd, &info); | |
202 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
|
3 | assert(retval == 0); |
203 | |||
204 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | InitializeHeader(2, 1, info.st_size, &header_); |
205 | |||
206 |
2/4✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3 times.
✗ Branch 5 not taken.
|
3 | AppendItemToHeader(ObjectPack::kNamed, id.ToString(true), info.st_size, |
207 | file_name, &header_); | ||
208 | |||
209 |
1/2✓ Branch 1 taken 3 times.
✗ Branch 2 not taken.
|
3 | rewind(big_file); |
210 | 3 | } | |
211 | |||
212 | /** | ||
213 | * Copies as many bytes as possible into buf. If the returned number of bytes | ||
214 | * is shorter than buf_size, everything has been produced. | ||
215 | */ | ||
216 | 31456 | unsigned ObjectPackProducer::ProduceNext(const unsigned buf_size, | |
217 | unsigned char *buf) { | ||
218 | const unsigned remaining_in_header = | ||
219 |
2/2✓ Branch 1 taken 331 times.
✓ Branch 2 taken 31125 times.
|
31456 | (pos_ < header_.size()) ? (header_.size() - pos_) : 0; |
220 | 31456 | const unsigned nbytes_header = std::min(remaining_in_header, buf_size); | |
221 |
2/2✓ Branch 0 taken 331 times.
✓ Branch 1 taken 31125 times.
|
31456 | if (nbytes_header) { |
222 | 331 | memcpy(buf, header_.data() + pos_, nbytes_header); | |
223 | 331 | pos_ += nbytes_header; | |
224 | } | ||
225 | |||
226 | 31456 | unsigned remaining_in_buf = buf_size - nbytes_header; | |
227 |
2/2✓ Branch 0 taken 254 times.
✓ Branch 1 taken 31202 times.
|
31456 | if (remaining_in_buf == 0) return nbytes_header; |
228 | 31202 | unsigned nbytes_payload = 0; | |
229 | |||
230 |
2/2✓ Branch 0 taken 19 times.
✓ Branch 1 taken 31183 times.
|
31202 | if (big_file_) { |
231 |
1/2✓ Branch 1 taken 19 times.
✗ Branch 2 not taken.
|
19 | size_t nbytes = fread(buf + nbytes_header, 1, remaining_in_buf, big_file_); |
232 | 19 | nbytes_payload = nbytes; | |
233 | 19 | pos_ += nbytes_payload; | |
234 |
2/2✓ Branch 1 taken 31174 times.
✓ Branch 2 taken 9 times.
|
31183 | } else if (idx_ < pack_->GetNoObjects()) { |
235 | // Copy a few buckets more | ||
236 |
6/6✓ Branch 0 taken 37714 times.
✓ Branch 1 taken 31101 times.
✓ Branch 3 taken 37641 times.
✓ Branch 4 taken 73 times.
✓ Branch 5 taken 37641 times.
✓ Branch 6 taken 31174 times.
|
68815 | while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) { |
237 | const unsigned remaining_in_bucket = | ||
238 | 37641 | pack_->BucketSize(idx_) - pos_in_bucket_; | |
239 | 37641 | const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket); | |
240 | 75282 | memcpy(buf + nbytes_header + nbytes_payload, | |
241 | 37641 | pack_->BucketContent(idx_) + pos_in_bucket_, nbytes); | |
242 | |||
243 | 37641 | pos_in_bucket_ += nbytes; | |
244 | 37641 | nbytes_payload += nbytes; | |
245 | 37641 | remaining_in_buf -= nbytes; | |
246 |
2/2✓ Branch 0 taken 6542 times.
✓ Branch 1 taken 31099 times.
|
37641 | if (nbytes == remaining_in_bucket) { |
247 | 6542 | pos_in_bucket_ = 0; | |
248 | 6542 | idx_++; | |
249 | } | ||
250 | } | ||
251 | } | ||
252 | |||
253 | 31202 | return nbytes_header + nbytes_payload; | |
254 | } | ||
255 | |||
256 | //------------------------------------------------------------------------------ | ||
257 | |||
258 | 74 | ObjectPackConsumer::ObjectPackConsumer(const shash::Any &expected_digest, | |
259 | 74 | const unsigned expected_header_size) | |
260 | 74 | : expected_digest_(expected_digest), | |
261 | 74 | expected_header_size_(expected_header_size), | |
262 | 74 | pos_(0), | |
263 | 74 | idx_(0), | |
264 | 74 | pos_in_object_(0), | |
265 | 74 | pos_in_accu_(0), | |
266 | 74 | state_(ObjectPackBuild::kStateContinue), | |
267 | 74 | size_(0) { | |
268 | // Upper limit of 100B per entry | ||
269 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 74 times.
|
74 | if (expected_header_size > (100 * ObjectPack::kMaxObjects)) { |
270 | ✗ | state_ = ObjectPackBuild::kStateHeaderTooBig; | |
271 | ✗ | return; | |
272 | } | ||
273 | |||
274 |
1/2✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
|
74 | raw_header_.reserve(expected_header_size); |
275 | } | ||
276 | |||
277 | /** | ||
278 | * At the end of the function, pos_ will have progressed by buf_size (unless | ||
279 | * the buffer contains trailing garbage bytes. | ||
280 | */ | ||
281 | 31407 | ObjectPackBuild::State ObjectPackConsumer::ConsumeNext( | |
282 | const unsigned buf_size, const unsigned char *buf) { | ||
283 |
2/2✓ Branch 0 taken 67 times.
✓ Branch 1 taken 31340 times.
|
31407 | if (buf_size == 0) return state_; |
284 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 31340 times.
|
31340 | if (state_ == ObjectPackBuild::kStateDone) { |
285 | ✗ | state_ = ObjectPackBuild::kStateTrailingBytes; | |
286 | ✗ | return state_; | |
287 | } | ||
288 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 31340 times.
|
31340 | if (state_ != ObjectPackBuild::kStateContinue) return state_; |
289 | |||
290 | 31094 | const unsigned remaining_in_header = | |
291 |
2/2✓ Branch 0 taken 246 times.
✓ Branch 1 taken 31094 times.
|
31340 | (pos_ < expected_header_size_) ? (expected_header_size_ - pos_) : 0; |
292 | 31340 | const unsigned nbytes_header = std::min(remaining_in_header, buf_size); | |
293 |
2/2✓ Branch 0 taken 246 times.
✓ Branch 1 taken 31094 times.
|
31340 | if (nbytes_header) { |
294 |
2/4✓ Branch 2 taken 246 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 246 times.
✗ Branch 6 not taken.
|
246 | raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header); |
295 | 246 | pos_ += nbytes_header; | |
296 | } | ||
297 | |||
298 |
2/2✓ Branch 0 taken 172 times.
✓ Branch 1 taken 31168 times.
|
31340 | if (pos_ < expected_header_size_) return ObjectPackBuild::kStateContinue; |
299 | |||
300 | // This condition can only be true once through the lifetime of the | ||
301 | // Consumer. | ||
302 |
3/4✓ Branch 0 taken 74 times.
✓ Branch 1 taken 31094 times.
✓ Branch 2 taken 74 times.
✗ Branch 3 not taken.
|
31168 | if (nbytes_header && (pos_ == expected_header_size_)) { |
303 |
1/2✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
|
74 | shash::Any digest(expected_digest_.algorithm); |
304 |
1/2✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
|
74 | shash::HashString(raw_header_, &digest); |
305 |
2/4✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 74 times.
|
74 | if (digest != expected_digest_) { |
306 | ✗ | state_ = ObjectPackBuild::kStateCorrupt; | |
307 | 2 | return state_; | |
308 | } else { | ||
309 |
1/2✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
|
74 | bool retval = ParseHeader(); |
310 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 74 times.
|
74 | if (!retval) { |
311 | ✗ | state_ = ObjectPackBuild::kStateBadFormat; | |
312 | ✗ | return state_; | |
313 | } | ||
314 | // We don't need the raw string anymore | ||
315 | 74 | raw_header_.clear(); | |
316 | } | ||
317 | |||
318 | // Empty pack? | ||
319 |
6/6✓ Branch 0 taken 3 times.
✓ Branch 1 taken 71 times.
✓ Branch 3 taken 2 times.
✓ Branch 4 taken 1 times.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 72 times.
|
74 | if ((buf_size == nbytes_header) && (index_.size() == 0)) { |
320 | 2 | state_ = ObjectPackBuild::kStateDone; | |
321 | 2 | return state_; | |
322 | } | ||
323 | } | ||
324 | |||
325 | 31166 | unsigned remaining_in_buf = buf_size - nbytes_header; | |
326 | 31166 | const unsigned char *payload = buf + nbytes_header; | |
327 |
1/2✓ Branch 1 taken 31166 times.
✗ Branch 2 not taken.
|
31166 | return ConsumePayload(remaining_in_buf, payload); |
328 | } | ||
329 | |||
330 | /** | ||
331 | * Informs listeners for small complete objects. For large objects, buffers | ||
332 | * the | ||
333 | * input into reasonably sized chunks. buf can contain both a chunk of data | ||
334 | * that needs to be added to the consumer's accumulator and a bunch of | ||
335 | * complete small objects. We use the accumulator only if necessary to avoid | ||
336 | * unnecessary memory copies. | ||
337 | */ | ||
338 | 31166 | ObjectPackBuild::State ObjectPackConsumer::ConsumePayload( | |
339 | const unsigned buf_size, const unsigned char *buf) { | ||
340 | 31166 | uint64_t pos_in_buf = 0; | |
341 |
4/4✓ Branch 1 taken 73359 times.
✓ Branch 2 taken 72 times.
✓ Branch 3 taken 42265 times.
✓ Branch 4 taken 31166 times.
|
146790 | while ((idx_ < index_.size()) && |
342 |
4/4✓ Branch 0 taken 31095 times.
✓ Branch 1 taken 42264 times.
✓ Branch 3 taken 1 times.
✓ Branch 4 taken 31094 times.
|
73359 | ((pos_in_buf < buf_size) || (index_[idx_].size == 0))) { |
343 | // Fill the accumulator or process next small object | ||
344 | uint64_t nbytes; // How many bytes are consumed in this iteration | ||
345 | 42265 | const uint64_t remaining_in_buf = buf_size - pos_in_buf; | |
346 | 42265 | const uint64_t remaining_in_object = index_[idx_].size - pos_in_object_; | |
347 | 42265 | const bool is_small_rest = remaining_in_buf < kAccuSize; | |
348 | |||
349 | // We use the accumulator if there is already something in or if we have a | ||
350 | // small piece of data of a larger object. | ||
351 | 42265 | nbytes = std::min(remaining_in_object, remaining_in_buf); | |
352 |
2/2✓ Branch 0 taken 32124 times.
✓ Branch 1 taken 10141 times.
|
42265 | if ((pos_in_accu_ > 0) || |
353 |
4/4✓ Branch 0 taken 25689 times.
✓ Branch 1 taken 6435 times.
✓ Branch 2 taken 4738 times.
✓ Branch 3 taken 20951 times.
|
32124 | ((remaining_in_buf < remaining_in_object) && is_small_rest)) { |
354 | 14879 | const uint64_t remaining_in_accu = kAccuSize - pos_in_accu_; | |
355 | 14879 | nbytes = std::min(remaining_in_accu, nbytes); | |
356 | 14879 | memcpy(accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes); | |
357 | 14879 | pos_in_accu_ += nbytes; | |
358 |
4/4✓ Branch 0 taken 10243 times.
✓ Branch 1 taken 4636 times.
✓ Branch 2 taken 102 times.
✓ Branch 3 taken 10141 times.
|
14879 | if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) { |
359 |
1/2✓ Branch 1 taken 4738 times.
✗ Branch 2 not taken.
|
4738 | NotifyListeners(ObjectPackBuild::Event( |
360 |
1/2✓ Branch 3 taken 4738 times.
✗ Branch 4 not taken.
|
4738 | index_[idx_].id, index_[idx_].size, pos_in_accu_, accumulator_, |
361 | 4738 | index_[idx_].entry_type, index_[idx_].entry_name)); | |
362 | 4738 | pos_in_accu_ = 0; | |
363 | } | ||
364 | 14879 | } else { // directly trigger listeners using buf | |
365 |
1/2✓ Branch 1 taken 27386 times.
✗ Branch 2 not taken.
|
27386 | NotifyListeners(ObjectPackBuild::Event( |
366 |
1/2✓ Branch 3 taken 27386 times.
✗ Branch 4 not taken.
|
27386 | index_[idx_].id, index_[idx_].size, nbytes, buf + pos_in_buf, |
367 | 27386 | index_[idx_].entry_type, index_[idx_].entry_name)); | |
368 | } | ||
369 | |||
370 | 42265 | pos_in_buf += nbytes; | |
371 | 42265 | pos_in_object_ += nbytes; | |
372 |
2/2✓ Branch 0 taken 6538 times.
✓ Branch 1 taken 35727 times.
|
42265 | if (nbytes == remaining_in_object) { |
373 | 6538 | idx_++; | |
374 | 6538 | pos_in_object_ = 0; | |
375 | } | ||
376 | } | ||
377 | |||
378 | 31166 | pos_ += buf_size; | |
379 | |||
380 |
2/2✓ Branch 1 taken 72 times.
✓ Branch 2 taken 31094 times.
|
31166 | if (idx_ == index_.size()) |
381 |
1/2✓ Branch 0 taken 72 times.
✗ Branch 1 not taken.
|
72 | state_ = (pos_in_buf == buf_size) ? ObjectPackBuild::kStateDone |
382 | : ObjectPackBuild::kStateTrailingBytes; | ||
383 | else | ||
384 | 31094 | state_ = ObjectPackBuild::kStateContinue; | |
385 | 31166 | return state_; | |
386 | } | ||
387 | |||
388 | 74 | bool ObjectPackConsumer::ParseHeader() { | |
389 | 74 | map<char, string> header; | |
390 | const unsigned char *data = | ||
391 | 74 | reinterpret_cast<const unsigned char *>(raw_header_.data()); | |
392 |
1/2✓ Branch 2 taken 74 times.
✗ Branch 3 not taken.
|
74 | ParseKeyvalMem(data, raw_header_.size(), &header); |
393 |
2/5✓ Branch 2 taken 74 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 74 times.
|
74 | if (header.find('V') == header.end()) return false; |
394 |
3/7✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 74 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 74 times.
|
74 | if (header['V'] != "2") return false; |
395 |
2/4✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 74 times.
✗ Branch 5 not taken.
|
74 | size_ = String2Uint64(header['S']); |
396 |
2/4✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 74 times.
✗ Branch 5 not taken.
|
74 | unsigned nobjects = String2Uint64(header['N']); |
397 | |||
398 |
2/2✓ Branch 0 taken 2 times.
✓ Branch 1 taken 72 times.
|
74 | if (nobjects == 0) return true; |
399 | |||
400 | // Build the object index | ||
401 | 72 | const size_t separator_idx = raw_header_.find("--\n"); | |
402 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 72 times.
|
72 | if (separator_idx == string::npos) return false; |
403 | 72 | unsigned index_idx = separator_idx + 3; | |
404 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 72 times.
|
72 | if (index_idx >= raw_header_.size()) return false; |
405 | |||
406 | 72 | uint64_t sum_size = 0; | |
407 | do { | ||
408 | 6538 | const unsigned remaining_in_header = raw_header_.size() - index_idx; | |
409 | string line = | ||
410 |
1/2✓ Branch 2 taken 6538 times.
✗ Branch 3 not taken.
|
6538 | GetLineMem(raw_header_.data() + index_idx, remaining_in_header); |
411 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 6538 times.
|
6538 | if (line == "") break; |
412 | |||
413 |
1/2✓ Branch 1 taken 6538 times.
✗ Branch 2 not taken.
|
6538 | IndexEntry entry; |
414 |
2/4✓ Branch 1 taken 6538 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 6538 times.
|
6538 | if (!ParseItem(line, &entry, &sum_size)) { |
415 | ✗ | break; | |
416 | } | ||
417 | |||
418 |
1/2✓ Branch 1 taken 6538 times.
✗ Branch 2 not taken.
|
6538 | index_.push_back(entry); |
419 | 6538 | index_idx += line.size() + 1; | |
420 |
4/6✓ Branch 1 taken 6538 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6538 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 6466 times.
✓ Branch 8 taken 72 times.
|
13076 | } while (index_idx < raw_header_.size()); |
421 | |||
422 |
2/4✓ Branch 1 taken 72 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 72 times.
✗ Branch 4 not taken.
|
72 | return (nobjects == index_.size()) && (size_ == sum_size); |
423 | 74 | } | |
424 | |||
425 | 6538 | bool ObjectPackConsumer::ParseItem(const std::string &line, | |
426 | ObjectPackConsumer::IndexEntry *entry, | ||
427 | uint64_t *sum_size) { | ||
428 |
2/4✓ Branch 0 taken 6538 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 6538 times.
|
6538 | if (!entry || !sum_size) { |
429 | ✗ | return false; | |
430 | } | ||
431 | |||
432 |
2/2✓ Branch 1 taken 6537 times.
✓ Branch 2 taken 1 times.
|
6538 | if (line[0] == 'C') { // CAS blob |
433 | 6537 | const ObjectPack::BucketContentType entry_type = ObjectPack::kCas; | |
434 | |||
435 | // We could use SplitString but we can have many lines so we do something | ||
436 | // more efficient here | ||
437 | 6537 | const size_t separator = line.find(' ', 2); | |
438 |
3/6✓ Branch 0 taken 6537 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 6537 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 6537 times.
|
6537 | if ((separator == string::npos) || (separator == (line.size() - 1))) { |
439 | ✗ | return false; | |
440 | } | ||
441 | |||
442 |
2/4✓ Branch 1 taken 6537 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 6537 times.
✗ Branch 5 not taken.
|
6537 | uint64_t size = String2Uint64(line.substr(separator + 1)); |
443 | 6537 | *sum_size += size; | |
444 | |||
445 | // Warning do not construct a HexPtr with an rvalue! | ||
446 | // The constructor takes the address of its argument. | ||
447 |
1/2✓ Branch 1 taken 6537 times.
✗ Branch 2 not taken.
|
6537 | const std::string hash_string = line.substr(2, separator - 2); |
448 | 6537 | shash::HexPtr hex_ptr(hash_string); | |
449 | |||
450 |
1/2✓ Branch 1 taken 6537 times.
✗ Branch 2 not taken.
|
6537 | entry->id = shash::MkFromSuffixedHexPtr(hex_ptr); |
451 | 6537 | entry->size = size; | |
452 | 6537 | entry->entry_type = entry_type; | |
453 |
1/2✓ Branch 1 taken 6537 times.
✗ Branch 2 not taken.
|
6537 | entry->entry_name = ""; |
454 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
6538 | } else if (line[0] == 'N') { // Named file |
455 | 1 | const ObjectPack::BucketContentType entry_type = ObjectPack::kNamed; | |
456 | |||
457 | // First separator, before the size field | ||
458 | 1 | const size_t separator1 = line.find(' ', 2); | |
459 |
3/6✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 1 times.
|
1 | if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) { |
460 | ✗ | return false; | |
461 | } | ||
462 | |||
463 | // Second separator, before the name field | ||
464 | 1 | const size_t separator2 = line.find(' ', separator1 + 1); | |
465 |
3/6✓ Branch 0 taken 1 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 1 times.
|
2 | if ((separator1 == 0) || (separator1 == string::npos) || |
466 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1 times.
|
1 | (separator1 == (line.size() - 1))) { |
467 | ✗ | return false; | |
468 | } | ||
469 | |||
470 | uint64_t size = | ||
471 |
2/4✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
|
1 | String2Uint64(line.substr(separator1 + 1, separator2 - separator1 - 1)); |
472 | |||
473 | 1 | std::string name; | |
474 |
3/7✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 1 times.
|
1 | if (!Debase64(line.substr(separator2 + 1), &name)) { |
475 | ✗ | return false; | |
476 | } | ||
477 | |||
478 | 1 | *sum_size += size; | |
479 | |||
480 | // Warning do not construct a HexPtr with an rvalue! | ||
481 | // The constructor takes the address of its argument. | ||
482 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | const std::string hash_string = line.substr(2, separator1 - 2); |
483 | 1 | shash::HexPtr hex_ptr(hash_string); | |
484 | |||
485 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | entry->id = shash::MkFromSuffixedHexPtr(hex_ptr); |
486 | 1 | entry->size = size; | |
487 | 1 | entry->entry_type = entry_type; | |
488 |
1/2✓ Branch 1 taken 1 times.
✗ Branch 2 not taken.
|
1 | entry->entry_name = name; |
489 |
1/2✓ Branch 2 taken 1 times.
✗ Branch 3 not taken.
|
1 | } else { // Error |
490 | ✗ | return false; | |
491 | } | ||
492 | |||
493 | 6538 | return true; | |
494 | } | ||
495 |