GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/pack.h
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 11 11 100.0%
Branches: 0 0 -%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_PACK_H_
6 #define CVMFS_PACK_H_
7
8 #include <inttypes.h>
9 #include <pthread.h>
10
11 #include <cstdio>
12 #include <set>
13 #include <string>
14 #include <vector>
15
16 #include "crypto/hash.h"
17 #include "gtest/gtest_prod.h"
18 #include "util/concurrency.h"
19 #include "util/single_copy.h"
20
21 /**
22 * Multiple content-addressable objects in a single BLOB. A (serialized)
23 * ObjectPack has a header, an index containing all the objects and their
24 * offsets followed by the concatenated objects. The secure hash of the index
25 * is in the header.
26 *
27 * This allows to verify the hash of the index and the hash of all objects
28 * individually. Thus, objects can be read and written in parallel to and from
29 * the ObjectPack.
30 *
31 * Objects are used by "tentacles" to send change sets to the "octopus server"
32 * as well as by the stratum 0 to transfer object bulks to stratum 1s during
33 * replication.
34 */
35 class ObjectPack : SingleCopy {
36 FRIEND_TEST(T_Pack, Bucket);
37 FRIEND_TEST(T_Pack, ObjectPack);
38 FRIEND_TEST(T_Pack, ObjectPackTransfer);
39
40 private:
41 struct Bucket;
42
43 public:
44 typedef Bucket *BucketHandle;
45
46 /**
47 * This is used to identify the content type of different buckets. Initially,
48 * the contents of a bucket are identified as kEmpty. When committing a
49 * bucket, this is set to either kNamed - if the bucket holds the contents of
50 * a named file - or kCas - if the bucket holds the contents of a content
51 * addressable buffer.
52 */
53 enum BucketContentType {
54 kEmpty,
55 kNamed,
56 kCas
57 };
58
59 static const uint64_t kDefaultLimit = 200 * 1024 * 1024; // 200MB
60
61 /**
62 * Limit the maximum number of objects to avoid very large headers. Assuming
63 * Sha256 (71 bytes hex) + 9 bytes for the file sizes, a header with 100,000
64 * files should fit in 10M.
65 */
66 static const uint64_t kMaxObjects = 100000;
67
68 explicit ObjectPack(const uint64_t limit = kDefaultLimit);
69 ~ObjectPack();
70
71 static void AddToBucket(const void *buf, const uint64_t size,
72 const BucketHandle handle);
73
74 BucketHandle NewBucket();
75
76 bool CommitBucket(const BucketContentType type, const shash::Any &id,
77 const BucketHandle handle, const std::string &name = "");
78
79 void DiscardBucket(const BucketHandle handle);
80 void TransferBucket(const BucketHandle handle, ObjectPack *other);
81
82 unsigned char *BucketContent(size_t idx) const;
83 uint64_t BucketSize(size_t idx) const;
84 const shash::Any &BucketId(size_t idx) const;
85
86 8681 uint64_t size() const { return size_; }
87
88 // This returns the number of objects in the pack (equal to the number of
89 // committed buckets)
90 754025 size_t GetNoObjects() const { return buckets_.size(); }
91
92 private:
93 /**
94 * Wrapper around memory to which data can be added. The memory should
95 * represent a piece of content-addressable storage.
96 */
97 struct Bucket : SingleCopy {
98 static const unsigned kInitialSize = 128;
99
100 Bucket();
101 ~Bucket();
102 void Add(const void *buf, const uint64_t buf_size);
103
104 unsigned char *content;
105 uint64_t size;
106 uint64_t capacity;
107 shash::Any id;
108 BucketContentType content_type;
109 std::string name;
110 };
111
112 void InitLock();
113
114 /**
115 * Protects open_buckets_ and buckets_ collections.
116 */
117 pthread_mutex_t *lock_;
118
119 /**
120 * Maximum size of this object pack.
121 */
122 uint64_t limit_;
123 /**
124 * Accumulated size of all committed buckets.
125 */
126 uint64_t size_;
127 /**
128 * Buckets that were requested but that are not yet committed
129 */
130 std::set<BucketHandle> open_buckets_;
131 /**
132 * Buckets that are committed to the object pack.
133 */
134 std::vector<BucketHandle> buckets_;
135 };
136
137 /**
138 * Data structures required for the ObjectPack serialization. Event is a
139 * template parameter for the Observable base class of ObjectPack and hence
140 * moved into this base class.
141 */
142 namespace ObjectPackBuild {
143 struct Event {
144 347943 Event(const shash::Any &id, uint64_t size, unsigned buf_size, const void *buf,
145 ObjectPack::BucketContentType type, const std::string &name)
146 347943 : id(id)
147 347943 , size(size)
148 347943 , buf_size(buf_size)
149 347943 , buf(buf)
150 347943 , object_type(type)
151 347943 , object_name(name) { }
152
153 shash::Any id;
154 uint64_t size;
155 unsigned buf_size;
156 const void *buf;
157 ObjectPack::BucketContentType object_type;
158 std::string object_name;
159 };
160
161 enum State {
162 kStateContinue = 0,
163 kStateDone,
164 kStateCorrupt,
165 kStateBadFormat,
166 kStateHeaderTooBig,
167 kStateTrailingBytes,
168 };
169 } // namespace ObjectPackBuild
170
171 /**
172 * Serializes ObjectPacks. It can also serialize a single large file as an
173 * "object pack", which otherwise would need special treatment.
174 *
175 * The serialized format has a global, human readable header which has lines of
176 * character keys and string values (like the cvmfs manifest) followed by a "--"
177 * separator line followed by the index of objects. The index contains one line
178 * for each item in the pack. Each line contains the following space-separated
179 * tokens:
180 * 1. object type identifier ('N' for named files, 'C' for CAS blobs)
181 * 2. hash digest (hex)
182 * 3. object size (decimal)
183 * 4. object name - base64 encoding of the object name (optional - only if the
184 * object type is 'N')
185 */
186 class ObjectPackProducer {
187 public:
188 explicit ObjectPackProducer(ObjectPack *pack);
189 ObjectPackProducer(const shash::Any &id, FILE *big_file,
190 const std::string &file_name = "");
191 unsigned ProduceNext(const unsigned buf_size, unsigned char *buf);
192 void GetDigest(shash::Any *hash);
193 906 unsigned GetHeaderSize() { return header_.size(); }
194
195 private:
196 /**
197 * Unused if big_file_ is used.
198 */
199 ObjectPack *pack_;
200
201 /**
202 * Unused if object pack is used. Rewind before giving to ObjectPackProducer.
203 */
204 FILE *big_file_;
205
206 /**
207 * Keeps track of how many bytes have been produced.
208 */
209 uint64_t pos_;
210
211 /**
212 * Keeps track of the current index in pack_->buckets_
213 */
214 size_t idx_;
215
216 /**
217 * Keeps track of the current position in pack_->buckets_[idx_]
218 */
219 size_t pos_in_bucket_;
220
221 /**
222 * The header is created in the constructor.
223 */
224 std::string header_;
225 };
226
227 /**
228 * Deserializes an ObjectPack created by ObjectPackProducer. For every object
229 * it calls all listeners with a Event parameter at least once for every
230 * object. For large objects, it calls the listeners multiple times. It won't
231 * verify the incoming data, this is up to the listeners handling the data.
232 * The ObjectPackConsumer will verify the header digest, however.
233 */
234 class ObjectPackConsumer : public Observable<ObjectPackBuild::Event> {
235 public:
236 explicit ObjectPackConsumer(const shash::Any &expected_digest,
237 const unsigned expected_header_size);
238 ObjectPackBuild::State ConsumeNext(const unsigned buf_size,
239 const unsigned char *buf);
240
241 private:
242 /**
243 * For large objects, notify listeners in chunks of 128kB.
244 */
245 static const unsigned kAccuSize = 128 * 1024;
246
247 struct IndexEntry {
248 67223 IndexEntry() : id(), size(), entry_type(), entry_name() { }
249 IndexEntry(const shash::Any &id, const uint64_t size,
250 ObjectPack::BucketContentType type, const std::string &name)
251 : id(id), size(size), entry_type(type), entry_name(name) { }
252 shash::Any id;
253 uint64_t size;
254 ObjectPack::BucketContentType entry_type;
255 std::string entry_name;
256 };
257
258 bool ParseHeader();
259 bool ParseItem(const std::string &line, IndexEntry *entry,
260 uint64_t *sum_size);
261
262 ObjectPackBuild::State ConsumePayload(const unsigned buf_size,
263 const unsigned char *buf);
264
265 shash::Any expected_digest_;
266 unsigned expected_header_size_;
267
268 /**
269 * Keeps track of how many bytes have been consumed from the payload.
270 */
271 uint64_t pos_;
272
273 /**
274 * Keeps track of the current index in the array of objects (index_)
275 */
276 unsigned idx_;
277
278 /**
279 * Keeps track of how many bytes have been processed from the current object.
280 */
281 unsigned pos_in_object_;
282
283 /**
284 * Collects data for large objects so that the number of callbacks to the
285 * listeners is reduced.
286 */
287 unsigned char accumulator_[kAccuSize];
288
289 /**
290 * Keeps track of how many live bytes are stored in the accumulator_.
291 */
292 unsigned pos_in_accu_;
293
294 /**
295 * The state starts in kStateContinue and makes exactly one transition into
296 * one of the other states as more bytes are consumed.
297 */
298 ObjectPackBuild::State state_;
299
300 /**
301 * Temporary store for the incomplete header. Once completely consumed, the
302 * header is interpreted into global_header_ and object_index_.
303 */
304 std::string raw_header_;
305
306 /**
307 * Total size of all the objects in the pack (header not included).
308 */
309 uint64_t size_;
310
311 /**
312 * Hash id and size of the individual objects in order.
313 */
314 std::vector<IndexEntry> index_;
315 };
316
317 #endif // CVMFS_PACK_H_
318