GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/pack.h Lines: 10 10 100.0 %
Date: 2019-02-03 02:48:13 Branches: 1 6 16.7 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_PACK_H_
6
#define CVMFS_PACK_H_
7
8
#include <inttypes.h>
9
#include <pthread.h>
10
11
#include <cstdio>
12
#include <set>
13
#include <string>
14
#include <vector>
15
16
#include "gtest/gtest_prod.h"
17
#include "hash.h"
18
#include "util/single_copy.h"
19
#include "util_concurrency.h"
20
21
/**
22
 * Multiple content-addressable objects in a single BLOB.  A (serialized)
23
 * ObjectPack has a header, an index containing all the objects and their
24
 * offsets followed by the concatenated objects.  The secure hash of the index
25
 * is in the header.
26
 *
27
 * This allows verifying the hash of the index and the hash of all objects
28
 * individually.  Thus, objects can be read and written in parallel to and from
29
 * the ObjectPack.
30
 *
31
 * Object packs are used by "tentacles" to send change sets to the "octopus
32
 * server" as well as by the stratum 0 to transfer object bulks to stratum 1s
33
 * during replication.
34
 */
35
class ObjectPack : SingleCopy {
36
  // Grant the gtest cases T_Pack.* access to the private members below.
  FRIEND_TEST(T_Pack, Bucket);
37
  FRIEND_TEST(T_Pack, ObjectPack);
38
  FRIEND_TEST(T_Pack, ObjectPackTransfer);
39
40
 private:
41
  struct Bucket;
42
43
 public:
44
  // Handle to a bucket.  The Bucket type itself is private to ObjectPack;
  // callers obtain handles from NewBucket() and pass them back in.
  typedef Bucket *BucketHandle;
45
46
  /**
47
   * This is used to identify the content type of different buckets. Initially,
48
   * the contents of a bucket are identified as kEmpty. When committing a
49
   * bucket, this is set to either kNamed - if the bucket holds the contents of
50
   * a named file - or kCas - if the bucket holds the contents of a content
51
   * addressable buffer.
52
   */
53
  enum BucketContentType { kEmpty, kNamed, kCas };
54
55
  static const uint64_t kDefaultLimit = 200 * 1024 * 1024;  // 200MB
56
57
  /**
58
   * Limit the maximum number of objects to avoid very large headers.  Assuming
59
   * Sha256 (71 bytes hex) + 9 bytes for the file sizes, a header with 100,000
60
   * files should fit in 10M.
   * (100,000 * (71 + 9) bytes = 8,000,000 bytes.)
61
   */
62
  static const uint64_t kMaxObjects = 100000;
63
64
  // `limit` is the maximum size of the pack; presumably stored in limit_.
  explicit ObjectPack(const uint64_t limit = kDefaultLimit);
65
  ~ObjectPack();
66
67
  // Appends `size` bytes of `buf` to the bucket referenced by `handle`.
  // Declared static: it works on the bucket only, not on any ObjectPack state.
  static void AddToBucket(const void *buf, const uint64_t size,
68
                          const BucketHandle handle);
69
70
  // Hands out a new, not yet committed bucket (see open_buckets_).
  BucketHandle NewBucket();
71
72
  // Commits an open bucket to the pack with content type `type` and hash `id`.
  // `name` is only meaningful for kNamed buckets.  NOTE(review): the exact
  // failure conditions for a `false` return (presumably limit_ / kMaxObjects
  // being exceeded) live in pack.cc -- confirm there.
  bool CommitBucket(const BucketContentType type, const shash::Any &id,
73
                    const BucketHandle handle, const std::string &name = "");
74
75
  // Drops a bucket without committing it (presumably also freeing it).
  void DiscardBucket(const BucketHandle handle);
  // Hands the bucket behind `handle` over to `other`.  Whether the bucket must
  // be open or committed is not visible here -- check pack.cc.
76
  void TransferBucket(const BucketHandle handle, ObjectPack *other);
77
78
  // Accessors for a committed bucket; `idx` presumably indexes buckets_,
  // i.e. 0 <= idx < GetNoObjects().
  unsigned char *BucketContent(size_t idx) const;
79
  uint64_t BucketSize(size_t idx) const;
80
  const shash::Any &BucketId(size_t idx) const;
81
82
296
  // Accumulated size of all committed buckets (size_).
  uint64_t size() const { return size_; }
83
84
  // This returns the number of objects in the pack (equal to the number of
85
  // committed buckets)
86
69120
  size_t GetNoObjects() const { return buckets_.size(); }
87
88
 private:
89
  /**
90
   * Wrapper around memory to which data can be added.  The memory should
91
   * represent a piece of content-addressable storage.
92
   */
93
  struct Bucket : SingleCopy {
94
    // Presumably the initial capacity in bytes of `content`.
    static const unsigned kInitialSize = 128;
95
96
    Bucket();
97
    ~Bucket();
98
    // Appends buf_size bytes of buf, presumably growing `capacity` as needed.
    void Add(const void *buf, const uint64_t buf_size);
99
100
    unsigned char *content;  // Raw data buffer.
101
    uint64_t size;  // Presumably the number of bytes in use.
102
    uint64_t capacity;  // Presumably the allocated size of `content`.
103
    shash::Any id;
104
    BucketContentType content_type;
105
    std::string name;  // Only meaningful for kNamed buckets.
106
  };
107
108
  void InitLock();
109
110
  /**
111
   * Protects open_buckets_ and buckets_ collections.
   * Held by pointer; presumably allocated and initialized by InitLock().
112
   */
113
  pthread_mutex_t *lock_;
114
115
  /**
116
   * Maximum size of this object pack.
117
   */
118
  uint64_t limit_;
119
  /**
120
   * Accumulated size of all committed buckets.
121
   */
122
  uint64_t size_;
123
  /**
124
   * Buckets that were requested but that are not yet committed
125
   */
126
  std::set<BucketHandle> open_buckets_;
127
  /**
128
   * Buckets that are committed to the object pack.
129
   */
130
  std::vector<BucketHandle> buckets_;
131
};
132
133
/**
134
 * Data structures required for the ObjectPack serialization.  Event is the
135
 * template parameter of the Observable base class of ObjectPackConsumer and is
136
 * therefore defined in this namespace rather than inside a class.
137
 */
