GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/pack.h
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 11 11 100.0%
Branches: 0 0 -%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_PACK_H_
6 #define CVMFS_PACK_H_
7
8 #include <inttypes.h>
9 #include <pthread.h>
10
11 #include <cstdio>
12 #include <set>
13 #include <string>
14 #include <vector>
15
16 #include "crypto/hash.h"
17 #include "gtest/gtest_prod.h"
18 #include "util/concurrency.h"
19 #include "util/single_copy.h"
20
21 /**
22 * Multiple content-addressable objects in a single BLOB. A (serialized)
23 * ObjectPack has a header, an index containing all the objects and their
24 * offsets followed by the concatenated objects. The secure hash of the index
25 * is in the header.
26 *
27 * This allows verifying the hash of the index and the hash of each object
28 * individually. Thus, objects can be read and written in parallel to and from
29 * the ObjectPack.
30 *
31 * ObjectPacks are used by "tentacles" to send change sets to the "octopus
32 * server" as well as by the stratum 0 to transfer object bulks to stratum 1s
33 * during replication.
34 */
35 class ObjectPack : SingleCopy {
36 FRIEND_TEST(T_Pack, Bucket);
37 FRIEND_TEST(T_Pack, ObjectPack);
38 FRIEND_TEST(T_Pack, ObjectPackTransfer);
39
40 private:
41 struct Bucket;
42
43 public:
44 typedef Bucket *BucketHandle;
45
46 /**
47 * This is used to identify the content type of different buckets. Initially,
48 * the contents of a bucket are identified as kEmpty. When committing a
49 * bucket, this is set to either kNamed - if the bucket holds the contents of
50 * a named file - or kCas - if the bucket holds the contents of a
51 * content-addressable buffer.
52 */
53 enum BucketContentType { kEmpty, kNamed, kCas };
54
55 static const uint64_t kDefaultLimit = 200 * 1024 * 1024; // 200MB
56
57 /**
58 * Limit the maximum number of objects to avoid very large headers. Assuming
59 * Sha256 (71 bytes hex) + 9 bytes for the file sizes, a header with 100,000
60 * files should fit in 10M.
61 */
62 static const uint64_t kMaxObjects = 100000;
63
64 explicit ObjectPack(const uint64_t limit = kDefaultLimit);
65 ~ObjectPack();
66
67 static void AddToBucket(const void *buf, const uint64_t size,
68 const BucketHandle handle);
69
70 BucketHandle NewBucket();
71
72 bool CommitBucket(const BucketContentType type, const shash::Any &id,
73 const BucketHandle handle, const std::string &name = "");
74
75 void DiscardBucket(const BucketHandle handle);
76 void TransferBucket(const BucketHandle handle, ObjectPack *other);
77
78 unsigned char *BucketContent(size_t idx) const;
79 uint64_t BucketSize(size_t idx) const;
80 const shash::Any &BucketId(size_t idx) const;
81
82 296 uint64_t size() const { return size_; }
83
84 // This returns the number of objects in the pack (equal to the number of
85 // committed buckets)
86 68941 size_t GetNoObjects() const { return buckets_.size(); }
87
88 private:
89 /**
90 * Wrapper around memory to which data can be added. The memory should
91 * represent a piece of content-addressable storage.
92 */
93 struct Bucket : SingleCopy {
94 static const unsigned kInitialSize = 128;
95
96 Bucket();
97 ~Bucket();
98 void Add(const void *buf, const uint64_t buf_size);
99
100 unsigned char *content;
101 uint64_t size;
102 uint64_t capacity;
103 shash::Any id;
104 BucketContentType content_type;
105 std::string name;
106 };
107
108 void InitLock();
109
110 /**
111 * Protects open_buckets_ and buckets_ collections.
112 */
113 pthread_mutex_t *lock_;
114
115 /**
116 * Maximum size of this object pack.
117 */
118 uint64_t limit_;
119 /**
120 * Accumulated size of all committed buckets.
121 */
122 uint64_t size_;
123 /**
124 * Buckets that were requested but are not yet committed.
125 */
126 std::set<BucketHandle> open_buckets_;
127 /**
128 * Buckets that are committed to the object pack.
129 */
130 std::vector<BucketHandle> buckets_;
131 };
132
133 /**
134 * Data structures required for the ObjectPack serialization. Event is the
135 * template parameter for the Observable base class of ObjectPackConsumer and
136 * is hence declared in this separate namespace.
137 */
138 namespace ObjectPackBuild {
139 struct Event {
140 32110 Event(const shash::Any &id, uint64_t size, unsigned buf_size, const void *buf,
141 ObjectPack::BucketContentType type, const std::string &name)
142 32110 : id(id),
143 32110 size(size),
144 32110 buf_size(buf_size),
145 32110 buf(buf),
146 32110 object_type(type),
147 32110 object_name(name) {}
148
149 shash::Any id;
150 uint64_t size;
151 unsigned buf_size;
152 const void *buf;
153 ObjectPack::BucketContentType object_type;
154 std::string object_name;
155 };
156
157 enum State {
158 kStateContinue = 0,
159 kStateDone,
160 kStateCorrupt,
161 kStateBadFormat,
162 kStateHeaderTooBig,
163 kStateTrailingBytes,
164 };
165 } // namespace ObjectPackBuild
166
167 /**
168 * Serializes ObjectPacks. It can also serialize a single large file as an
169 * "object pack", which otherwise would need special treatment.
170 *
171 * The serialized format has a global, human readable header which has lines of
172 * character keys and string values (like the cvmfs manifest) followed by a "--"
173 * separator line followed by the index of objects. The index contains one line
174 * for each item in the pack. Each line contains the following space-separated
175 * tokens:
176 * 1. object type identifier ('N' for named files, 'C' for CAS blobs)
177 * 2. hash digest (hex)
178 * 3. object size (decimal)
179 * 4. object name - base64 encoding of the object name (optional - only if the
180 * object type is 'N')
181 */
182 class ObjectPackProducer {
183 public:
184 explicit ObjectPackProducer(ObjectPack *pack);
185 ObjectPackProducer(const shash::Any &id, FILE *big_file,
186 const std::string &file_name = "");
187 unsigned ProduceNext(const unsigned buf_size, unsigned char *buf);
188 void GetDigest(shash::Any *hash);
189 76 unsigned GetHeaderSize() { return header_.size(); }
190
191 private:
192 /**
193 * Unused if big_file_ is used.
194 */
195 ObjectPack *pack_;
196
197 /**
198 * Unused if object pack is used. Rewind before giving to ObjectPackProducer.
199 */
200 FILE *big_file_;
201
202 /**
203 * Keeps track of how many bytes have been produced.
204 */
205 uint64_t pos_;
206
207 /**
208 * Keeps track of the current index in pack_->buckets_
209 */
210 size_t idx_;
211
212 /**
213 * Keeps track of the current position in pack_->buckets_[idx_]
214 */
215 size_t pos_in_bucket_;
216
217 /**
218 * The header is created in the constructor.
219 */
220 std::string header_;
221 };
222
223 /**
224 * Deserializes an ObjectPack created by ObjectPackProducer. It calls all
225 * listeners with an Event parameter at least once for every object; for large
226 * objects, it calls the listeners multiple times. It does not verify the
227 * incoming data; this is up to the listeners handling the data. The
228 * ObjectPackConsumer does verify the header digest, however.
229 */
230 class ObjectPackConsumer : public Observable<ObjectPackBuild::Event> {
231 public:
232 explicit ObjectPackConsumer(const shash::Any &expected_digest,
233 const unsigned expected_header_size);
234 ObjectPackBuild::State ConsumeNext(const unsigned buf_size,
235 const unsigned char *buf);
236
237 private:
238 /**
239 * For large objects, notify listeners in chunks of 128kB.
240 */
241 static const unsigned kAccuSize = 128 * 1024;
242
243 struct IndexEntry {
244 6776 IndexEntry() : id(), size(), entry_type(), entry_name() {}
245 IndexEntry(const shash::Any &id, const uint64_t size,
246 ObjectPack::BucketContentType type, const std::string &name)
247 : id(id), size(size), entry_type(type), entry_name(name) {}
248 shash::Any id;
249 uint64_t size;
250 ObjectPack::BucketContentType entry_type;
251 std::string entry_name;
252 };
253
254 bool ParseHeader();
255 bool ParseItem(const std::string &line, IndexEntry *entry,
256 uint64_t *sum_size);
257
258 ObjectPackBuild::State ConsumePayload(const unsigned buf_size,
259 const unsigned char *buf);
260
261 shash::Any expected_digest_;
262 unsigned expected_header_size_;
263
264 /**
265 * Keeps track of how many bytes have been consumed from the payload.
266 */
267 uint64_t pos_;
268
269 /**
270 * Keeps track of the current index in the array of objects (index_)
271 */
272 unsigned idx_;
273
274 /**
275 * Keeps track of how many bytes have been processed from the current object.
276 */
277 unsigned pos_in_object_;
278
279 /**
280 * Collects data for large objects so that the number of callbacks to the
281 * listeners is reduced.
282 */
283 unsigned char accumulator_[kAccuSize];
284
285 /**
286 * Keeps track of how many live bytes are stored in the accumulator_.
287 */
288 unsigned pos_in_accu_;
289
290 /**
291 * The state starts in kStateContinue and makes exactly one transition into
292 * one of the other states as more bytes are consumed.
293 */
294 ObjectPackBuild::State state_;
295
296 /**
297 * Temporary store for the incomplete header. Once completely consumed, the
298 * header is parsed and the object index is stored in index_.
299 */
300 std::string raw_header_;
301
302 /**
303 * Total size of all the objects in the pack (header not included).
304 */
305 uint64_t size_;
306
307 /**
308 * Hash id, size, content type and name of the individual objects in order.
309 */
310 std::vector<IndexEntry> index_;
311 };
312
313 #endif // CVMFS_PACK_H_
314