GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/item.h
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 63 63 100.0%
Branches: 32 53 60.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_ITEM_H_
6 #define CVMFS_INGESTION_ITEM_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10
11 #include <cassert>
12 #include <string>
13 #include <vector>
14
15 #include "compression/compression.h"
16 #include "crypto/hash.h"
17 #include "file_chunk.h"
18 #include "ingestion/chunk_detector.h"
19 #include "ingestion/ingestion_source.h"
20 #include "util/atomic.h"
21 #include "util/pointer.h"
22 #include "util/single_copy.h"
23
24 namespace upload {
25 struct UploadStreamHandle;
26 }
27
28 class ItemAllocator;
29
30 /**
31 * Carries the information necessary to compress and checksum a file. During
32 * processing, the bulk chunk and the chunks_ vector are filled.
33 */
34 class FileItem : SingleCopy {
35 public:
36 explicit FileItem(IngestionSource *source,
37 uint64_t min_chunk_size = 4 * 1024 * 1024,
38 uint64_t avg_chunk_size = 8 * 1024 * 1024,
39 uint64_t max_chunk_size = 16 * 1024 * 1024,
40 zlib::Algorithms compression_algorithm = zlib::kZlibDefault,
41 shash::Algorithms hash_algorithm = shash::kSha1,
42 shash::Suffix hash_suffix = shash::kSuffixNone,
43 bool may_have_chunks = true,
44 bool has_legacy_bulk_chunk = false);
45 ~FileItem();
46
47 52302 static FileItem *CreateQuitBeacon() {
48
1/2
✓ Branch 2 taken 52302 times.
✗ Branch 3 not taken.
52302 const std::string quit_marker = std::string(1, kQuitBeaconMarker);
49
3/6
✓ Branch 1 taken 52302 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 52302 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 52302 times.
✗ Branch 8 not taken.
52302 UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker));
50
2/4
✓ Branch 2 taken 52302 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 52302 times.
✗ Branch 6 not taken.
104604 return new FileItem(source.Release());
51 52302 }
52 2547081 bool IsQuitBeacon() {
53
11/21
✓ Branch 1 taken 2546238 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 52180 times.
✓ Branch 5 taken 2494144 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 52134 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 52041 times.
✗ Branch 11 not taken.
✓ Branch 12 taken 51860 times.
✓ Branch 13 taken 181 times.
✓ Branch 14 taken 51864 times.
✓ Branch 15 taken 2494321 times.
✓ Branch 17 taken 2545868 times.
✓ Branch 18 taken 410 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
2547081 return (path().length() == 1) && (path()[0] == kQuitBeaconMarker);
54 }
55
56 6329848 std::string path() { return source_->GetPath(); }
57 2530482 uint64_t size() { return size_; }
58 4068054 Xor32Detector *chunk_detector() { return &chunk_detector_; }
59 6257013 shash::Any bulk_hash() { return bulk_hash_; }
60 2487852 zlib::Algorithms compression_algorithm() { return compression_algorithm_; }
61 2535626 shash::Algorithms hash_algorithm() { return hash_algorithm_; }
62 3747385 shash::Suffix hash_suffix() { return hash_suffix_; }
63 2492608 bool may_have_chunks() { return may_have_chunks_; }
64 1251982 bool has_legacy_bulk_chunk() { return has_legacy_bulk_chunk_; }
65
66 1251091 void set_size(uint64_t val) { size_ = val; }
67 1249026 void set_may_have_chunks(bool val) { may_have_chunks_ = val; }
68 1246595 void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); }
69 1254282 bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; }
70 1251904 uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); }
71
72 2503627 uint64_t GetNumChunks() { return chunks_.size(); }
73 1251959 FileChunkList *GetChunksPtr() { return &chunks_; }
74
75 1251296 bool Open() { return source_->Open(); }
76 2767307 ssize_t Read(void *buffer, size_t nbyte) {
77 2767307 return source_->Read(buffer, nbyte);
78 }
79 1250016 bool Close() { return source_->Close(); }
80 1251176 bool GetSize(uint64_t *size) { return source_->GetSize(size); }
81
82 // Called by ChunkItem constructor, decremented when a chunk is registered
83 1285557 void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); }
84 void RegisterChunk(const FileChunk &file_chunk);
85 1254240 bool IsProcessed() {
86
4/4
✓ Branch 1 taken 1252737 times.
✓ Branch 2 taken 1503 times.
✓ Branch 4 taken 1251983 times.
✓ Branch 5 taken 754 times.
1254240 return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0);
87 }
88
89 private:
90 static const uint64_t kSizeUnknown = uint64_t(-1);
91 static const char kQuitBeaconMarker = '\0';
92
93 UniquePtr<IngestionSource> source_;
94 const zlib::Algorithms compression_algorithm_;
95 const shash::Algorithms hash_algorithm_;
96 const shash::Suffix hash_suffix_;
97 const bool has_legacy_bulk_chunk_;
98
99 uint64_t size_;
100 bool may_have_chunks_;
101
102 Xor32Detector chunk_detector_;
103 shash::Any bulk_hash_;
104 FileChunkList chunks_;
105 /**
106 * Number of chunks created but not yet uploaded and registered
107 */
108 atomic_int64 nchunks_in_fly_;
109 /**
110 * Switches to true once all of the file has been through the chunking
111 * stage
112 */
113 atomic_int32 is_fully_chunked_;
114 pthread_mutex_t lock_;
115 };
116
117
118 /**
119 * A chunk stores the state of compression and hashing contexts as the blocks
120 * move through the pipeline. A chunk can be a "bulk chunk" corresponding to
121 * the processed data of an entire file, or it can be a partial chunk of a
122 * (large) input file.
123 */
124 class ChunkItem : SingleCopy {
125 public:
126 ChunkItem(FileItem *file_item, uint64_t offset);
127
128 void MakeBulkChunk();
129 1393964 bool IsSolePiece() {
130
6/6
✓ Branch 0 taken 142276 times.
✓ Branch 1 taken 1251688 times.
✓ Branch 2 taken 111 times.
✓ Branch 3 taken 142165 times.
✓ Branch 5 taken 21 times.
✓ Branch 6 taken 90 times.
1393964 return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size());
131 }
132
133 140133 bool is_bulk_chunk() { return is_bulk_chunk_; }
134 1254247 FileItem *file_item() { return file_item_; }
135 1433267 uint64_t offset() { return offset_; }
136 1289261 uint64_t size() { return size_; }
137 3413738 upload::UploadStreamHandle *upload_handle() { return upload_handle_; }
138 // An active zlib compression stream requires 256kB of memory. Therefore,
139 // we create it only for the absolutely necessary duration and free the space
140 // afterwards.
141 zlib::Compressor *GetCompressor();
142 void ReleaseCompressor();
143
144 3151534 shash::ContextPtr hash_ctx() { return hash_ctx_; }
145 3721248 shash::Any *hash_ptr() { return &hash_value_; }
146
147 1284923 void set_size(uint64_t val) {
148
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1284923 times.
1284923 assert(size_ == 0);
149 1284923 size_ = val;
150 1284923 }
151 1253960 void set_upload_handle(upload::UploadStreamHandle *val) {
152
2/4
✓ Branch 0 taken 1253960 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1253960 times.
✗ Branch 3 not taken.
1253960 assert((upload_handle_ == NULL) && (val != NULL));
153 1253960 upload_handle_ = val;
154 1253960 }
155
156 private:
157 FileItem *file_item_;
158 uint64_t offset_;
159 /**
160 * The size of a chunk is not defined before the corresponding stop block
161 * has been dispatched.
162 */
163 uint64_t size_;
164 bool is_bulk_chunk_;
165 /**
166 * Deleted by the uploader.
167 */
168 upload::UploadStreamHandle *upload_handle_;
169 UniquePtr<zlib::Compressor> compressor_;
170 shash::ContextPtr hash_ctx_;
171 shash::Any hash_value_;
172 unsigned char hash_ctx_buffer_[shash::kMaxContextSize];
173 };
174
175
176 /**
177 * A block is an item of work in the pipeline. A sequence of data blocks
178 * followed by a stop block constitutes a Chunk. A sequence of Chunks in turn
179 * constitutes a file.
180 * A block that carries data must have a non-zero-length payload.
181 */
182 class BlockItem : SingleCopy {
183 public:
184 enum BlockType {
185 kBlockHollow,
186 kBlockData,
187 kBlockStop,
188 };
189
190 explicit BlockItem(ItemAllocator *allocator);
191 BlockItem(int64_t tag, ItemAllocator *allocator);
192 ~BlockItem();
193
194
1/2
✓ Branch 2 taken 46360 times.
✗ Branch 3 not taken.
46360 static BlockItem *CreateQuitBeacon() { return new BlockItem(NULL); }
195 11944304 bool IsQuitBeacon() { return type_ == kBlockHollow; }
196
197 void MakeStop();
198 void MakeData(uint32_t capacity);
199 void MakeDataMove(BlockItem *other);
200 void MakeDataCopy(const unsigned char *data, uint32_t size);
201 void SetFileItem(FileItem *item);
202 void SetChunkItem(ChunkItem *item);
203 // Free data and reset to hollow block
204 void Reset();
205
206 uint32_t Write(void *buf, uint32_t count);
207
208 bool IsEmpty() { return size_ == 0; }
209 6471999 bool IsFull() { return size_ == capacity_; }
210
211 2107907555 unsigned char *data() { return data_; }
212 2100411123 uint32_t capacity() { return capacity_; }
213 20007115 uint32_t size() { return size_; }
214 3256090 void set_size(uint32_t val) {
215
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3256090 times.
3256090 assert(val <= capacity_);
216 3256090 size_ = val;
217 3256090 }
218
219 12570474 BlockType type() { return type_; }
220 17324753 int64_t tag() { return tag_; }
221 5933885 FileItem *file_item() { return file_item_; }
222 16623649 ChunkItem *chunk_item() { return chunk_item_; }
223 1260087 static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); }
224
225 private:
226 /**
227 * Total capacity of all BlockItem instances
228 */
229 static atomic_int64 managed_bytes_;
230
231 // Forget pointer to the data
232 void Discharge();
233
234 ItemAllocator *allocator_;
235 BlockType type_;
236
237 /**
238 * Blocks with the same tag need to be processed sequentially. That is, no
239 * two threads of the same pipeline stage must operate on blocks of the same
240 * tag. The tags roughly correspond to chunks.
241 * Tags can (and should) be set exactly once in the lifetime of a block.
242 */
243 int64_t tag_;
244
245 /**
246 * Can be set exactly once.
247 */
248 FileItem *file_item_;
249 ChunkItem *chunk_item_;
250
251 /**
252 * Managed by ItemAllocator
253 */
254 unsigned char *data_;
255 uint32_t capacity_;
256 uint32_t size_;
257 };
258
259 #endif // CVMFS_INGESTION_ITEM_H_
260