138
namespace ObjectPackBuild {
139
32200
/**
 * Payload handed to ObjectPackConsumer listeners.  The data of one object is
 * delivered through one or more Events: buf/buf_size carry the bytes of this
 * particular notification, while id, size, object_type and object_name
 * describe the object those bytes belong to.  (`size` is presumably the full
 * object size taken from the index -- TODO confirm in pack.cc.)
 */
struct Event {
140
32200
  Event(const shash::Any &object_id, uint64_t object_size,
        unsigned chunk_size, const void *chunk,
141
        ObjectPack::BucketContentType content_type,
        const std::string &content_name)
142
      : id(object_id)
143
      , size(object_size)
144
      , buf_size(chunk_size)
145
      , buf(chunk)
146
      , object_type(content_type)
147
32200
      , object_name(content_name)
  {}
148
149
  shash::Any id;
150
  uint64_t size;
151
  unsigned buf_size;
152
  // Not owned; presumably only valid while the listener is being called.
  const void *buf;
153
  ObjectPack::BucketContentType object_type;
154
  // Only meaningful when object_type is kNamed.
  std::string object_name;
155
};
156
157
/**
 * Result of ObjectPackConsumer::ConsumeNext().  The enumerator values are
 * written out explicitly; they are identical to the implicit 0-based numbering.
 */
enum State {
158
  kStateContinue = 0,
159
  kStateDone = 1,
160
  kStateCorrupt = 2,
161
  kStateBadFormat = 3,
162
  kStateHeaderTooBig = 4,
163
  kStateTrailingBytes = 5
164
};
165
}  // namespace ObjectPackBuild
166
167
/**
168
 * Serializes ObjectPacks.  It can also serialize a single large file as an
169
 * "object pack", which otherwise would need special treatment.
170
 *
171
 * The serialized format has a global, human readable header which has lines of
172
 * character keys and string values (like the cvmfs manifest) followed by a "--"
173
 * separator line followed by the index of objects. The index contains one line
174
 * for each item in the pack. Each line contains the following space-separated
175
 * tokens:
176
 * 1. object type identifier ('N' for named files, 'C' for CAS blobs)
177
 * 2. hash digest (hex)
178
 * 3. object size (decimal)
179
 * 4. object name - base64 encoding of the object name (optional - only if the
180
 *                  object type is 'N')
181
 */
