| Line | Branch | Exec | Source | 
    
      | 1 |  |  | /** | 
    
      | 2 |  |  | * This file is part of the CernVM File System. | 
    
      | 3 |  |  | */ | 
    
      | 4 |  |  |  | 
    
      | 5 |  |  | #ifndef CVMFS_PACK_H_ | 
    
      | 6 |  |  | #define CVMFS_PACK_H_ | 
    
      | 7 |  |  |  | 
    
      | 8 |  |  | #include <inttypes.h> | 
    
      | 9 |  |  | #include <pthread.h> | 
    
      | 10 |  |  |  | 
    
      | 11 |  |  | #include <cstdio> | 
    
      | 12 |  |  | #include <set> | 
    
      | 13 |  |  | #include <string> | 
    
      | 14 |  |  | #include <vector> | 
    
      | 15 |  |  |  | 
    
      | 16 |  |  | #include "crypto/hash.h" | 
    
      | 17 |  |  | #include "duplex_testing.h" | 
    
      | 18 |  |  | #include "util/concurrency.h" | 
    
      | 19 |  |  | #include "util/single_copy.h" | 
    
      | 20 |  |  |  | 
    
      | 21 |  |  | /** | 
    
      | 22 |  |  | * Multiple content-addressable objects in a single BLOB.  A (serialized) | 
    
      | 23 |  |  | * ObjectPack has a header, an index containing all the objects and their | 
    
      | 24 |  |  | * offsets followed by the concatenated objects.  The secure hash of the index | 
    
      | 25 |  |  | * is in the header. | 
    
      | 26 |  |  | * | 
    
      | 27 |  |  | * This makes it possible to verify the hash of the index and the hash of each | 
    
      | 28 |  |  | * object individually.  Thus, objects can be read from and written to the | 
    
      | 29 |  |  | * ObjectPack in parallel. | 
    
      | 30 |  |  | * | 
    
      | 31 |  |  | * Object packs are used by "tentacles" to send change sets to the "octopus | 
    
      | 32 |  |  | * server" as well as by the stratum 0 to transfer bulks of objects to | 
    
      | 33 |  |  | * stratum 1s during replication. | 
    
      | 34 |  |  | */ | 
    
      | 35 |  |  | class ObjectPack : SingleCopy { | 
    
      | 36 |  |  | FRIEND_TEST(T_Pack, Bucket); | 
    
      | 37 |  |  | FRIEND_TEST(T_Pack, ObjectPack); | 
    
      | 38 |  |  | FRIEND_TEST(T_Pack, ObjectPackTransfer); | 
    
      | 39 |  |  |  | 
    
      | 40 |  |  | private: | 
    
      | 41 |  |  | struct Bucket; | 
    
      | 42 |  |  |  | 
    
      | 43 |  |  | public: | 
    
      | 44 |  |  | typedef Bucket *BucketHandle; | 
    
      | 45 |  |  |  | 
    
      | 46 |  |  | /** | 
    
      | 47 |  |  | * Identifies the content type of a bucket.  Initially, the contents of a | 
    
      | 48 |  |  | * bucket are identified as kEmpty.  When committing a bucket, the type is | 
    
      | 49 |  |  | * set to either kNamed - if the bucket holds the contents of a named file - | 
    
      | 50 |  |  | * or kCas - if the bucket holds the contents of a content-addressable | 
    
      | 51 |  |  | * buffer. | 
    
      | 52 |  |  | */ | 
    
      | 53 |  |  | enum BucketContentType { | 
    
      | 54 |  |  | kEmpty, | 
    
      | 55 |  |  | kNamed, | 
    
      | 56 |  |  | kCas | 
    
      | 57 |  |  | }; | 
    
      | 58 |  |  |  | 
    
      | 59 |  |  | static const uint64_t kDefaultLimit = 200 * 1024 * 1024;  // 200MB | 
    
      | 60 |  |  |  | 
    
      | 61 |  |  | /** | 
    
      | 62 |  |  | * Limit the maximum number of objects to avoid very large headers.  Assuming | 
    
      | 63 |  |  | * Sha256 (71 bytes hex) + 9 bytes for the file sizes, i.e. 80 bytes per entry, | 
    
      | 64 |  |  | * a header with 100,000 files (about 8MB) should fit in 10MB. | 
    
      | 65 |  |  | */ | 
    
      | 66 |  |  | static const uint64_t kMaxObjects = 100000; | 
    
      | 67 |  |  |  | 
    
      | 68 |  |  | explicit ObjectPack(const uint64_t limit = kDefaultLimit); | 
    
      | 69 |  |  | ~ObjectPack(); | 
    
      | 70 |  |  |  | 
    
      | 71 |  |  | static void AddToBucket(const void *buf, const uint64_t size, | 
    
      | 72 |  |  | const BucketHandle handle); | 
    
      | 73 |  |  |  | 
    
      | 74 |  |  | BucketHandle NewBucket(); | 
    
      | 75 |  |  |  | 
    
      | 76 |  |  | bool CommitBucket(const BucketContentType type, const shash::Any &id, | 
    
      | 77 |  |  | const BucketHandle handle, const std::string &name = ""); | 
    
      | 78 |  |  |  | 
    
      | 79 |  |  | void DiscardBucket(const BucketHandle handle); | 
    
      | 80 |  |  | void TransferBucket(const BucketHandle handle, ObjectPack *other); | 
    
      | 81 |  |  |  | 
    
      | 82 |  |  | unsigned char *BucketContent(size_t idx) const; | 
    
      | 83 |  |  | uint64_t BucketSize(size_t idx) const; | 
    
      | 84 |  |  | const shash::Any &BucketId(size_t idx) const; | 
    
      | 85 |  |  |  | 
    
      | 86 |  | 7994 | uint64_t size() const { return size_; } | 
    
      | 87 |  |  |  | 
    
      | 88 |  |  | // Returns the number of objects in the pack, which equals the number of | 
    
      | 89 |  |  | // committed buckets (the size of buckets_). | 
    
      | 90 |  | 2545522 | size_t GetNoObjects() const { return buckets_.size(); } | 
    
      | 91 |  |  |  | 
    
      | 92 |  |  | private: | 
    
      | 93 |  |  | /** | 
    
      | 94 |  |  | * Wrapper around a memory buffer (content) to which data can be added with | 
    
      | 95 |  |  | * Add().  The memory should represent a piece of content-addressable storage. | 
    
      | 96 |  |  | */ | 
    
      | 97 |  |  | struct Bucket : SingleCopy { | 
    
      | 98 |  |  | static const unsigned kInitialSize = 128; | 
    
      | 99 |  |  |  | 
    
      | 100 |  |  | Bucket(); | 
    
      | 101 |  |  | ~Bucket(); | 
    
      | 102 |  |  | void Add(const void *buf, const uint64_t buf_size); | 
    
      | 103 |  |  |  | 
    
      | 104 |  |  | unsigned char *content; | 
    
      | 105 |  |  | uint64_t size; | 
    
      | 106 |  |  | uint64_t capacity; | 
    
      | 107 |  |  | shash::Any id; | 
    
      | 108 |  |  | BucketContentType content_type; | 
    
      | 109 |  |  | std::string name; | 
    
      | 110 |  |  | }; | 
    
      | 111 |  |  |  | 
    
      | 112 |  |  | void InitLock(); | 
    
      | 113 |  |  |  | 
    
      | 114 |  |  | /** | 
    
      | 115 |  |  | * Protects the open_buckets_ and buckets_ collections. | 
    
      | 116 |  |  | */ | 
    
      | 117 |  |  | pthread_mutex_t *lock_; | 
    
      | 118 |  |  |  | 
    
      | 119 |  |  | /** | 
    
      | 120 |  |  | * Maximum size of this object pack. | 
    
      | 121 |  |  | */ | 
    
      | 122 |  |  | uint64_t limit_; | 
    
      | 123 |  |  | /** | 
    
      | 124 |  |  | * Accumulated size of all committed buckets. | 
    
      | 125 |  |  | */ | 
    
      | 126 |  |  | uint64_t size_; | 
    
      | 127 |  |  | /** | 
    
      | 128 |  |  | * Buckets that were requested but that are not yet committed. | 
    
      | 129 |  |  | */ | 
    
      | 130 |  |  | std::set<BucketHandle> open_buckets_; | 
    
      | 131 |  |  | /** | 
    
      | 132 |  |  | * Buckets that are committed to the object pack. | 
    
      | 133 |  |  | */ | 
    
      | 134 |  |  | std::vector<BucketHandle> buckets_; | 
    
      | 135 |  |  | }; | 
    
      | 136 |  |  |  | 
    
      | 137 |  |  | /** | 
    
      | 138 |  |  | * Data structures required for the ObjectPack (de-)serialization.  Event is the | 
    
      | 139 |  |  | * template parameter of the Observable base class of ObjectPackConsumer and is | 
    
      | 140 |  |  | * hence declared in this namespace; State is returned by ConsumeNext(). | 
    
      | 141 |  |  | */ | 
    
      | 142 |  |  | namespace ObjectPackBuild { | 
    
      | 143 |  |  | struct Event { | 
    
      | 144 |  | 1174903 | Event(const shash::Any &id, uint64_t size, unsigned buf_size, const void *buf, | 
    
      | 145 |  |  | ObjectPack::BucketContentType type, const std::string &name) | 
    
      | 146 |  | 1174903 | : id(id) | 
    
      | 147 |  | 1174903 | , size(size) | 
    
      | 148 |  | 1174903 | , buf_size(buf_size) | 
    
      | 149 |  | 1174903 | , buf(buf) | 
    
      | 150 |  | 1174903 | , object_type(type) | 
    
      | 151 |  | 1174903 | , object_name(name) { } | 
    
      | 152 |  |  |  | 
    
      | 153 |  |  | shash::Any id; | 
    
      | 154 |  |  | uint64_t size; | 
    
      | 155 |  |  | unsigned buf_size; | 
    
      | 156 |  |  | const void *buf; | 
    
      | 157 |  |  | ObjectPack::BucketContentType object_type; | 
    
      | 158 |  |  | std::string object_name; | 
    
      | 159 |  |  | }; | 
    
      | 160 |  |  |  | 
    
      | 161 |  |  | enum State { | 
    
      | 162 |  |  | kStateContinue = 0, | 
    
      | 163 |  |  | kStateDone, | 
    
      | 164 |  |  | kStateCorrupt, | 
    
      | 165 |  |  | kStateBadFormat, | 
    
      | 166 |  |  | kStateHeaderTooBig, | 
    
      | 167 |  |  | kStateTrailingBytes, | 
    
      | 168 |  |  | }; | 
    
      | 169 |  |  | }  // namespace ObjectPackBuild | 
    
      | 170 |  |  |  | 
    
      | 171 |  |  | /** | 
    
      | 172 |  |  | * Serializes ObjectPacks.  It can also serialize a single large file as an | 
    
      | 173 |  |  | * "object pack", which otherwise would need special treatment. | 
    
      | 174 |  |  | * | 
    
      | 175 |  |  | * The serialized format has a global, human-readable header consisting of lines | 
    
      | 176 |  |  | * of character keys and string values (like the cvmfs manifest), then a "--" | 
    
      | 177 |  |  | * separator line, then the index of objects.  The index contains one line | 
    
      | 178 |  |  | * for each item in the pack.  Each line contains the following space-separated | 
    
      | 179 |  |  | * tokens: | 
    
      | 180 |  |  | * 1. object type identifier ('N' for named files, 'C' for CAS blobs) | 
    
      | 181 |  |  | * 2. hash digest (hex) | 
    
      | 182 |  |  | * 3. object size (decimal) | 
    
      | 183 |  |  | * 4. object name - base64 encoding of the object name (optional - only if the | 
    
      | 184 |  |  | *                  object type is 'N') | 
    
      | 185 |  |  | */ | 
    
      | 186 |  |  | class ObjectPackProducer { | 
    
      | 187 |  |  | public: | 
    
      | 188 |  |  | explicit ObjectPackProducer(ObjectPack *pack); | 
    
      | 189 |  |  | ObjectPackProducer(const shash::Any &id, FILE *big_file, | 
    
      | 190 |  |  | const std::string &file_name = ""); | 
    
      | 191 |  |  | unsigned ProduceNext(const unsigned buf_size, unsigned char *buf); | 
    
      | 192 |  |  | void GetDigest(shash::Any *hash); | 
    
      | 193 |  | 2761 | unsigned GetHeaderSize() { return header_.size(); } | 
    
      | 194 |  |  |  | 
    
      | 195 |  |  | private: | 
    
      | 196 |  |  | /** | 
    
      | 197 |  |  | * Unused if big_file_ is used. | 
    
      | 198 |  |  | */ | 
    
      | 199 |  |  | ObjectPack *pack_; | 
    
      | 200 |  |  |  | 
    
      | 201 |  |  | /** | 
    
      | 202 |  |  | * Unused if the object pack (pack_) is used.  Rewind the file before passing it to ObjectPackProducer. | 
    
      | 203 |  |  | */ | 
    
      | 204 |  |  | FILE *big_file_; | 
    
      | 205 |  |  |  | 
    
      | 206 |  |  | /** | 
    
      | 207 |  |  | * Keeps track of how many bytes have been produced. | 
    
      | 208 |  |  | */ | 
    
      | 209 |  |  | uint64_t pos_; | 
    
      | 210 |  |  |  | 
    
      | 211 |  |  | /** | 
    
      | 212 |  |  | * Keeps track of the current index in pack_->buckets_. | 
    
      | 213 |  |  | */ | 
    
      | 214 |  |  | size_t idx_; | 
    
      | 215 |  |  |  | 
    
      | 216 |  |  | /** | 
    
      | 217 |  |  | * Keeps track of the current position in pack_->buckets_[idx_]. | 
    
      | 218 |  |  | */ | 
    
      | 219 |  |  | size_t pos_in_bucket_; | 
    
      | 220 |  |  |  | 
    
      | 221 |  |  | /** | 
    
      | 222 |  |  | * The header is created in the constructor; GetHeaderSize() reports its length. | 
    
      | 223 |  |  | */ | 
    
      | 224 |  |  | std::string header_; | 
    
      | 225 |  |  | }; | 
    
      | 226 |  |  |  | 
    
      | 227 |  |  | /** | 
    
      | 228 |  |  | * Deserializes an ObjectPack created by ObjectPackProducer.  For every object | 
    
      | 229 |  |  | * it calls all listeners with an Event parameter at least once; for large | 
    
      | 230 |  |  | * objects, it calls the listeners multiple times.  It won't verify the | 
    
      | 231 |  |  | * incoming data; this is up to the listeners handling the data. | 
    
      | 232 |  |  | * The ObjectPackConsumer will verify the header digest, however. | 
    
      | 233 |  |  | */ | 
    
      | 234 |  |  | class ObjectPackConsumer : public Observable<ObjectPackBuild::Event> { | 
    
      | 235 |  |  | public: | 
    
      | 236 |  |  | explicit ObjectPackConsumer(const shash::Any &expected_digest, | 
    
      | 237 |  |  | const unsigned expected_header_size); | 
    
      | 238 |  |  | ObjectPackBuild::State ConsumeNext(const unsigned buf_size, | 
    
      | 239 |  |  | const unsigned char *buf); | 
    
      | 240 |  |  |  | 
    
      | 241 |  |  | private: | 
    
      | 242 |  |  | /** | 
    
      | 243 |  |  | * For large objects, notify listeners in chunks of 128kB. | 
    
      | 244 |  |  | */ | 
    
      | 245 |  |  | static const unsigned kAccuSize = 128 * 1024; | 
    
      | 246 |  |  |  | 
    
      | 247 |  |  | struct IndexEntry { | 
    
      | 248 |  | 224780 | IndexEntry() : id(), size(), entry_type(), entry_name() { } | 
    
      | 249 |  |  | IndexEntry(const shash::Any &id, const uint64_t size, | 
    
      | 250 |  |  | ObjectPack::BucketContentType type, const std::string &name) | 
    
      | 251 |  |  | : id(id), size(size), entry_type(type), entry_name(name) { } | 
    
      | 252 |  |  | shash::Any id; | 
    
      | 253 |  |  | uint64_t size; | 
    
      | 254 |  |  | ObjectPack::BucketContentType entry_type; | 
    
      | 255 |  |  | std::string entry_name; | 
    
      | 256 |  |  | }; | 
    
      | 257 |  |  |  | 
    
      | 258 |  |  | bool ParseHeader(); | 
    
      | 259 |  |  | bool ParseItem(const std::string &line, IndexEntry *entry, | 
    
      | 260 |  |  | uint64_t *sum_size); | 
    
      | 261 |  |  |  | 
    
      | 262 |  |  | ObjectPackBuild::State ConsumePayload(const unsigned buf_size, | 
    
      | 263 |  |  | const unsigned char *buf); | 
    
      | 264 |  |  |  | 
    
      | 265 |  |  | shash::Any expected_digest_; | 
    
      | 266 |  |  | unsigned expected_header_size_; | 
    
      | 267 |  |  |  | 
    
      | 268 |  |  | /** | 
    
      | 269 |  |  | * Keeps track of how many bytes have been consumed from the payload. | 
    
      | 270 |  |  | */ | 
    
      | 271 |  |  | uint64_t pos_; | 
    
      | 272 |  |  |  | 
    
      | 273 |  |  | /** | 
    
      | 274 |  |  | * Keeps track of the current index in the array of objects (index_). | 
    
      | 275 |  |  | */ | 
    
      | 276 |  |  | unsigned idx_; | 
    
      | 277 |  |  |  | 
    
      | 278 |  |  | /** | 
    
      | 279 |  |  | * Keeps track of how many bytes have been processed from the current object.  NOTE(review): unsigned, whereas IndexEntry::size is uint64_t - confirm very large objects cannot overflow it. | 
    
      | 280 |  |  | */ | 
    
      | 281 |  |  | unsigned pos_in_object_; | 
    
      | 282 |  |  |  | 
    
      | 283 |  |  | /** | 
    
      | 284 |  |  | * Collects data for large objects so that the number of callbacks to the | 
    
      | 285 |  |  | * listeners is reduced. | 
    
      | 286 |  |  | */ | 
    
      | 287 |  |  | unsigned char accumulator_[kAccuSize]; | 
    
      | 288 |  |  |  | 
    
      | 289 |  |  | /** | 
    
      | 290 |  |  | * Keeps track of how many live bytes are stored in the accumulator_. | 
    
      | 291 |  |  | */ | 
    
      | 292 |  |  | unsigned pos_in_accu_; | 
    
      | 293 |  |  |  | 
    
      | 294 |  |  | /** | 
    
      | 295 |  |  | * The state starts in kStateContinue and makes exactly one transition into | 
    
      | 296 |  |  | * one of the other states as more bytes are consumed. | 
    
      | 297 |  |  | */ | 
    
      | 298 |  |  | ObjectPackBuild::State state_; | 
    
      | 299 |  |  |  | 
    
      | 300 |  |  | /** | 
    
      | 301 |  |  | * Temporary store for the incomplete header.  Once completely consumed, the | 
    
      | 302 |  |  | * header is interpreted, presumably into size_ and index_ (TODO confirm). | 
    
      | 303 |  |  | */ | 
    
      | 304 |  |  | std::string raw_header_; | 
    
      | 305 |  |  |  | 
    
      | 306 |  |  | /** | 
    
      | 307 |  |  | * Total size of all the objects in the pack (header not included). | 
    
      | 308 |  |  | */ | 
    
      | 309 |  |  | uint64_t size_; | 
    
      | 310 |  |  |  | 
    
      | 311 |  |  | /** | 
    
      | 312 |  |  | * Hash id, size, type and name of the individual objects in order. | 
    
      | 313 |  |  | */ | 
    
      | 314 |  |  | std::vector<IndexEntry> index_; | 
    
      | 315 |  |  | }; | 
    
      | 316 |  |  |  | 
    
      | 317 |  |  | #endif  // CVMFS_PACK_H_ | 
    
      | 318 |  |  |  |