| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/ingestion/item.h |
| Date: | 2025-11-09 02:35:23 |
| Exec | Total | Coverage | |
|---|---|---|---|
| Lines: | 63 | 63 | 100.0% |
| Branches: | 32 | 53 | 60.4% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_INGESTION_ITEM_H_ | ||
| 6 | #define CVMFS_INGESTION_ITEM_H_ | ||
| 7 | |||
| 8 | #include <pthread.h> | ||
| 9 | #include <stdint.h> | ||
| 10 | |||
| 11 | #include <cassert> | ||
| 12 | #include <string> | ||
| 13 | #include <vector> | ||
| 14 | |||
| 15 | #include "compression/compression.h" | ||
| 16 | #include "crypto/hash.h" | ||
| 17 | #include "file_chunk.h" | ||
| 18 | #include "ingestion/chunk_detector.h" | ||
| 19 | #include "ingestion/ingestion_source.h" | ||
| 20 | #include "util/atomic.h" | ||
| 21 | #include "util/pointer.h" | ||
| 22 | #include "util/single_copy.h" | ||
| 23 | |||
| 24 | namespace upload { | ||
| 25 | struct UploadStreamHandle; | ||
| 26 | } | ||
| 27 | |||
| 28 | class ItemAllocator; | ||
| 29 | |||
| 30 | /** | ||
| 31 | * Carries the information necessary to compress and checksum a file. During | ||
| 32 | * processing, the bulk chunk and the chunks_ vector are filled. | ||
| 33 | */ | ||
| 34 | class FileItem : SingleCopy { | ||
| 35 | public: | ||
| 36 | explicit FileItem(IngestionSource *source, | ||
| 37 | uint64_t min_chunk_size = 4 * 1024 * 1024, | ||
| 38 | uint64_t avg_chunk_size = 8 * 1024 * 1024, | ||
| 39 | uint64_t max_chunk_size = 16 * 1024 * 1024, | ||
| 40 | zlib::Algorithms compression_algorithm = zlib::kZlibDefault, | ||
| 41 | shash::Algorithms hash_algorithm = shash::kSha1, | ||
| 42 | shash::Suffix hash_suffix = shash::kSuffixNone, | ||
| 43 | bool may_have_chunks = true, | ||
| 44 | bool has_legacy_bulk_chunk = false); | ||
| 45 | ~FileItem(); | ||
| 46 | |||
| 47 | 88728 | static FileItem *CreateQuitBeacon() { | |
| 48 |
1/2✓ Branch 2 taken 88728 times.
✗ Branch 3 not taken.
|
88728 | const std::string quit_marker = std::string(1, kQuitBeaconMarker); |
| 49 |
3/6✓ Branch 1 taken 88728 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 88728 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 88728 times.
✗ Branch 8 not taken.
|
88728 | UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker)); |
| 50 |
2/4✓ Branch 2 taken 88728 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 88728 times.
✗ Branch 6 not taken.
|
177456 | return new FileItem(source.Release()); |
| 51 | 88728 | } | |
| 52 | 14542549 | bool IsQuitBeacon() { | |
| 53 |
11/21✓ Branch 1 taken 14542888 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 92097 times.
✓ Branch 5 taken 14451454 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 88862 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 88832 times.
✗ Branch 11 not taken.
✓ Branch 12 taken 88252 times.
✓ Branch 13 taken 580 times.
✓ Branch 14 taken 88880 times.
✓ Branch 15 taken 14451406 times.
✓ Branch 17 taken 14539081 times.
✓ Branch 18 taken 1189 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
|
14542549 | return (path().length() == 1) && (path()[0] == kQuitBeaconMarker); |
| 54 | } | ||
| 55 | |||
| 56 | 36213385 | std::string path() { return source_->GetPath(); } | |
| 57 | 14574930 | uint64_t size() { return size_; } | |
| 58 | 23386054 | Xor32Detector *chunk_detector() { return &chunk_detector_; } | |
| 59 | 36209296 | shash::Any bulk_hash() { return bulk_hash_; } | |
| 60 | 14403089 | zlib::Algorithms compression_algorithm() { return compression_algorithm_; } | |
| 61 | 14571035 | shash::Algorithms hash_algorithm() { return hash_algorithm_; } | |
| 62 | 21715564 | shash::Suffix hash_suffix() { return hash_suffix_; } | |
| 63 | 14451054 | bool may_have_chunks() { return may_have_chunks_; } | |
| 64 | 7252601 | bool has_legacy_bulk_chunk() { return has_legacy_bulk_chunk_; } | |
| 65 | |||
| 66 | 7248565 | void set_size(uint64_t val) { size_ = val; } | |
| 67 | 7245607 | void set_may_have_chunks(bool val) { may_have_chunks_ = val; } | |
| 68 | 7218867 | void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); } | |
| 69 | 7265423 | bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; } | |
| 70 | 7252012 | uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); } | |
| 71 | |||
| 72 | 14504241 | uint64_t GetNumChunks() { return chunks_.size(); } | |
| 73 | 7252550 | FileChunkList *GetChunksPtr() { return &chunks_; } | |
| 74 | |||
| 75 | 7249335 | bool Open() { return source_->Open(); } | |
| 76 | 16031940 | ssize_t Read(void *buffer, size_t nbyte) { | |
| 77 | 16031940 | return source_->Read(buffer, nbyte); | |
| 78 | } | ||
| 79 | 7242375 | bool Close() { return source_->Close(); } | |
| 80 | 7248494 | bool GetSize(uint64_t *size) { return source_->GetSize(size); } | |
| 81 | |||
| 82 | // Called by ChunkItem constructor, decremented when a chunk is registered | ||
| 83 | 7347050 | void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); } | |
| 84 | void RegisterChunk(const FileChunk &file_chunk); | ||
| 85 | 7265303 | bool IsProcessed() { | |
| 86 |
4/4✓ Branch 1 taken 7256514 times.
✓ Branch 2 taken 8789 times.
✓ Branch 4 taken 7252851 times.
✓ Branch 5 taken 3663 times.
|
7265303 | return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0); |
| 87 | } | ||
| 88 | |||
| 89 | private: | ||
| 90 | static const uint64_t kSizeUnknown = uint64_t(-1); | ||
| 91 | static const char kQuitBeaconMarker = '\0'; | ||
| 92 | |||
| 93 | UniquePtr<IngestionSource> source_; | ||
| 94 | const zlib::Algorithms compression_algorithm_; | ||
| 95 | const shash::Algorithms hash_algorithm_; | ||
| 96 | const shash::Suffix hash_suffix_; | ||
| 97 | const bool has_legacy_bulk_chunk_; | ||
| 98 | |||
| 99 | uint64_t size_; | ||
| 100 | bool may_have_chunks_; | ||
| 101 | |||
| 102 | Xor32Detector chunk_detector_; | ||
| 103 | shash::Any bulk_hash_; | ||
| 104 | FileChunkList chunks_; | ||
| 105 | /** | ||
| 106 | * Number of chunks created but not yet uploaded and registered | ||
| 107 | */ | ||
| 108 | atomic_int64 nchunks_in_fly_; | ||
| 109 | /** | ||
| 110 | * Switches to true once all of the file has been through the chunking | ||
| 111 | * stage | ||
| 112 | */ | ||
| 113 | atomic_int32 is_fully_chunked_; | ||
| 114 | pthread_mutex_t lock_; | ||
| 115 | }; | ||
| 116 | |||
| 117 | |||
| 118 | /** | ||
| 119 | * A chunk stores the state of compression and hashing contexts as the blocks | ||
| 120 | * move through the pipeline. A chunk can be a "bulk chunk" corresponding to | ||
| 121 | * the processed data of an entire file, or it can be a partial chunk of a | ||
| 122 | * (large) input file. | ||
| 123 | */ | ||
| 124 | class ChunkItem : SingleCopy { | ||
| 125 | public: | ||
| 126 | ChunkItem(FileItem *file_item, uint64_t offset); | ||
| 127 | |||
| 128 | void MakeBulkChunk(); | ||
| 129 | 7662588 | bool IsSolePiece() { | |
| 130 |
6/6✓ Branch 0 taken 412521 times.
✓ Branch 1 taken 7250067 times.
✓ Branch 2 taken 479 times.
✓ Branch 3 taken 412042 times.
✓ Branch 5 taken 60 times.
✓ Branch 6 taken 419 times.
|
7662588 | return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size()); |
| 131 | } | ||
| 132 | |||
| 133 | 400380 | bool is_bulk_chunk() { return is_bulk_chunk_; } | |
| 134 | 7265323 | FileItem *file_item() { return file_item_; } | |
| 135 | 7787685 | uint64_t offset() { return offset_; } | |
| 136 | 7365363 | uint64_t size() { return size_; } | |
| 137 | 19101127 | upload::UploadStreamHandle *upload_handle() { return upload_handle_; } | |
| 138 | // An active zlib compression stream requires 256kB of memory. Therefore, | ||
| 139 | // we create it only for the absolutely necessary duration and free the space | ||
| 140 | // afterwards. | ||
| 141 | zlib::Compressor *GetCompressor(); | ||
| 142 | void ReleaseCompressor(); | ||
| 143 | |||
| 144 | 18271506 | shash::ContextPtr hash_ctx() { return hash_ctx_; } | |
| 145 | 21527027 | shash::Any *hash_ptr() { return &hash_value_; } | |
| 146 | |||
| 147 | 7344193 | void set_size(uint64_t val) { | |
| 148 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 7344193 times.
|
7344193 | assert(size_ == 0); |
| 149 | 7344193 | size_ = val; | |
| 150 | 7344193 | } | |
| 151 | 7263070 | void set_upload_handle(upload::UploadStreamHandle *val) { | |
| 152 |
2/4✓ Branch 0 taken 7263128 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 7263157 times.
✗ Branch 3 not taken.
|
7263070 | assert((upload_handle_ == NULL) && (val != NULL)); |
| 153 | 7263157 | upload_handle_ = val; | |
| 154 | 7263157 | } | |
| 155 | |||
| 156 | private: | ||
| 157 | FileItem *file_item_; | ||
| 158 | uint64_t offset_; | ||
| 159 | /** | ||
| 160 | * The size of a chunk is not defined before the corresponding stop block | ||
| 161 | * has been dispatched. | ||
| 162 | */ | ||
| 163 | uint64_t size_; | ||
| 164 | bool is_bulk_chunk_; | ||
| 165 | /** | ||
| 166 | * Deleted by the uploader. | ||
| 167 | */ | ||
| 168 | upload::UploadStreamHandle *upload_handle_; | ||
| 169 | UniquePtr<zlib::Compressor> compressor_; | ||
| 170 | shash::ContextPtr hash_ctx_; | ||
| 171 | shash::Any hash_value_; | ||
| 172 | unsigned char hash_ctx_buffer_[shash::kMaxContextSize]; | ||
| 173 | }; | ||
| 174 | |||
| 175 | |||
| 176 | /** | ||
| 177 | * A block is an item of work in the pipeline. A sequence of data blocks | ||
| 178 | * followed by a stop block constitutes a Chunk. A sequence of Chunks in turn | ||
| 179 | * constitutes a file. | ||
| 180 | * A block that carries data must have a non-zero-length payload. | ||
| 181 | */ | ||
| 182 | class BlockItem : SingleCopy { | ||
| 183 | public: | ||
| 184 | enum BlockType { | ||
| 185 | kBlockHollow, | ||
| 186 | kBlockData, | ||
| 187 | kBlockStop, | ||
| 188 | }; | ||
| 189 | |||
| 190 | explicit BlockItem(ItemAllocator *allocator); | ||
| 191 | BlockItem(int64_t tag, ItemAllocator *allocator); | ||
| 192 | ~BlockItem(); | ||
| 193 | |||
| 194 |
1/2✓ Branch 2 taken 78496 times.
✗ Branch 3 not taken.
|
78496 | static BlockItem *CreateQuitBeacon() { return new BlockItem(NULL); } |
| 195 | 68091517 | bool IsQuitBeacon() { return type_ == kBlockHollow; } | |
| 196 | |||
| 197 | void MakeStop(); | ||
| 198 | void MakeData(uint32_t capacity); | ||
| 199 | void MakeDataMove(BlockItem *other); | ||
| 200 | void MakeDataCopy(const unsigned char *data, uint32_t size); | ||
| 201 | void SetFileItem(FileItem *item); | ||
| 202 | void SetChunkItem(ChunkItem *item); | ||
| 203 | // Free data and reset to hollow block | ||
| 204 | void Reset(); | ||
| 205 | |||
| 206 | uint32_t Write(void *buf, uint32_t count); | ||
| 207 | |||
| 208 | bool IsEmpty() { return size_ == 0; } | ||
| 209 | 37499650 | bool IsFull() { return size_ == capacity_; } | |
| 210 | |||
| 211 | 5303951587 | unsigned char *data() { return data_; } | |
| 212 | 5261716937 | uint32_t capacity() { return capacity_; } | |
| 213 | 111250862 | uint32_t size() { return size_; } | |
| 214 | 18835199 | void set_size(uint32_t val) { | |
| 215 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 18835199 times.
|
18835199 | assert(val <= capacity_); |
| 216 | 18835199 | size_ = val; | |
| 217 | 18835199 | } | |
| 218 | |||
| 219 | 70307017 | BlockType type() { return type_; } | |
| 220 | 100104653 | int64_t tag() { return tag_; } | |
| 221 | 34157811 | FileItem *file_item() { return file_item_; } | |
| 222 | 94448764 | ChunkItem *chunk_item() { return chunk_item_; } | |
| 223 | 7298905 | static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); } | |
| 224 | |||
| 225 | private: | ||
| 226 | /** | ||
| 227 | * Total capacity of all BlockItem objects | ||
| 228 | */ | ||
| 229 | static atomic_int64 managed_bytes_; | ||
| 230 | |||
| 231 | // Forget pointer to the data | ||
| 232 | void Discharge(); | ||
| 233 | |||
| 234 | ItemAllocator *allocator_; | ||
| 235 | BlockType type_; | ||
| 236 | |||
| 237 | /** | ||
| 238 | * Blocks with the same tag need to be processed sequentially. That is, no | ||
| 239 | * two threads of the same pipeline stage must operate on blocks of the same | ||
| 240 | * tag. The tags roughly correspond to chunks. | ||
| 241 | * Tags can (and should) be set exactly once in the lifetime of a block. | ||
| 242 | */ | ||
| 243 | int64_t tag_; | ||
| 244 | |||
| 245 | /** | ||
| 246 | * Can be set exactly once. | ||
| 247 | */ | ||
| 248 | FileItem *file_item_; | ||
| 249 | ChunkItem *chunk_item_; | ||
| 250 | |||
| 251 | /** | ||
| 252 | * Managed by ItemAllocator | ||
| 253 | */ | ||
| 254 | unsigned char *data_; | ||
| 255 | uint32_t capacity_; | ||
| 256 | uint32_t size_; | ||
| 257 | }; | ||
| 258 | |||
| 259 | #endif // CVMFS_INGESTION_ITEM_H_ | ||
| 260 |