182
80
class ObjectPackProducer {
183
 public:
184
  /**
   * Serializes the committed buckets of `pack`.  The pack is not owned;
   * presumably it must outlive the producer.
   */
  explicit ObjectPackProducer(ObjectPack *pack);
185
  /**
   * Serializes a single large file with hash `id` as a one-object pack.
   * `big_file` must be rewound before it is passed in.  A non-empty
   * `file_name` presumably turns the entry into a named ('N') object.
   */
  ObjectPackProducer(const shash::Any &id, FILE *big_file,
186
                     const std::string &file_name = "");
187
  /**
   * Copies up to buf_size bytes of the serialized stream (header first, then
   * the object data) into buf and returns the number of bytes written.
   * Presumably returns 0 once everything has been produced -- TODO confirm.
   */
  unsigned ProduceNext(const unsigned buf_size, unsigned char *buf);
188
  /**
   * Stores the pack's digest in *hash.  Presumably this is the header digest
   * that ObjectPackConsumer expects as `expected_digest`.
   */
  void GetDigest(shash::Any *hash);
189
76
  /**
   * Size in bytes of the serialized header built by the constructor (the
   * value ObjectPackConsumer takes as `expected_header_size`).  Read-only,
   * hence const, so it can be called through const references.  The header
   * is bounded by ObjectPack::kMaxObjects, so it fits in `unsigned`.
   */
  unsigned GetHeaderSize() const {
    return static_cast<unsigned>(header_.size());
  }
190
191
 private:
192
  /**
193
   * Unused if big_file_ is used.
194
   */
195
  ObjectPack *pack_;
196
197
  /**
198
   * Unused if object pack is used.  Rewind before giving to ObjectPackProducer.
199
   */
200
  FILE *big_file_;
201
202
  /**
203
   * Keeps track of how many bytes have been produced.
204
   */
205
  uint64_t pos_;
206
207
  /**
208
   * Keeps track of the current index in pack_->buckets_
209
   */
210
  size_t idx_;
211
212
  /**
213
   * Keeps track of the current position in pack_->buckets_[idx_]
214
   */
215
  size_t pos_in_bucket_;
216
217
  /**
218
   * The header is created in the constructor.
219
   */
220
  std::string header_;
221
};
222
223
/**
224
 * Deserializes an ObjectPack created by ObjectPackProducer.  For every object
225
 * it calls all listeners with an Event parameter at least once for every
226
 * object.  For large objects, it calls the listeners multiple times.  It won't
227
 * verify the incoming data, this is up to the listeners handling the data.
228
 * The ObjectPackConsumer will verify the header digest, however.
229
 */
230

74
class ObjectPackConsumer : public Observable<ObjectPackBuild::Event> {
231
 public:
232
  // expected_digest / expected_header_size: presumably the values reported by
  // ObjectPackProducer::GetDigest() and GetHeaderSize() on the producing side.
  explicit ObjectPackConsumer(const shash::Any &expected_digest,
233
                              const unsigned expected_header_size);
234
  // Feeds the next buf_size bytes of the serialized pack.  kStateContinue means
  // more input is expected; any other State is final (see state_).
  ObjectPackBuild::State ConsumeNext(const unsigned buf_size,
235
                                     const unsigned char *buf);
236
237
 private:
238
  /**
239
   * For large objects, notify listeners in chunks of 128 KiB (128 * 1024 B).
240
   */
241
  static const unsigned kAccuSize = 128 * 1024;
242
243
36475
  // One parsed entry of the pack index (type, hash, size and optional name;
  // see the serialization format described on ObjectPackProducer).
  struct IndexEntry {
244
6651
    IndexEntry() : id(), size(), entry_type(), entry_name() {}
245
    IndexEntry(const shash::Any &id, const uint64_t size,
246
               ObjectPack::BucketContentType type, const std::string &name)
247
        : id(id), size(size), entry_type(type), entry_name(name) {}
248
    shash::Any id;
249
    uint64_t size;
250
    ObjectPack::BucketContentType entry_type;
251
    std::string entry_name;
252
  };
253
254
  bool ParseHeader();
255
  // Parses one index line into *entry; presumably adds the entry's size to
  // *sum_size.  Returns false on a malformed line.
  bool ParseItem(const std::string &line, IndexEntry *entry,
256
                 uint64_t *sum_size);
257
258
  ObjectPackBuild::State ConsumePayload(const unsigned buf_size,
259
                                        const unsigned char *buf);
260
261
  shash::Any expected_digest_;
262
  unsigned expected_header_size_;
263
264
  /**
265
   * Keeps track of how many bytes have been consumed from the payload.
266
   */
267
  uint64_t pos_;
268
269
  /**
270
   * Keeps track of the current index in the array of objects (index_)
271
   */
272
  unsigned idx_;
273
274
  /**
275
   * Keeps track of how many bytes have been processed from the current object.
   * NOTE(review): this is `unsigned` while IndexEntry::size is uint64_t.  An
   * object of 4 GiB or more (possible via the single big_file producer path)
   * could overflow it -- check pack.cc and consider widening to uint64_t.
276
   */
277
  unsigned pos_in_object_;
278
279
  /**
280
   * Collects data for large objects so that the number of callbacks to the
281
   * listeners is reduced.
   * Stored by value: every ObjectPackConsumer instance carries kAccuSize bytes.
282
   */
283
  unsigned char accumulator_[kAccuSize];
284
285
  /**
286
   * Keeps track of how many live bytes are stored in the accumulator_.
287
   */
288
  unsigned pos_in_accu_;
289
290
  /**
291
   * The state starts in kStateContinue and makes exactly one transition into
292
   * one of the other states as more bytes are consumed.
293
   */
294
  ObjectPackBuild::State state_;
295
296
  /**
297
   * Temporary store for the incomplete header.  Once completely consumed, the
298
   * header is interpreted (presumably by ParseHeader()) into index_ and size_.
299
   */
300
  std::string raw_header_;
301
302
  /**
303
   * Total size of all the objects in the pack (header not included).
304
   */
305
  uint64_t size_;
306
307
  /**
308
   * Hash id and size of the individual objects in order.
309
   */
310
  std::vector<IndexEntry> index_;
311
};
312
313
#endif  // CVMFS_PACK_H_