Directory: | cvmfs/ |
---|---|
File: | cvmfs/pack.cc |
Date: | 2025-08-31 02:39:21 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 283 | 307 | 92.2% |
Branches: | 202 | 332 | 60.8% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #include "pack.h" | ||
6 | |||
7 | #include <algorithm> | ||
8 | #include <cassert> | ||
9 | #include <cstring> | ||
10 | #include <map> | ||
11 | |||
12 | #include "util/concurrency.h" | ||
13 | #include "util/exception.h" | ||
14 | #include "util/platform.h" | ||
15 | #include "util/smalloc.h" | ||
16 | #include "util/string.h" | ||
17 | |||
18 | using namespace std; // NOLINT | ||
19 | |||
20 | namespace { // some private utility functions used by ObjectPackProducer | ||
21 | |||
22 | 3294 | void InitializeHeader(const int version, const int num_objects, | |
23 | const size_t pack_size, std::string *header) { | ||
24 |
1/2✓ Branch 0 taken 3294 times.
✗ Branch 1 not taken.
|
3294 | if (header) { |
25 |
2/4✓ Branch 2 taken 3294 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3294 times.
✗ Branch 6 not taken.
|
3294 | *header = "V" + StringifyInt(version) + "\n"; |
26 |
3/6✓ Branch 2 taken 3294 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3294 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 3294 times.
✗ Branch 9 not taken.
|
3294 | *header += "S" + StringifyInt(pack_size) + "\n"; |
27 |
3/6✓ Branch 2 taken 3294 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3294 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 3294 times.
✗ Branch 9 not taken.
|
3294 | *header += "N" + StringifyInt(num_objects) + "\n"; |
28 | 3294 | *header += "--\n"; | |
29 | } | ||
30 | 3294 | } | |
31 | |||
32 | 259832 | void AppendItemToHeader(ObjectPack::BucketContentType object_type, | |
33 | const std::string &hash_str, const size_t object_size, | ||
34 | const std::string &object_name, std::string *header) { | ||
35 | // If the item type is kName, the "item_name" parameter should not be empty | ||
36 |
4/6✓ Branch 0 taken 129 times.
✓ Branch 1 taken 259703 times.
✓ Branch 2 taken 129 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 129 times.
✗ Branch 6 not taken.
|
259832 | assert((object_type == ObjectPack::kCas) |
37 | || ((object_type == ObjectPack::kNamed) && (!object_name.empty()))); | ||
38 |
1/2✓ Branch 2 taken 259832 times.
✗ Branch 3 not taken.
|
259832 | std::string line_prefix = ""; |
39 |
1/2✓ Branch 2 taken 259832 times.
✗ Branch 3 not taken.
|
259832 | std::string line_suffix = ""; |
40 |
2/3✓ Branch 0 taken 129 times.
✓ Branch 1 taken 259703 times.
✗ Branch 2 not taken.
|
259832 | switch (object_type) { |
41 | 129 | case ObjectPack::kNamed: | |
42 |
1/2✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
|
129 | line_prefix = "N "; |
43 |
3/6✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 129 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 129 times.
✗ Branch 9 not taken.
|
129 | line_suffix = std::string(" ") + Base64Url(object_name); |
44 | 129 | break; | |
45 | 259703 | case ObjectPack::kCas: | |
46 |
1/2✓ Branch 1 taken 259703 times.
✗ Branch 2 not taken.
|
259703 | line_prefix = "C "; |
47 | 259703 | break; | |
48 | ✗ | default: | |
49 | ✗ | PANIC(kLogStderr, "Unknown object pack type to be added to header."); | |
50 | } | ||
51 |
1/2✓ Branch 0 taken 259832 times.
✗ Branch 1 not taken.
|
259832 | if (header) { |
52 |
4/8✓ Branch 1 taken 259832 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 259832 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 259832 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 259832 times.
✗ Branch 11 not taken.
|
519664 | *header += line_prefix + hash_str + " " + StringifyInt(object_size) |
53 |
3/6✓ Branch 1 taken 259832 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 259832 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 259832 times.
✗ Branch 8 not taken.
|
259832 | + line_suffix + "\n"; |
54 | } | ||
55 | 259832 | } | |
56 | |||
57 | } // namespace | ||
58 | |||
59 | 4313603 | ObjectPack::Bucket::Bucket() | |
60 | 4313603 | : content(reinterpret_cast<unsigned char *>(smalloc(kInitialSize))) | |
61 | 4313603 | , size(0) | |
62 | 4313603 | , capacity(kInitialSize) | |
63 | 4313603 | , content_type(kEmpty) | |
64 | 8627206 | , name() { } | |
65 | |||
66 | 13130 | void ObjectPack::Bucket::Add(const void *buf, const uint64_t buf_size) { | |
67 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 13130 times.
|
13130 | if (buf_size == 0) |
68 | ✗ | return; | |
69 | |||
70 |
2/2✓ Branch 0 taken 161841 times.
✓ Branch 1 taken 13130 times.
|
174971 | while (size + buf_size > capacity) { |
71 | 161841 | capacity *= 2; | |
72 | 161841 | content = reinterpret_cast<unsigned char *>(srealloc(content, capacity)); | |
73 | } | ||
74 | 13130 | memcpy(content + size, buf, buf_size); | |
75 | 13130 | size += buf_size; | |
76 | } | ||
77 | |||
78 | 4313603 | ObjectPack::Bucket::~Bucket() { free(content); } | |
79 | |||
80 | //------------------------------------------------------------------------------ | ||
81 | |||
82 | 1526 | ObjectPack::ObjectPack(const uint64_t limit) : limit_(limit), size_(0) { | |
83 | 1526 | InitLock(); | |
84 | 1526 | } | |
85 | |||
86 | 1522 | ObjectPack::~ObjectPack() { | |
87 | 3044 | for (std::set<BucketHandle>::const_iterator i = open_buckets_.begin(), | |
88 | 1522 | iEnd = open_buckets_.end(); | |
89 |
2/2✓ Branch 1 taken 2838 times.
✓ Branch 2 taken 1522 times.
|
4360 | i != iEnd; |
90 | 2838 | ++i) { | |
91 |
1/2✓ Branch 1 taken 2838 times.
✗ Branch 2 not taken.
|
2838 | delete *i; |
92 | } | ||
93 | |||
94 |
2/2✓ Branch 1 taken 4310593 times.
✓ Branch 2 taken 1522 times.
|
4312115 | for (unsigned i = 0; i < buckets_.size(); ++i) |
95 |
1/2✓ Branch 1 taken 4310593 times.
✗ Branch 2 not taken.
|
4310593 | delete buckets_[i]; |
96 | 1522 | pthread_mutex_destroy(lock_); | |
97 | 1522 | free(lock_); | |
98 | 1522 | } | |
99 | |||
100 | 1864 | void ObjectPack::AddToBucket(const void *buf, const uint64_t size, | |
101 | const ObjectPack::BucketHandle handle) { | ||
102 | 1864 | handle->Add(buf, size); | |
103 | 1864 | } | |
104 | |||
105 | 4313560 | ObjectPack::BucketHandle ObjectPack::NewBucket() { | |
106 |
2/4✓ Branch 1 taken 4313560 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 4313560 times.
✗ Branch 5 not taken.
|
4313560 | BucketHandle handle = new Bucket(); |
107 | |||
108 | 4313560 | const MutexLockGuard mutex_guard(lock_); | |
109 |
1/2✓ Branch 1 taken 4313560 times.
✗ Branch 2 not taken.
|
4313560 | open_buckets_.insert(handle); |
110 | 4313560 | return handle; | |
111 | 4313560 | } | |
112 | |||
113 | /** | ||
114 | * Can only fail due to insufficient remaining space in the ObjectPack. | ||
115 | */ | ||
116 | 4313538 | bool ObjectPack::CommitBucket(const BucketContentType type, | |
117 | const shash::Any &id, | ||
118 | const ObjectPack::BucketHandle handle, | ||
119 | const std::string &name) { | ||
120 | 4313538 | handle->id = id; | |
121 | |||
122 | 4313538 | handle->content_type = type; | |
123 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 4313538 times.
|
4313538 | if (type == kNamed) { |
124 | ✗ | handle->name = name; | |
125 | } | ||
126 | |||
127 | 4313538 | const MutexLockGuard mutex_guard(lock_); | |
128 |
2/2✓ Branch 1 taken 43 times.
✓ Branch 2 taken 4313495 times.
|
4313538 | if (buckets_.size() >= kMaxObjects) |
129 | 43 | return false; | |
130 |
2/2✓ Branch 0 taken 2902 times.
✓ Branch 1 taken 4310593 times.
|
4313495 | if (size_ + handle->size > limit_) |
131 | 2902 | return false; | |
132 |
1/2✓ Branch 1 taken 4310593 times.
✗ Branch 2 not taken.
|
4310593 | open_buckets_.erase(handle); |
133 |
1/2✓ Branch 1 taken 4310593 times.
✗ Branch 2 not taken.
|
4310593 | buckets_.push_back(handle); |
134 | 4310593 | size_ += handle->size; | |
135 | 4310593 | return true; | |
136 | 4313538 | } | |
137 | |||
138 | 129 | void ObjectPack::DiscardBucket(const BucketHandle handle) { | |
139 | 129 | const MutexLockGuard mutex_guard(lock_); | |
140 |
1/2✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
|
129 | open_buckets_.erase(handle); |
141 |
1/2✓ Branch 0 taken 129 times.
✗ Branch 1 not taken.
|
129 | delete handle; |
142 | 129 | } | |
143 | |||
144 | 1526 | void ObjectPack::InitLock() { | |
145 | 1526 | lock_ = reinterpret_cast<pthread_mutex_t *>(smalloc(sizeof(pthread_mutex_t))); | |
146 | 1526 | const int retval = pthread_mutex_init(lock_, NULL); | |
147 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1526 times.
|
1526 | assert(retval == 0); |
148 | 1526 | } | |
149 | |||
150 | /** | ||
151 | * If a commit failed, an open Bucket can be transferred to another ObjectPack | ||
152 | * with more space. | ||
153 | */ | ||
154 | 131 | void ObjectPack::TransferBucket(const ObjectPack::BucketHandle handle, | |
155 | ObjectPack *other) { | ||
156 | 131 | const MutexLockGuard mutex_guard(lock_); | |
157 |
1/2✓ Branch 1 taken 131 times.
✗ Branch 2 not taken.
|
131 | open_buckets_.erase(handle); |
158 |
1/2✓ Branch 1 taken 131 times.
✗ Branch 2 not taken.
|
131 | other->open_buckets_.insert(handle); |
159 | 131 | } | |
160 | |||
161 | 1591709 | unsigned char *ObjectPack::BucketContent(size_t idx) const { | |
162 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1591709 times.
|
1591709 | assert(idx < buckets_.size()); |
163 | 1591709 | return buckets_[idx]->content; | |
164 | } | ||
165 | |||
166 | 1851412 | uint64_t ObjectPack::BucketSize(size_t idx) const { | |
167 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 1851412 times.
|
1851412 | assert(idx < buckets_.size()); |
168 | 1851412 | return buckets_[idx]->size; | |
169 | } | ||
170 | |||
171 | 259703 | const shash::Any &ObjectPack::BucketId(size_t idx) const { | |
172 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 259703 times.
|
259703 | assert(idx < buckets_.size()); |
173 | 259703 | return buckets_[idx]->id; | |
174 | } | ||
175 | |||
176 | //------------------------------------------------------------------------------ | ||
177 | |||
178 | /** | ||
179 | * Hash over the header. The hash algorithm needs to be provided by hash. | ||
180 | */ | ||
181 | 3071 | void ObjectPackProducer::GetDigest(shash::Any *hash) { | |
182 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3071 times.
|
3071 | assert(hash); |
183 | 3071 | shash::HashString(header_, hash); | |
184 | 3071 | } | |
185 | |||
186 | 3165 | ObjectPackProducer::ObjectPackProducer(ObjectPack *pack) | |
187 | 3165 | : pack_(pack), big_file_(NULL), pos_(0), idx_(0), pos_in_bucket_(0) { | |
188 | 3165 | const unsigned N = pack->GetNoObjects(); | |
189 | // rough guess, most likely a little too much | ||
190 |
1/2✓ Branch 1 taken 3165 times.
✗ Branch 2 not taken.
|
3165 | header_.reserve(30 + N * (2 * shash::kMaxDigestSize + 5)); |
191 | |||
192 |
1/2✓ Branch 2 taken 3165 times.
✗ Branch 3 not taken.
|
3165 | InitializeHeader(2, N, pack->size(), &header_); |
193 | |||
194 |
2/2✓ Branch 0 taken 259703 times.
✓ Branch 1 taken 3165 times.
|
262868 | for (unsigned i = 0; i < N; ++i) { |
195 |
3/6✓ Branch 2 taken 259703 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 259703 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 259703 times.
✗ Branch 11 not taken.
|
259703 | AppendItemToHeader(ObjectPack::kCas, pack->BucketId(i).ToString(true), |
196 | pack->BucketSize(i), "", &header_); | ||
197 | } | ||
198 | 3165 | } | |
199 | |||
200 | 129 | ObjectPackProducer::ObjectPackProducer(const shash::Any &id, FILE *big_file, | |
201 | 129 | const std::string &file_name) | |
202 | 129 | : pack_(NULL), big_file_(big_file), pos_(0), idx_(0), pos_in_bucket_(0) { | |
203 | 129 | const int fd = fileno(big_file_); | |
204 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 129 times.
|
129 | assert(fd >= 0); |
205 | platform_stat64 info; | ||
206 | 129 | const int retval = platform_fstat(fd, &info); | |
207 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 129 times.
|
129 | assert(retval == 0); |
208 | |||
209 |
1/2✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
|
129 | InitializeHeader(2, 1, info.st_size, &header_); |
210 | |||
211 |
2/4✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 129 times.
✗ Branch 5 not taken.
|
129 | AppendItemToHeader(ObjectPack::kNamed, id.ToString(true), info.st_size, |
212 | file_name, &header_); | ||
213 | |||
214 |
1/2✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
|
129 | rewind(big_file); |
215 | 129 | } | |
216 | |||
217 | /** | ||
218 | * Copies as many bytes as possible into buf. If the returned number of bytes | ||
219 | * is shorter than buf_size, everything has been produced. | ||
220 | */ | ||
221 | 1347026 | unsigned ObjectPackProducer::ProduceNext(const unsigned buf_size, | |
222 | unsigned char *buf) { | ||
223 | 1347026 | const unsigned remaining_in_header = (pos_ < header_.size()) | |
224 | 14087 | ? (header_.size() - pos_) | |
225 |
2/2✓ Branch 0 taken 14087 times.
✓ Branch 1 taken 1332939 times.
|
1361113 | : 0; |
226 | 1347026 | const unsigned nbytes_header = std::min(remaining_in_header, buf_size); | |
227 |
2/2✓ Branch 0 taken 14087 times.
✓ Branch 1 taken 1332939 times.
|
1347026 | if (nbytes_header) { |
228 | 14087 | memcpy(buf, header_.data() + pos_, nbytes_header); | |
229 | 14087 | pos_ += nbytes_header; | |
230 | } | ||
231 | |||
232 | 1347026 | unsigned remaining_in_buf = buf_size - nbytes_header; | |
233 |
2/2✓ Branch 0 taken 10922 times.
✓ Branch 1 taken 1336104 times.
|
1347026 | if (remaining_in_buf == 0) |
234 | 10922 | return nbytes_header; | |
235 | 1336104 | unsigned nbytes_payload = 0; | |
236 | |||
237 |
2/2✓ Branch 0 taken 817 times.
✓ Branch 1 taken 1335287 times.
|
1336104 | if (big_file_) { |
238 |
1/2✓ Branch 1 taken 817 times.
✗ Branch 2 not taken.
|
817 | const size_t nbytes = fread(buf + nbytes_header, 1, remaining_in_buf, |
239 | big_file_); | ||
240 | 817 | nbytes_payload = nbytes; | |
241 | 817 | pos_ += nbytes_payload; | |
242 |
2/2✓ Branch 1 taken 1335085 times.
✓ Branch 2 taken 202 times.
|
1335287 | } else if (idx_ < pack_->GetNoObjects()) { |
243 | // Copy a few buckets more | ||
244 |
6/6✓ Branch 0 taken 1594702 times.
✓ Branch 1 taken 1332092 times.
✓ Branch 3 taken 1591709 times.
✓ Branch 4 taken 2993 times.
✓ Branch 5 taken 1591709 times.
✓ Branch 6 taken 1335085 times.
|
2926794 | while ((remaining_in_buf) > 0 && (idx_ < pack_->GetNoObjects())) { |
245 | 1591709 | const unsigned remaining_in_bucket = pack_->BucketSize(idx_) | |
246 | 1591709 | - pos_in_bucket_; | |
247 | 1591709 | const unsigned nbytes = std::min(remaining_in_buf, remaining_in_bucket); | |
248 | 3183418 | memcpy(buf + nbytes_header + nbytes_payload, | |
249 | 1591709 | pack_->BucketContent(idx_) + pos_in_bucket_, nbytes); | |
250 | |||
251 | 1591709 | pos_in_bucket_ += nbytes; | |
252 | 1591709 | nbytes_payload += nbytes; | |
253 | 1591709 | remaining_in_buf -= nbytes; | |
254 |
2/2✓ Branch 0 taken 259703 times.
✓ Branch 1 taken 1332006 times.
|
1591709 | if (nbytes == remaining_in_bucket) { |
255 | 259703 | pos_in_bucket_ = 0; | |
256 | 259703 | idx_++; | |
257 | } | ||
258 | } | ||
259 | } | ||
260 | |||
261 | 1336104 | return nbytes_header + nbytes_payload; | |
262 | } | ||
263 | |||
264 | //------------------------------------------------------------------------------ | ||
265 | |||
266 | 3114 | ObjectPackConsumer::ObjectPackConsumer(const shash::Any &expected_digest, | |
267 | 3114 | const unsigned expected_header_size) | |
268 | 3114 | : expected_digest_(expected_digest) | |
269 | 3114 | , expected_header_size_(expected_header_size) | |
270 | 3114 | , pos_(0) | |
271 | 3114 | , idx_(0) | |
272 | 3114 | , pos_in_object_(0) | |
273 | 3114 | , pos_in_accu_(0) | |
274 | 3114 | , state_(ObjectPackBuild::kStateContinue) | |
275 | 3114 | , size_(0) { | |
276 | // Upper limit of 100B per entry | ||
277 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3114 times.
|
3114 | if (expected_header_size > (100 * ObjectPack::kMaxObjects)) { |
278 | ✗ | state_ = ObjectPackBuild::kStateHeaderTooBig; | |
279 | ✗ | return; | |
280 | } | ||
281 | |||
282 |
1/2✓ Branch 1 taken 3114 times.
✗ Branch 2 not taken.
|
3114 | raw_header_.reserve(expected_header_size); |
283 | } | ||
284 | |||
285 | /** | ||
286 | * At the end of the function, pos_ will have progressed by buf_size (unless | ||
287 | * the buffer contains trailing garbage bytes. | ||
288 | */ | ||
289 | 1345445 | ObjectPackBuild::State ObjectPackConsumer::ConsumeNext( | |
290 | const unsigned buf_size, const unsigned char *buf) { | ||
291 |
2/2✓ Branch 0 taken 2881 times.
✓ Branch 1 taken 1342564 times.
|
1345445 | if (buf_size == 0) |
292 | 2881 | return state_; | |
293 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1342564 times.
|
1342564 | if (state_ == ObjectPackBuild::kStateDone) { |
294 | ✗ | state_ = ObjectPackBuild::kStateTrailingBytes; | |
295 | ✗ | return state_; | |
296 | } | ||
297 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1342564 times.
|
1342564 | if (state_ != ObjectPackBuild::kStateContinue) |
298 | ✗ | return state_; | |
299 | |||
300 |
2/2✓ Branch 0 taken 10510 times.
✓ Branch 1 taken 1332054 times.
|
1342564 | const unsigned remaining_in_header = (pos_ < expected_header_size_) |
301 | 10510 | ? (expected_header_size_ - pos_) | |
302 | : 0; | ||
303 | 1342564 | const unsigned nbytes_header = std::min(remaining_in_header, buf_size); | |
304 |
2/2✓ Branch 0 taken 10510 times.
✓ Branch 1 taken 1332054 times.
|
1342564 | if (nbytes_header) { |
305 |
2/4✓ Branch 2 taken 10510 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 10510 times.
✗ Branch 6 not taken.
|
10510 | raw_header_ += string(reinterpret_cast<const char *>(buf), nbytes_header); |
306 | 10510 | pos_ += nbytes_header; | |
307 | } | ||
308 | |||
309 |
2/2✓ Branch 0 taken 7396 times.
✓ Branch 1 taken 1335168 times.
|
1342564 | if (pos_ < expected_header_size_) |
310 | 7396 | return ObjectPackBuild::kStateContinue; | |
311 | |||
312 | // This condition can only be true once through the lifetime of the | ||
313 | // Consumer. | ||
314 |
3/4✓ Branch 0 taken 3114 times.
✓ Branch 1 taken 1332054 times.
✓ Branch 2 taken 3114 times.
✗ Branch 3 not taken.
|
1335168 | if (nbytes_header && (pos_ == expected_header_size_)) { |
315 |
1/2✓ Branch 1 taken 3114 times.
✗ Branch 2 not taken.
|
3114 | shash::Any digest(expected_digest_.algorithm); |
316 |
1/2✓ Branch 1 taken 3114 times.
✗ Branch 2 not taken.
|
3114 | shash::HashString(raw_header_, &digest); |
317 |
2/4✓ Branch 1 taken 3114 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 3114 times.
|
3114 | if (digest != expected_digest_) { |
318 | ✗ | state_ = ObjectPackBuild::kStateCorrupt; | |
319 | 86 | return state_; | |
320 | } else { | ||
321 |
1/2✓ Branch 1 taken 3114 times.
✗ Branch 2 not taken.
|
3114 | const bool retval = ParseHeader(); |
322 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3114 times.
|
3114 | if (!retval) { |
323 | ✗ | state_ = ObjectPackBuild::kStateBadFormat; | |
324 | ✗ | return state_; | |
325 | } | ||
326 | // We don't need the raw string anymore | ||
327 | 3114 | raw_header_.clear(); | |
328 | } | ||
329 | |||
330 | // Empty pack? | ||
331 |
6/6✓ Branch 0 taken 129 times.
✓ Branch 1 taken 2985 times.
✓ Branch 3 taken 86 times.
✓ Branch 4 taken 43 times.
✓ Branch 5 taken 86 times.
✓ Branch 6 taken 3028 times.
|
3114 | if ((buf_size == nbytes_header) && (index_.size() == 0)) { |
332 | 86 | state_ = ObjectPackBuild::kStateDone; | |
333 | 86 | return state_; | |
334 | } | ||
335 | } | ||
336 | |||
337 | 1335082 | const unsigned remaining_in_buf = buf_size - nbytes_header; | |
338 | 1335082 | const unsigned char *payload = buf + nbytes_header; | |
339 |
1/2✓ Branch 1 taken 1335082 times.
✗ Branch 2 not taken.
|
1335082 | return ConsumePayload(remaining_in_buf, payload); |
340 | } | ||
341 | |||
342 | /** | ||
343 | * Informs listeners for small complete objects. For large objects, buffers | ||
344 | * the | ||
345 | * input into reasonably sized chunks. buf can contain both a chunk of data | ||
346 | * that needs to be added to the consumer's accumulator and a bunch of | ||
347 | * complete small objects. We use the accumulator only if necessary to avoid | ||
348 | * unnecessary memory copies. | ||
349 | */ | ||
350 | 1335082 | ObjectPackBuild::State ObjectPackConsumer::ConsumePayload( | |
351 | const unsigned buf_size, const unsigned char *buf) { | ||
352 | 1335082 | uint64_t pos_in_buf = 0; | |
353 | 3116289 | while ((idx_ < index_.size()) | |
354 |
8/8✓ Branch 0 taken 3113261 times.
✓ Branch 1 taken 3028 times.
✓ Branch 2 taken 1332097 times.
✓ Branch 3 taken 1781164 times.
✓ Branch 5 taken 43 times.
✓ Branch 6 taken 1332054 times.
✓ Branch 7 taken 1781207 times.
✓ Branch 8 taken 1335082 times.
|
3116289 | && ((pos_in_buf < buf_size) || (index_[idx_].size == 0))) { |
355 | // Fill the accumulator or process next small object | ||
356 | uint64_t nbytes; // How many bytes are consumed in this iteration | ||
357 | 1781207 | const uint64_t remaining_in_buf = buf_size - pos_in_buf; | |
358 | 1781207 | const uint64_t remaining_in_object = index_[idx_].size - pos_in_object_; | |
359 | 1781207 | const bool is_small_rest = remaining_in_buf < kAccuSize; | |
360 | |||
361 | // We use the accumulator if there is already something in or if we have a | ||
362 | // small piece of data of a larger object. | ||
363 | 1781207 | nbytes = std::min(remaining_in_object, remaining_in_buf); | |
364 |
2/2✓ Branch 0 taken 1356883 times.
✓ Branch 1 taken 424324 times.
|
1781207 | if ((pos_in_accu_ > 0) |
365 |
4/4✓ Branch 0 taken 1101574 times.
✓ Branch 1 taken 255309 times.
✓ Branch 2 taken 193930 times.
✓ Branch 3 taken 907644 times.
|
1356883 | || ((remaining_in_buf < remaining_in_object) && is_small_rest)) { |
366 | 618254 | const uint64_t remaining_in_accu = kAccuSize - pos_in_accu_; | |
367 | 618254 | nbytes = std::min(remaining_in_accu, nbytes); | |
368 | 618254 | memcpy(accumulator_ + pos_in_accu_, buf + pos_in_buf, nbytes); | |
369 | 618254 | pos_in_accu_ += nbytes; | |
370 |
4/4✓ Branch 0 taken 428581 times.
✓ Branch 1 taken 189673 times.
✓ Branch 2 taken 4257 times.
✓ Branch 3 taken 424324 times.
|
618254 | if ((pos_in_accu_ == kAccuSize) || (nbytes == remaining_in_object)) { |
371 |
1/2✓ Branch 1 taken 193930 times.
✗ Branch 2 not taken.
|
193930 | NotifyListeners(ObjectPackBuild::Event( |
372 |
1/2✓ Branch 3 taken 193930 times.
✗ Branch 4 not taken.
|
193930 | index_[idx_].id, index_[idx_].size, pos_in_accu_, accumulator_, |
373 | 193930 | index_[idx_].entry_type, index_[idx_].entry_name)); | |
374 | 193930 | pos_in_accu_ = 0; | |
375 | } | ||
376 | 618254 | } else { // directly trigger listeners using buf | |
377 |
1/2✓ Branch 1 taken 1162953 times.
✗ Branch 2 not taken.
|
1162953 | NotifyListeners(ObjectPackBuild::Event( |
378 |
1/2✓ Branch 3 taken 1162953 times.
✗ Branch 4 not taken.
|
1162953 | index_[idx_].id, index_[idx_].size, nbytes, buf + pos_in_buf, |
379 | 1162953 | index_[idx_].entry_type, index_[idx_].entry_name)); | |
380 | } | ||
381 | |||
382 | 1781207 | pos_in_buf += nbytes; | |
383 | 1781207 | pos_in_object_ += nbytes; | |
384 |
2/2✓ Branch 0 taken 259609 times.
✓ Branch 1 taken 1521598 times.
|
1781207 | if (nbytes == remaining_in_object) { |
385 | 259609 | idx_++; | |
386 | 259609 | pos_in_object_ = 0; | |
387 | } | ||
388 | } | ||
389 | |||
390 | 1335082 | pos_ += buf_size; | |
391 | |||
392 |
2/2✓ Branch 1 taken 3028 times.
✓ Branch 2 taken 1332054 times.
|
1335082 | if (idx_ == index_.size()) |
393 |
1/2✓ Branch 0 taken 3028 times.
✗ Branch 1 not taken.
|
3028 | state_ = (pos_in_buf == buf_size) ? ObjectPackBuild::kStateDone |
394 | : ObjectPackBuild::kStateTrailingBytes; | ||
395 | else | ||
396 | 1332054 | state_ = ObjectPackBuild::kStateContinue; | |
397 | 1335082 | return state_; | |
398 | } | ||
399 | |||
400 | 3114 | bool ObjectPackConsumer::ParseHeader() { | |
401 | 3114 | map<char, string> header; | |
402 | const unsigned char *data = reinterpret_cast<const unsigned char *>( | ||
403 | 3114 | raw_header_.data()); | |
404 |
1/2✓ Branch 2 taken 3114 times.
✗ Branch 3 not taken.
|
3114 | ParseKeyvalMem(data, raw_header_.size(), &header); |
405 |
2/5✓ Branch 2 taken 3114 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 3114 times.
|
3114 | if (header.find('V') == header.end()) |
406 | ✗ | return false; | |
407 |
3/7✓ Branch 1 taken 3114 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 3114 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 3114 times.
|
3114 | if (header['V'] != "2") |
408 | ✗ | return false; | |
409 |
2/4✓ Branch 1 taken 3114 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3114 times.
✗ Branch 5 not taken.
|
3114 | size_ = String2Uint64(header['S']); |
410 |
2/4✓ Branch 1 taken 3114 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3114 times.
✗ Branch 5 not taken.
|
3114 | const unsigned nobjects = String2Uint64(header['N']); |
411 | |||
412 |
2/2✓ Branch 0 taken 86 times.
✓ Branch 1 taken 3028 times.
|
3114 | if (nobjects == 0) |
413 | 86 | return true; | |
414 | |||
415 | // Build the object index | ||
416 | 3028 | const size_t separator_idx = raw_header_.find("--\n"); | |
417 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 3028 times.
|
3028 | if (separator_idx == string::npos) |
418 | ✗ | return false; | |
419 | 3028 | unsigned index_idx = separator_idx + 3; | |
420 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 3028 times.
|
3028 | if (index_idx >= raw_header_.size()) |
421 | ✗ | return false; | |
422 | |||
423 | 3028 | uint64_t sum_size = 0; | |
424 | do { | ||
425 | 259609 | const unsigned remaining_in_header = raw_header_.size() - index_idx; | |
426 | 519218 | const string line = GetLineMem(raw_header_.data() + index_idx, | |
427 |
1/2✓ Branch 2 taken 259609 times.
✗ Branch 3 not taken.
|
259609 | remaining_in_header); |
428 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 259609 times.
|
259609 | if (line == "") |
429 | ✗ | break; | |
430 | |||
431 |
1/2✓ Branch 1 taken 259609 times.
✗ Branch 2 not taken.
|
259609 | IndexEntry entry; |
432 |
2/4✓ Branch 1 taken 259609 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 259609 times.
|
259609 | if (!ParseItem(line, &entry, &sum_size)) { |
433 | ✗ | break; | |
434 | } | ||
435 | |||
436 |
1/2✓ Branch 1 taken 259609 times.
✗ Branch 2 not taken.
|
259609 | index_.push_back(entry); |
437 | 259609 | index_idx += line.size() + 1; | |
438 |
4/6✓ Branch 1 taken 259609 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 259609 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 256581 times.
✓ Branch 8 taken 3028 times.
|
519218 | } while (index_idx < raw_header_.size()); |
439 | |||
440 |
2/4✓ Branch 1 taken 3028 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3028 times.
✗ Branch 4 not taken.
|
3028 | return (nobjects == index_.size()) && (size_ == sum_size); |
441 | 3114 | } | |
442 | |||
443 | 259609 | bool ObjectPackConsumer::ParseItem(const std::string &line, | |
444 | ObjectPackConsumer::IndexEntry *entry, | ||
445 | uint64_t *sum_size) { | ||
446 |
2/4✓ Branch 0 taken 259609 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 259609 times.
|
259609 | if (!entry || !sum_size) { |
447 | ✗ | return false; | |
448 | } | ||
449 | |||
450 |
2/2✓ Branch 1 taken 259566 times.
✓ Branch 2 taken 43 times.
|
259609 | if (line[0] == 'C') { // CAS blob |
451 | 259566 | const ObjectPack::BucketContentType entry_type = ObjectPack::kCas; | |
452 | |||
453 | // We could use SplitString but we can have many lines so we do something | ||
454 | // more efficient here | ||
455 | 259566 | const size_t separator = line.find(' ', 2); | |
456 |
3/6✓ Branch 0 taken 259566 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 259566 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 259566 times.
|
259566 | if ((separator == string::npos) || (separator == (line.size() - 1))) { |
457 | ✗ | return false; | |
458 | } | ||
459 | |||
460 |
2/4✓ Branch 1 taken 259566 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 259566 times.
✗ Branch 5 not taken.
|
259566 | const uint64_t size = String2Uint64(line.substr(separator + 1)); |
461 | 259566 | *sum_size += size; | |
462 | |||
463 | // Warning do not construct a HexPtr with an rvalue! | ||
464 | // The constructor takes the address of its argument. | ||
465 |
1/2✓ Branch 1 taken 259566 times.
✗ Branch 2 not taken.
|
259566 | const std::string hash_string = line.substr(2, separator - 2); |
466 | 259566 | const shash::HexPtr hex_ptr(hash_string); | |
467 | |||
468 |
1/2✓ Branch 1 taken 259566 times.
✗ Branch 2 not taken.
|
259566 | entry->id = shash::MkFromSuffixedHexPtr(hex_ptr); |
469 | 259566 | entry->size = size; | |
470 | 259566 | entry->entry_type = entry_type; | |
471 |
1/2✓ Branch 1 taken 259566 times.
✗ Branch 2 not taken.
|
259566 | entry->entry_name = ""; |
472 |
1/2✓ Branch 2 taken 43 times.
✗ Branch 3 not taken.
|
259609 | } else if (line[0] == 'N') { // Named file |
473 | 43 | const ObjectPack::BucketContentType entry_type = ObjectPack::kNamed; | |
474 | |||
475 | // First separator, before the size field | ||
476 | 43 | const size_t separator1 = line.find(' ', 2); | |
477 |
3/6✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 43 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 43 times.
|
43 | if ((separator1 == string::npos) || (separator1 == (line.size() - 1))) { |
478 | ✗ | return false; | |
479 | } | ||
480 | |||
481 | // Second separator, before the name field | ||
482 | 43 | const size_t separator2 = line.find(' ', separator1 + 1); | |
483 |
1/2✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
|
43 | if ((separator1 == 0) || (separator1 == string::npos) |
484 |
3/6✓ Branch 0 taken 43 times.
✗ Branch 1 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 43 times.
✗ Branch 5 not taken.
✓ Branch 6 taken 43 times.
|
86 | || (separator1 == (line.size() - 1))) { |
485 | ✗ | return false; | |
486 | } | ||
487 | |||
488 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | const uint64_t size = String2Uint64( |
489 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
86 | line.substr(separator1 + 1, separator2 - separator1 - 1)); |
490 | |||
491 | 43 | std::string name; | |
492 |
3/7✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 43 times.
✗ Branch 5 not taken.
✗ Branch 7 not taken.
✓ Branch 8 taken 43 times.
|
43 | if (!Debase64(line.substr(separator2 + 1), &name)) { |
493 | ✗ | return false; | |
494 | } | ||
495 | |||
496 | 43 | *sum_size += size; | |
497 | |||
498 | // Warning do not construct a HexPtr with an rvalue! | ||
499 | // The constructor takes the address of its argument. | ||
500 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | const std::string hash_string = line.substr(2, separator1 - 2); |
501 | 43 | const shash::HexPtr hex_ptr(hash_string); | |
502 | |||
503 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | entry->id = shash::MkFromSuffixedHexPtr(hex_ptr); |
504 | 43 | entry->size = size; | |
505 | 43 | entry->entry_type = entry_type; | |
506 |
1/2✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
|
43 | entry->entry_name = name; |
507 |
1/2✓ Branch 2 taken 43 times.
✗ Branch 3 not taken.
|
43 | } else { // Error |
508 | ✗ | return false; | |
509 | } | ||
510 | |||
511 | 259609 | return true; | |
512 | } | ||
513 |