GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/item.h
Date: 2025-06-29 02:35:41
Exec Total Coverage
Lines: 63 63 100.0%
Branches: 32 53 60.4%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_ITEM_H_
6 #define CVMFS_INGESTION_ITEM_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10
11 #include <cassert>
12 #include <string>
13 #include <vector>
14
15 #include "compression/compression.h"
16 #include "crypto/hash.h"
17 #include "file_chunk.h"
18 #include "ingestion/chunk_detector.h"
19 #include "ingestion/ingestion_source.h"
20 #include "util/atomic.h"
21 #include "util/pointer.h"
22 #include "util/single_copy.h"
23
24 namespace upload {
25 struct UploadStreamHandle;
26 }
27
28 class ItemAllocator;
29
30 /**
31 * Carries the information necessary to compress and checksum a file. During
32 * processing, the bulk chunk and the chunks_ vector are filled.
33 */
34 class FileItem : SingleCopy {
35 public:
36 explicit FileItem(IngestionSource *source,
37 uint64_t min_chunk_size = 4 * 1024 * 1024,
38 uint64_t avg_chunk_size = 8 * 1024 * 1024,
39 uint64_t max_chunk_size = 16 * 1024 * 1024,
40 zlib::Algorithms compression_algorithm = zlib::kZlibDefault,
41 shash::Algorithms hash_algorithm = shash::kSha1,
42 shash::Suffix hash_suffix = shash::kSuffixNone,
43 bool may_have_chunks = true,
44 bool has_legacy_bulk_chunk = false);
45 ~FileItem();
46
47 91452 static FileItem *CreateQuitBeacon() {
48
1/2
✓ Branch 2 taken 91452 times.
✗ Branch 3 not taken.
91452 const std::string quit_marker = std::string(1, kQuitBeaconMarker);
49
3/6
✓ Branch 1 taken 91452 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 91452 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 91452 times.
✗ Branch 8 not taken.
91452 UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker));
50
2/4
✓ Branch 2 taken 91452 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 91452 times.
✗ Branch 6 not taken.
182904 return new FileItem(source.Release());
51 91452 }
52 19527782 bool IsQuitBeacon() {
53
11/21
✓ Branch 1 taken 19513790 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 98172 times.
✓ Branch 5 taken 19415930 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 91873 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 91873 times.
✗ Branch 11 not taken.
✓ Branch 12 taken 91104 times.
✓ Branch 13 taken 769 times.
✓ Branch 14 taken 91854 times.
✓ Branch 15 taken 19415949 times.
✓ Branch 17 taken 19507229 times.
✓ Branch 18 taken 576 times.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
19527782 return (path().length() == 1) && (path()[0] == kQuitBeaconMarker);
54 }
55
56 48700379 std::string path() { return source_->GetPath(); }
57 19696553 uint64_t size() { return size_; }
58 31665767 Xor32Detector *chunk_detector() { return &chunk_detector_; }
59 48743632 shash::Any bulk_hash() { return bulk_hash_; }
60 19406960 zlib::Algorithms compression_algorithm() { return compression_algorithm_; }
61 19712758 shash::Algorithms hash_algorithm() { return hash_algorithm_; }
62 29194063 shash::Suffix hash_suffix() { return hash_suffix_; }
63 19432440 bool may_have_chunks() { return may_have_chunks_; }
64 9752290 bool has_legacy_bulk_chunk() { return has_legacy_bulk_chunk_; }
65
66 9748010 void set_size(uint64_t val) { size_ = val; }
67 9745844 void set_may_have_chunks(bool val) { may_have_chunks_ = val; }
68 9721508 void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); }
69 9769890 bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; }
70 9751868 uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); }
71
72 19503251 uint64_t GetNumChunks() { return chunks_.size(); }
73 9751950 FileChunkList *GetChunksPtr() { return &chunks_; }
74
75 9748872 bool Open() { return source_->Open(); }
76 21567581 ssize_t Read(void *buffer, size_t nbyte) {
77 21567581 return source_->Read(buffer, nbyte);
78 }
79 9741657 bool Close() { return source_->Close(); }
80 9747897 bool GetSize(uint64_t *size) { return source_->GetSize(size); }
81
82 // Called by ChunkItem constructor, decremented when a chunk is registered
83 9980620 void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); }
84 void RegisterChunk(const FileChunk &file_chunk);
85 9769614 bool IsProcessed() {
86
4/4
✓ Branch 1 taken 9755578 times.
✓ Branch 2 taken 14036 times.
✓ Branch 4 taken 9752276 times.
✓ Branch 5 taken 3302 times.
9769614 return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0);
87 }
88
89 private:
90 static const uint64_t kSizeUnknown = uint64_t(-1);
91 static const char kQuitBeaconMarker = '\0';
92
93 UniquePtr<IngestionSource> source_;
94 const zlib::Algorithms compression_algorithm_;
95 const shash::Algorithms hash_algorithm_;
96 const shash::Suffix hash_suffix_;
97 const bool has_legacy_bulk_chunk_;
98
99 uint64_t size_;
100 bool may_have_chunks_;
101
102 Xor32Detector chunk_detector_;
103 shash::Any bulk_hash_;
104 FileChunkList chunks_;
105 /**
106 * Number of chunks created but not yet uploaded and registered
107 */
108 atomic_int64 nchunks_in_fly_;
109 /**
110 * Switches to true once all of the file has been through the chunking
111 * stage
112 */
113 atomic_int32 is_fully_chunked_;
114 pthread_mutex_t lock_;
115 };
116
117
118 /**
119 * A chunk stores the state of compression and hashing contexts as the blocks
120 * move through the pipeline. A chunk can be a "bulk chunk" corresponding to
121 * the processed data of an entire file, or it can be a partial chunk of a
122 * (large) input file.
123 */
124 class ChunkItem : SingleCopy {
125 public:
126 ChunkItem(FileItem *file_item, uint64_t offset);
127
128 void MakeBulkChunk();
129 10686157 bool IsSolePiece() {
130
6/6
✓ Branch 0 taken 937469 times.
✓ Branch 1 taken 9748688 times.
✓ Branch 2 taken 797 times.
✓ Branch 3 taken 936672 times.
✓ Branch 5 taken 138 times.
✓ Branch 6 taken 659 times.
10686157 return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size());
131 }
132
133 920874 bool is_bulk_chunk() { return is_bulk_chunk_; }
134 9769660 FileItem *file_item() { return file_item_; }
135 10950596 uint64_t offset() { return offset_; }
136 9999752 uint64_t size() { return size_; }
137 26314360 upload::UploadStreamHandle *upload_handle() { return upload_handle_; }
138 // An active zlib compression stream requires 256kB of memory. Therefore,
139 // we create it only for the absolutely necessary duration and free the space
140 // afterwards.
141 zlib::Compressor *GetCompressor();
142 void ReleaseCompressor();
143
144 24618888 shash::ContextPtr hash_ctx() { return hash_ctx_; }
145 29006499 shash::Any *hash_ptr() { return &hash_value_; }
146
147 9975174 void set_size(uint64_t val) {
148
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9975174 times.
9975174 assert(size_ == 0);
149 9975174 size_ = val;
150 9975174 }
151 9765753 void set_upload_handle(upload::UploadStreamHandle *val) {
152
2/4
✓ Branch 0 taken 9765831 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 9765870 times.
✗ Branch 3 not taken.
9765753 assert((upload_handle_ == NULL) && (val != NULL));
153 9765870 upload_handle_ = val;
154 9765870 }
155
156 private:
157 FileItem *file_item_;
158 uint64_t offset_;
159 /**
160 * The size of a chunk is not defined before the corresponding stop block
161 * has been dispatched.
162 */
163 uint64_t size_;
164 bool is_bulk_chunk_;
165 /**
166 * Deleted by the uploader.
167 */
168 upload::UploadStreamHandle *upload_handle_;
169 UniquePtr<zlib::Compressor> compressor_;
170 shash::ContextPtr hash_ctx_;
171 shash::Any hash_value_;
172 unsigned char hash_ctx_buffer_[shash::kMaxContextSize];
173 };
174
175
176 /**
177 * A block is an item of work in the pipeline. A sequence of data blocks
178 * followed by a stop block constitutes a Chunk. A sequence of Chunks in turn
179 * build constitute a file.
180 * A block that carries data must have a non-zero-length payload.
181 */
182 class BlockItem : SingleCopy {
183 public:
184 enum BlockType {
185 kBlockHollow,
186 kBlockData,
187 kBlockStop,
188 };
189
190 explicit BlockItem(ItemAllocator *allocator);
191 BlockItem(int64_t tag, ItemAllocator *allocator);
192 ~BlockItem();
193
194
1/2
✓ Branch 2 taken 80432 times.
✗ Branch 3 not taken.
80432 static BlockItem *CreateQuitBeacon() { return new BlockItem(NULL); }
195 92981652 bool IsQuitBeacon() { return type_ == kBlockHollow; }
196
197 void MakeStop();
198 void MakeData(uint32_t capacity);
199 void MakeDataMove(BlockItem *other);
200 void MakeDataCopy(const unsigned char *data, uint32_t size);
201 void SetFileItem(FileItem *item);
202 void SetChunkItem(ChunkItem *item);
203 // Free data and reset to hollow block
204 void Reset();
205
206 uint32_t Write(void *buf, uint32_t count);
207
208 bool IsEmpty() { return size_ == 0; }
209 50487632 bool IsFull() { return size_ == capacity_; }
210
211 20530637369 unsigned char *data() { return data_; }
212 20472660442 uint32_t capacity() { return capacity_; }
213 155727639 uint32_t size() { return size_; }
214 25393186 void set_size(uint32_t val) {
215
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 25393186 times.
25393186 assert(val <= capacity_);
216 25393186 size_ = val;
217 25393186 }
218
219 98645518 BlockType type() { return type_; }
220 135959019 int64_t tag() { return tag_; }
221 46331221 FileItem *file_item() { return file_item_; }
222 129776511 ChunkItem *chunk_item() { return chunk_item_; }
223 9816031 static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); }
224
225 private:
226 /**
227 * Total capacity of all BlockItem()
228 */
229 static atomic_int64 managed_bytes_;
230
231 // Forget pointer to the data
232 void Discharge();
233
234 ItemAllocator *allocator_;
235 BlockType type_;
236
237 /**
238 * Blocks with the same tag need to be processed sequentially. That is, no
239 * two threads of the same pipeline stage must operate on blocks of the same
240 * tag. The tags roughly correspond to chunks.
241 * Tags can (and should) be set exactly once in the life time of a block.
242 */
243 int64_t tag_;
244
245 /**
246 * Can be set exactly once.
247 */
248 FileItem *file_item_;
249 ChunkItem *chunk_item_;
250
251 /**
252 * Managed by ItemAllocator
253 */
254 unsigned char *data_;
255 uint32_t capacity_;
256 uint32_t size_;
257 };
258
259 #endif // CVMFS_INGESTION_ITEM_H_
260