Directory: | cvmfs/ |
---|---|
File: | cvmfs/ingestion/item.h |
Date: | 2025-06-29 02:35:41 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 63 | 63 | 100.0% |
Branches: | 32 | 53 | 60.4% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_INGESTION_ITEM_H_ | ||
6 | #define CVMFS_INGESTION_ITEM_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | #include <stdint.h> | ||
10 | |||
11 | #include <cassert> | ||
12 | #include <string> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "compression/compression.h" | ||
16 | #include "crypto/hash.h" | ||
17 | #include "file_chunk.h" | ||
18 | #include "ingestion/chunk_detector.h" | ||
19 | #include "ingestion/ingestion_source.h" | ||
20 | #include "util/atomic.h" | ||
21 | #include "util/pointer.h" | ||
22 | #include "util/single_copy.h" | ||
23 | |||
24 | namespace upload { | ||
25 | struct UploadStreamHandle; | ||
26 | } | ||
27 | |||
28 | class ItemAllocator; | ||
29 | |||
30 | /** | ||
31 | * Carries the information necessary to compress and checksum a file. During | ||
32 | * processing, the bulk chunk and the chunks_ vector are filled. | ||
33 | */ | ||
34 | class FileItem : SingleCopy { | ||
35 | public: | ||
36 | explicit FileItem(IngestionSource *source, | ||
37 | uint64_t min_chunk_size = 4 * 1024 * 1024, | ||
38 | uint64_t avg_chunk_size = 8 * 1024 * 1024, | ||
39 | uint64_t max_chunk_size = 16 * 1024 * 1024, | ||
40 | zlib::Algorithms compression_algorithm = zlib::kZlibDefault, | ||
41 | shash::Algorithms hash_algorithm = shash::kSha1, | ||
42 | shash::Suffix hash_suffix = shash::kSuffixNone, | ||
43 | bool may_have_chunks = true, | ||
44 | bool has_legacy_bulk_chunk = false); | ||
45 | ~FileItem(); | ||
46 | |||
47 | 91452 | static FileItem *CreateQuitBeacon() { | |
48 |
1/2✓ Branch 2 taken 91452 times.
✗ Branch 3 not taken.
|
91452 | const std::string quit_marker = std::string(1, kQuitBeaconMarker); |
49 |
3/6✓ Branch 1 taken 91452 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 91452 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 91452 times.
✗ Branch 8 not taken.
|
91452 | UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker)); |
50 |
2/4✓ Branch 2 taken 91452 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 91452 times.
✗ Branch 6 not taken.
|
182904 | return new FileItem(source.Release()); |
51 | 91452 | } | |
52 | 19527782 | bool IsQuitBeacon() { | |
53 |
11/21✓ Branch 1 taken 19513790 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 98172 times.
✓ Branch 5 taken 19415930 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 91873 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 91873 times.
✗ Branch 11 not taken.
✓ Branch 12 taken 91104 times.
✓ Branch 13 taken 769 times.
✓ Branch 14 taken 91854 times.
✓ Branch 15 taken 19415949 times.
✓ Branch 17 taken 19507229 times.
✓ Branch 18 taken 576 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
|
19527782 | return (path().length() == 1) && (path()[0] == kQuitBeaconMarker); |
54 | } | ||
55 | |||
56 | 48700379 | std::string path() { return source_->GetPath(); } | |
57 | 19696553 | uint64_t size() { return size_; } | |
58 | 31665767 | Xor32Detector *chunk_detector() { return &chunk_detector_; } | |
59 | 48743632 | shash::Any bulk_hash() { return bulk_hash_; } | |
60 | 19406960 | zlib::Algorithms compression_algorithm() { return compression_algorithm_; } | |
61 | 19712758 | shash::Algorithms hash_algorithm() { return hash_algorithm_; } | |
62 | 29194063 | shash::Suffix hash_suffix() { return hash_suffix_; } | |
63 | 19432440 | bool may_have_chunks() { return may_have_chunks_; } | |
64 | 9752290 | bool has_legacy_bulk_chunk() { return has_legacy_bulk_chunk_; } | |
65 | |||
66 | 9748010 | void set_size(uint64_t val) { size_ = val; } | |
67 | 9745844 | void set_may_have_chunks(bool val) { may_have_chunks_ = val; } | |
68 | 9721508 | void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); } | |
69 | 9769890 | bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; } | |
70 | 9751868 | uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); } | |
71 | |||
72 | 19503251 | uint64_t GetNumChunks() { return chunks_.size(); } | |
73 | 9751950 | FileChunkList *GetChunksPtr() { return &chunks_; } | |
74 | |||
75 | 9748872 | bool Open() { return source_->Open(); } | |
76 | 21567581 | ssize_t Read(void *buffer, size_t nbyte) { | |
77 | 21567581 | return source_->Read(buffer, nbyte); | |
78 | } | ||
79 | 9741657 | bool Close() { return source_->Close(); } | |
80 | 9747897 | bool GetSize(uint64_t *size) { return source_->GetSize(size); } | |
81 | |||
82 | // Called by ChunkItem constructor, decremented when a chunk is registered | ||
83 | 9980620 | void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); } | |
84 | void RegisterChunk(const FileChunk &file_chunk); | ||
85 | 9769614 | bool IsProcessed() { | |
86 |
4/4✓ Branch 1 taken 9755578 times.
✓ Branch 2 taken 14036 times.
✓ Branch 4 taken 9752276 times.
✓ Branch 5 taken 3302 times.
|
9769614 | return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0); |
87 | } | ||
88 | |||
89 | private: | ||
90 | static const uint64_t kSizeUnknown = uint64_t(-1); | ||
91 | static const char kQuitBeaconMarker = '\0'; | ||
92 | |||
93 | UniquePtr<IngestionSource> source_; | ||
94 | const zlib::Algorithms compression_algorithm_; | ||
95 | const shash::Algorithms hash_algorithm_; | ||
96 | const shash::Suffix hash_suffix_; | ||
97 | const bool has_legacy_bulk_chunk_; | ||
98 | |||
99 | uint64_t size_; | ||
100 | bool may_have_chunks_; | ||
101 | |||
102 | Xor32Detector chunk_detector_; | ||
103 | shash::Any bulk_hash_; | ||
104 | FileChunkList chunks_; | ||
105 | /** | ||
106 | * Number of chunks created but not yet uploaded and registered | ||
107 | */ | ||
108 | atomic_int64 nchunks_in_fly_; | ||
109 | /** | ||
110 | * Switches to true once all of the file has been through the chunking | ||
111 | * stage | ||
112 | */ | ||
113 | atomic_int32 is_fully_chunked_; | ||
114 | pthread_mutex_t lock_; | ||
115 | }; | ||
116 | |||
117 | |||
118 | /** | ||
119 | * A chunk stores the state of compression and hashing contexts as the blocks | ||
120 | * move through the pipeline. A chunk can be a "bulk chunk" corresponding to | ||
121 | * the processed data of an entire file, or it can be a partial chunk of a | ||
122 | * (large) input file. | ||
123 | */ | ||
124 | class ChunkItem : SingleCopy { | ||
125 | public: | ||
126 | ChunkItem(FileItem *file_item, uint64_t offset); | ||
127 | |||
128 | void MakeBulkChunk(); | ||
129 | 10686157 | bool IsSolePiece() { | |
130 |
6/6✓ Branch 0 taken 937469 times.
✓ Branch 1 taken 9748688 times.
✓ Branch 2 taken 797 times.
✓ Branch 3 taken 936672 times.
✓ Branch 5 taken 138 times.
✓ Branch 6 taken 659 times.
|
10686157 | return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size()); |
131 | } | ||
132 | |||
133 | 920874 | bool is_bulk_chunk() { return is_bulk_chunk_; } | |
134 | 9769660 | FileItem *file_item() { return file_item_; } | |
135 | 10950596 | uint64_t offset() { return offset_; } | |
136 | 9999752 | uint64_t size() { return size_; } | |
137 | 26314360 | upload::UploadStreamHandle *upload_handle() { return upload_handle_; } | |
138 | // An active zlib compression stream requires 256kB of memory. Therefore, | ||
139 | // we create it only for the absolutely necessary duration and free the space | ||
140 | // afterwards. | ||
141 | zlib::Compressor *GetCompressor(); | ||
142 | void ReleaseCompressor(); | ||
143 | |||
144 | 24618888 | shash::ContextPtr hash_ctx() { return hash_ctx_; } | |
145 | 29006499 | shash::Any *hash_ptr() { return &hash_value_; } | |
146 | |||
147 | 9975174 | void set_size(uint64_t val) { | |
148 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9975174 times.
|
9975174 | assert(size_ == 0); |
149 | 9975174 | size_ = val; | |
150 | 9975174 | } | |
151 | 9765753 | void set_upload_handle(upload::UploadStreamHandle *val) { | |
152 |
2/4✓ Branch 0 taken 9765831 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 9765870 times.
✗ Branch 3 not taken.
|
9765753 | assert((upload_handle_ == NULL) && (val != NULL)); |
153 | 9765870 | upload_handle_ = val; | |
154 | 9765870 | } | |
155 | |||
156 | private: | ||
157 | FileItem *file_item_; | ||
158 | uint64_t offset_; | ||
159 | /** | ||
160 | * The size of a chunk is not defined before the corresponding stop block | ||
161 | * has been dispatched. | ||
162 | */ | ||
163 | uint64_t size_; | ||
164 | bool is_bulk_chunk_; | ||
165 | /** | ||
166 | * Deleted by the uploader. | ||
167 | */ | ||
168 | upload::UploadStreamHandle *upload_handle_; | ||
169 | UniquePtr<zlib::Compressor> compressor_; | ||
170 | shash::ContextPtr hash_ctx_; | ||
171 | shash::Any hash_value_; | ||
172 | unsigned char hash_ctx_buffer_[shash::kMaxContextSize]; | ||
173 | }; | ||
174 | |||
175 | |||
176 | /** | ||
177 | * A block is an item of work in the pipeline. A sequence of data blocks | ||
178 | * followed by a stop block constitutes a Chunk. A sequence of Chunks in turn | ||
179 | * build constitute a file. | ||
180 | * A block that carries data must have a non-zero-length payload. | ||
181 | */ | ||
182 | class BlockItem : SingleCopy { | ||
183 | public: | ||
184 | enum BlockType { | ||
185 | kBlockHollow, | ||
186 | kBlockData, | ||
187 | kBlockStop, | ||
188 | }; | ||
189 | |||
190 | explicit BlockItem(ItemAllocator *allocator); | ||
191 | BlockItem(int64_t tag, ItemAllocator *allocator); | ||
192 | ~BlockItem(); | ||
193 | |||
194 |
1/2✓ Branch 2 taken 80432 times.
✗ Branch 3 not taken.
|
80432 | static BlockItem *CreateQuitBeacon() { return new BlockItem(NULL); } |
195 | 92981652 | bool IsQuitBeacon() { return type_ == kBlockHollow; } | |
196 | |||
197 | void MakeStop(); | ||
198 | void MakeData(uint32_t capacity); | ||
199 | void MakeDataMove(BlockItem *other); | ||
200 | void MakeDataCopy(const unsigned char *data, uint32_t size); | ||
201 | void SetFileItem(FileItem *item); | ||
202 | void SetChunkItem(ChunkItem *item); | ||
203 | // Free data and reset to hollow block | ||
204 | void Reset(); | ||
205 | |||
206 | uint32_t Write(void *buf, uint32_t count); | ||
207 | |||
208 | bool IsEmpty() { return size_ == 0; } | ||
209 | 50487632 | bool IsFull() { return size_ == capacity_; } | |
210 | |||
211 | 20530637369 | unsigned char *data() { return data_; } | |
212 | 20472660442 | uint32_t capacity() { return capacity_; } | |
213 | 155727639 | uint32_t size() { return size_; } | |
214 | 25393186 | void set_size(uint32_t val) { | |
215 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 25393186 times.
|
25393186 | assert(val <= capacity_); |
216 | 25393186 | size_ = val; | |
217 | 25393186 | } | |
218 | |||
219 | 98645518 | BlockType type() { return type_; } | |
220 | 135959019 | int64_t tag() { return tag_; } | |
221 | 46331221 | FileItem *file_item() { return file_item_; } | |
222 | 129776511 | ChunkItem *chunk_item() { return chunk_item_; } | |
223 | 9816031 | static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); } | |
224 | |||
225 | private: | ||
226 | /** | ||
227 | * Total capacity of all BlockItem() | ||
228 | */ | ||
229 | static atomic_int64 managed_bytes_; | ||
230 | |||
231 | // Forget pointer to the data | ||
232 | void Discharge(); | ||
233 | |||
234 | ItemAllocator *allocator_; | ||
235 | BlockType type_; | ||
236 | |||
237 | /** | ||
238 | * Blocks with the same tag need to be processed sequentially. That is, no | ||
239 | * two threads of the same pipeline stage must operate on blocks of the same | ||
240 | * tag. The tags roughly correspond to chunks. | ||
241 | * Tags can (and should) be set exactly once in the life time of a block. | ||
242 | */ | ||
243 | int64_t tag_; | ||
244 | |||
245 | /** | ||
246 | * Can be set exactly once. | ||
247 | */ | ||
248 | FileItem *file_item_; | ||
249 | ChunkItem *chunk_item_; | ||
250 | |||
251 | /** | ||
252 | * Managed by ItemAllocator | ||
253 | */ | ||
254 | unsigned char *data_; | ||
255 | uint32_t capacity_; | ||
256 | uint32_t size_; | ||
257 | }; | ||
258 | |||
259 | #endif // CVMFS_INGESTION_ITEM_H_ | ||
260 |