Directory: | cvmfs/ |
---|---|
File: | cvmfs/ingestion/item.h |
Date: | 2025-02-09 02:34:19 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 59 | 59 | 100.0% |
Branches: | 31 | 53 | 58.5% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_INGESTION_ITEM_H_ | ||
6 | #define CVMFS_INGESTION_ITEM_H_ | ||
7 | |||
8 | #include <pthread.h> | ||
9 | #include <stdint.h> | ||
10 | |||
11 | #include <cassert> | ||
12 | #include <string> | ||
13 | #include <vector> | ||
14 | |||
15 | #include "compression/compression.h" | ||
16 | #include "crypto/hash.h" | ||
17 | #include "file_chunk.h" | ||
18 | #include "ingestion/chunk_detector.h" | ||
19 | #include "ingestion/ingestion_source.h" | ||
20 | #include "util/atomic.h" | ||
21 | #include "util/pointer.h" | ||
22 | #include "util/single_copy.h" | ||
23 | |||
// Forward declarations. Both types appear in this header only as pointers
// (ChunkItem::upload_handle_, BlockItem::allocator_), so their full
// definitions are not needed here.
namespace upload { struct UploadStreamHandle; }  // namespace upload
class ItemAllocator;
29 | |||
30 | /** | ||
31 | * Carries the information necessary to compress and checksum a file. During | ||
32 | * processing, the bulk chunk and the chunks_ vector are filled. | ||
33 | */ | ||
34 | class FileItem : SingleCopy { | ||
35 | public: | ||
36 | explicit FileItem( | ||
37 | IngestionSource* source, | ||
38 | uint64_t min_chunk_size = 4 * 1024 * 1024, | ||
39 | uint64_t avg_chunk_size = 8 * 1024 * 1024, | ||
40 | uint64_t max_chunk_size = 16 * 1024 * 1024, | ||
41 | zlib::Algorithms compression_algorithm = zlib::kZlibDefault, | ||
42 | shash::Algorithms hash_algorithm = shash::kSha1, | ||
43 | shash::Suffix hash_suffix = shash::kSuffixNone, | ||
44 | bool may_have_chunks = true, | ||
45 | bool has_legacy_bulk_chunk = false); | ||
46 | ~FileItem(); | ||
47 | |||
48 | 3162 | static FileItem *CreateQuitBeacon() { | |
49 |
1/2✓ Branch 2 taken 3162 times.
✗ Branch 3 not taken.
|
3162 | std::string quit_marker = std::string(1, kQuitBeaconMarker); |
50 |
3/6✓ Branch 1 taken 3162 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3162 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 3162 times.
✗ Branch 8 not taken.
|
3162 | UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker)); |
51 |
2/4✓ Branch 2 taken 3162 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3162 times.
✗ Branch 6 not taken.
|
6324 | return new FileItem(source.Release()); |
52 | 3162 | } | |
53 | 501148 | bool IsQuitBeacon() { | |
54 |
10/21✓ Branch 1 taken 501431 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 3402 times.
✓ Branch 5 taken 498120 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 3162 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 3161 times.
✗ Branch 11 not taken.
✓ Branch 12 taken 3138 times.
✓ Branch 13 taken 23 times.
✓ Branch 14 taken 3159 times.
✓ Branch 15 taken 498122 times.
✓ Branch 17 taken 501381 times.
✗ Branch 18 not taken.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
|
501148 | return (path().length() == 1) && (path()[0] == kQuitBeaconMarker); |
55 | } | ||
56 | |||
57 | 1248808 | std::string path() { return source_->GetPath(); } | |
58 | 504144 | uint64_t size() { return size_; } | |
59 | 809050 | Xor32Detector *chunk_detector() { return &chunk_detector_; } | |
60 | 1249712 | shash::Any bulk_hash() { return bulk_hash_; } | |
61 | 496990 | zlib::Algorithms compression_algorithm() { return compression_algorithm_; } | |
62 | 504240 | shash::Algorithms hash_algorithm() { return hash_algorithm_; } | |
63 | 748649 | shash::Suffix hash_suffix() { return hash_suffix_; } | |
64 | 498367 | bool may_have_chunks() { return may_have_chunks_; } | |
65 | 250075 | bool has_legacy_bulk_chunk() { return has_legacy_bulk_chunk_; } | |
66 | |||
67 | 249966 | void set_size(uint64_t val) { size_ = val; } | |
68 | 249857 | void set_may_have_chunks(bool val) { may_have_chunks_ = val; } | |
69 | 249049 | void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); } | |
70 | 250528 | bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; } | |
71 | 250059 | uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); } | |
72 | |||
73 | 500101 | uint64_t GetNumChunks() { return chunks_.size(); } | |
74 | 250077 | FileChunkList *GetChunksPtr() { return &chunks_; } | |
75 | |||
76 | 249983 | bool Open() { return source_->Open(); } | |
77 | 552728 | ssize_t Read(void *buffer, size_t nbyte) { | |
78 | 552728 | return source_->Read(buffer, nbyte); | |
79 | } | ||
80 | 249717 | bool Close() { return source_->Close(); } | |
81 | 250013 | bool GetSize(uint64_t *size) { return source_->GetSize(size); } | |
82 | |||
83 | // Called by ChunkItem constructor, decremented when a chunk is registered | ||
84 | 254903 | void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); } | |
85 | void RegisterChunk(const FileChunk &file_chunk); | ||
86 | 250522 | bool IsProcessed() { | |
87 |
4/4✓ Branch 1 taken 250209 times.
✓ Branch 2 taken 313 times.
✓ Branch 4 taken 250083 times.
✓ Branch 5 taken 126 times.
|
250522 | return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0); |
88 | } | ||
89 | |||
90 | private: | ||
91 | static const uint64_t kSizeUnknown = uint64_t(-1); | ||
92 | static const char kQuitBeaconMarker = '\0'; | ||
93 | |||
94 | UniquePtr<IngestionSource> source_; | ||
95 | const zlib::Algorithms compression_algorithm_; | ||
96 | const shash::Algorithms hash_algorithm_; | ||
97 | const shash::Suffix hash_suffix_; | ||
98 | const bool has_legacy_bulk_chunk_; | ||
99 | |||
100 | uint64_t size_; | ||
101 | bool may_have_chunks_; | ||
102 | |||
103 | Xor32Detector chunk_detector_; | ||
104 | shash::Any bulk_hash_; | ||
105 | FileChunkList chunks_; | ||
106 | /** | ||
107 | * Number of chunks created but not yet uploaded and registered | ||
108 | */ | ||
109 | atomic_int64 nchunks_in_fly_; | ||
110 | /** | ||
111 | * Switches to true once all of the file has been through the chunking | ||
112 | * stage | ||
113 | */ | ||
114 | atomic_int32 is_fully_chunked_; | ||
115 | pthread_mutex_t lock_; | ||
116 | }; | ||
117 | |||
118 | |||
119 | /** | ||
120 | * A chunk stores the state of compression and hashing contexts as the blocks | ||
121 | * move through the pipeline. A chunk can be a "bulk chunk" corresponding to | ||
122 | * the processed data of an entire file, or it can be a partial chunk of a | ||
123 | * (large) input file. | ||
124 | */ | ||
125 | class ChunkItem : SingleCopy { | ||
126 | public: | ||
127 | ChunkItem(FileItem *file_item, uint64_t offset); | ||
128 | |||
129 | void MakeBulkChunk(); | ||
130 | 270387 | bool IsSolePiece() { | |
131 |
6/6✓ Branch 0 taken 20442 times.
✓ Branch 1 taken 249945 times.
✓ Branch 2 taken 19 times.
✓ Branch 3 taken 20423 times.
✓ Branch 5 taken 3 times.
✓ Branch 6 taken 16 times.
|
270387 | return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size()); |
132 | } | ||
133 | |||
134 | 20019 | bool is_bulk_chunk() { return is_bulk_chunk_; } | |
135 | 250523 | FileItem *file_item() { return file_item_; } | |
136 | 276337 | uint64_t offset() { return offset_; } | |
137 | 255525 | uint64_t size() { return size_; } | |
138 | 668653 | upload::UploadStreamHandle *upload_handle() { return upload_handle_; } | |
139 | // An active zlib compression stream requires 256kB of memory. Therefore, | ||
140 | // we create it only for the absolutely necessary duration and free the space | ||
141 | // afterwards. | ||
142 | zlib::Compressor *GetCompressor(); | ||
143 | void ReleaseCompressor(); | ||
144 | |||
145 | 630982 | shash::ContextPtr hash_ctx() { return hash_ctx_; } | |
146 | 742995 | shash::Any *hash_ptr() { return &hash_value_; } | |
147 | |||
148 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 254849 times.
|
254849 | void set_size(uint64_t val) { assert(size_ == 0); size_ = val; } |
149 | 250419 | void set_upload_handle(upload::UploadStreamHandle *val) { | |
150 |
2/4✓ Branch 0 taken 250423 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 250423 times.
✗ Branch 3 not taken.
|
250419 | assert((upload_handle_ == NULL) && (val != NULL)); |
151 | 250423 | upload_handle_ = val; | |
152 | 250423 | } | |
153 | |||
154 | private: | ||
155 | FileItem *file_item_; | ||
156 | uint64_t offset_; | ||
157 | /** | ||
158 | * The size of a chunk is not defined before the corresponding stop block | ||
159 | * has been dispatched. | ||
160 | */ | ||
161 | uint64_t size_; | ||
162 | bool is_bulk_chunk_; | ||
163 | /** | ||
164 | * Deleted by the uploader. | ||
165 | */ | ||
166 | upload::UploadStreamHandle *upload_handle_; | ||
167 | UniquePtr<zlib::Compressor> compressor_; | ||
168 | shash::ContextPtr hash_ctx_; | ||
169 | shash::Any hash_value_; | ||
170 | unsigned char hash_ctx_buffer_[shash::kMaxContextSize]; | ||
171 | }; | ||
172 | |||
173 | |||
174 | /** | ||
175 | * A block is an item of work in the pipeline. A sequence of data blocks | ||
176 | * followed by a stop block constitutes a Chunk. A sequence of Chunks in turn | ||
177 | * build constitute a file. | ||
178 | * A block that carries data must have a non-zero-length payload. | ||
179 | */ | ||
180 | class BlockItem : SingleCopy { | ||
181 | public: | ||
182 | enum BlockType { | ||
183 | kBlockHollow, | ||
184 | kBlockData, | ||
185 | kBlockStop, | ||
186 | }; | ||
187 | |||
188 | explicit BlockItem(ItemAllocator *allocator); | ||
189 | BlockItem(int64_t tag, ItemAllocator *allocator); | ||
190 | ~BlockItem(); | ||
191 | |||
192 | 2792 | static BlockItem *CreateQuitBeacon() { | |
193 |
1/2✓ Branch 2 taken 2792 times.
✗ Branch 3 not taken.
|
2792 | return new BlockItem(NULL); |
194 | } | ||
195 | 2365105 | bool IsQuitBeacon() { | |
196 | 2365105 | return type_ == kBlockHollow; | |
197 | } | ||
198 | |||
199 | void MakeStop(); | ||
200 | void MakeData(uint32_t capacity); | ||
201 | void MakeDataMove(BlockItem *other); | ||
202 | void MakeDataCopy(const unsigned char *data, uint32_t size); | ||
203 | void SetFileItem(FileItem *item); | ||
204 | void SetChunkItem(ChunkItem *item); | ||
205 | // Free data and reset to hollow block | ||
206 | void Reset(); | ||
207 | |||
208 | uint32_t Write(void *buf, uint32_t count); | ||
209 | |||
210 | bool IsEmpty() { return size_ == 0; } | ||
211 | 1294012 | bool IsFull() { return size_ == capacity_; } | |
212 | |||
213 | 526415373 | unsigned char *data() { return data_; } | |
214 | 524939733 | uint32_t capacity() { return capacity_; } | |
215 | 3965452 | uint32_t size() { return size_; } | |
216 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 651097 times.
|
651097 | void set_size(uint32_t val) { assert(val <= capacity_); size_ = val; } |
217 | |||
218 | 2511076 | BlockType type() { return type_; } | |
219 | 3459761 | int64_t tag() { return tag_; } | |
220 | 1182301 | FileItem *file_item() { return file_item_; } | |
221 | 3292885 | ChunkItem *chunk_item() { return chunk_item_; } | |
222 | 251717 | static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); } | |
223 | |||
224 | private: | ||
225 | /** | ||
226 | * Total capacity of all BlockItem() | ||
227 | */ | ||
228 | static atomic_int64 managed_bytes_; | ||
229 | |||
230 | // Forget pointer to the data | ||
231 | void Discharge(); | ||
232 | |||
233 | ItemAllocator *allocator_; | ||
234 | BlockType type_; | ||
235 | |||
236 | /** | ||
237 | * Blocks with the same tag need to be processed sequentially. That is, no | ||
238 | * two threads of the same pipeline stage must operate on blocks of the same | ||
239 | * tag. The tags roughly correspond to chunks. | ||
240 | * Tags can (and should) be set exactly once in the life time of a block. | ||
241 | */ | ||
242 | int64_t tag_; | ||
243 | |||
244 | /** | ||
245 | * Can be set exactly once. | ||
246 | */ | ||
247 | FileItem *file_item_; | ||
248 | ChunkItem *chunk_item_; | ||
249 | |||
250 | /** | ||
251 | * Managed by ItemAllocator | ||
252 | */ | ||
253 | unsigned char *data_; | ||
254 | uint32_t capacity_; | ||
255 | uint32_t size_; | ||
256 | }; | ||
257 | |||
258 | #endif // CVMFS_INGESTION_ITEM_H_ | ||
259 |