GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/item.h
Date: 2024-04-21 02:33:16
Exec Total Coverage
Lines: 59 59 100.0%
Branches: 31 53 58.5%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_ITEM_H_
6 #define CVMFS_INGESTION_ITEM_H_
7
8 #include <pthread.h>
9 #include <stdint.h>
10
11 #include <cassert>
12 #include <string>
13 #include <vector>
14
15 #include "compression.h"
16 #include "crypto/hash.h"
17 #include "file_chunk.h"
18 #include "ingestion/chunk_detector.h"
19 #include "ingestion/ingestion_source.h"
20 #include "util/atomic.h"
21 #include "util/pointer.h"
22 #include "util/single_copy.h"
23
24 namespace upload {
25 struct UploadStreamHandle;
26 }
27
28 class ItemAllocator;
29
30 /**
31 * Carries the information necessary to compress and checksum a file. During
32 * processing, the bulk chunk and the chunks_ vector are filled.
33 */
34 class FileItem : SingleCopy {
35 public:
36 explicit FileItem(
37 IngestionSource* source,
38 uint64_t min_chunk_size = 4 * 1024 * 1024,
39 uint64_t avg_chunk_size = 8 * 1024 * 1024,
40 uint64_t max_chunk_size = 16 * 1024 * 1024,
41 zlib::Algorithms compression_algorithm = zlib::kZlibDefault,
42 shash::Algorithms hash_algorithm = shash::kSha1,
43 shash::Suffix hash_suffix = shash::kSuffixNone,
44 bool may_have_chunks = true,
45 bool has_legacy_bulk_chunk = false);
46 ~FileItem();
47
48 3162 static FileItem *CreateQuitBeacon() {
49
1/2
✓ Branch 2 taken 3162 times.
✗ Branch 3 not taken.
3162 std::string quit_marker = std::string(1, kQuitBeaconMarker);
50
3/6
✓ Branch 1 taken 3162 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3162 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 3162 times.
✗ Branch 8 not taken.
3162 UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker));
51
2/4
✓ Branch 2 taken 3162 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 3162 times.
✗ Branch 6 not taken.
6324 return new FileItem(source.Release());
52 3162 }
53 500367 bool IsQuitBeacon() {
54
10/21
✓ Branch 1 taken 501337 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 3343 times.
✓ Branch 5 taken 498183 times.
✗ Branch 6 not taken.
✓ Branch 7 taken 3159 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 3158 times.
✗ Branch 11 not taken.
✓ Branch 12 taken 3139 times.
✓ Branch 13 taken 19 times.
✓ Branch 14 taken 3160 times.
✓ Branch 15 taken 498181 times.
✓ Branch 17 taken 501385 times.
✗ Branch 18 not taken.
✗ Branch 20 not taken.
✗ Branch 21 not taken.
✗ Branch 23 not taken.
✗ Branch 24 not taken.
500367 return (path().length() == 1) && (path()[0] == kQuitBeaconMarker);
55 }
56
57 1248825 std::string path() { return source_->GetPath(); }
58 504572 uint64_t size() { return size_; }
59 810170 Xor32Detector *chunk_detector() { return &chunk_detector_; }
60 1249730 shash::Any bulk_hash() { return bulk_hash_; }
61 497240 zlib::Algorithms compression_algorithm() { return compression_algorithm_; }
62 504614 shash::Algorithms hash_algorithm() { return hash_algorithm_; }
63 749258 shash::Suffix hash_suffix() { return hash_suffix_; }
64 499043 bool may_have_chunks() { return may_have_chunks_; }
65 250073 bool has_legacy_bulk_chunk() { return has_legacy_bulk_chunk_; }
66
67 250001 void set_size(uint64_t val) { size_ = val; }
68 249910 void set_may_have_chunks(bool val) { may_have_chunks_ = val; }
69 249350 void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); }
70 250528 bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; }
71 250058 uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); }
72
73 500112 uint64_t GetNumChunks() { return chunks_.size(); }
74 250077 FileChunkList *GetChunksPtr() { return &chunks_; }
75
76 250032 bool Open() { return source_->Open(); }
77 552866 ssize_t Read(void *buffer, size_t nbyte) {
78 552866 return source_->Read(buffer, nbyte);
79 }
80 249818 bool Close() { return source_->Close(); }
81 250009 bool GetSize(uint64_t *size) { return source_->GetSize(size); }
82
83 // Called by the ChunkItem constructor; the counter is decremented when a chunk is registered
84 255105 void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); }
85 void RegisterChunk(const FileChunk &file_chunk);
86 250522 bool IsProcessed() {
87
4/4
✓ Branch 1 taken 250229 times.
✓ Branch 2 taken 293 times.
✓ Branch 4 taken 250083 times.
✓ Branch 5 taken 146 times.
250522 return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0);
88 }
89
90 private:
91 static const uint64_t kSizeUnknown = uint64_t(-1);
92 static const char kQuitBeaconMarker = '\0';
93
94 UniquePtr<IngestionSource> source_;
95 const zlib::Algorithms compression_algorithm_;
96 const shash::Algorithms hash_algorithm_;
97 const shash::Suffix hash_suffix_;
98 const bool has_legacy_bulk_chunk_;
99
100 uint64_t size_;
101 bool may_have_chunks_;
102
103 Xor32Detector chunk_detector_;
104 shash::Any bulk_hash_;
105 FileChunkList chunks_;
106 /**
107 * Number of chunks created but not yet uploaded and registered
108 */
109 atomic_int64 nchunks_in_fly_;
110 /**
111 * Switches to true once all of the file has been through the chunking
112 * stage
113 */
114 atomic_int32 is_fully_chunked_;
115 pthread_mutex_t lock_;
116 };
117
118
119 /**
120 * A chunk stores the state of compression and hashing contexts as the blocks
121 * move through the pipeline. A chunk can be a "bulk chunk" corresponding to
122 * the processed data of an entire file, or it can be a partial chunk of a
123 * (large) input file.
124 */
125 class ChunkItem : SingleCopy {
126 public:
127 ChunkItem(FileItem *file_item, uint64_t offset);
128
129 void MakeBulkChunk();
130 270354 bool IsSolePiece() {
131
6/6
✓ Branch 0 taken 20442 times.
✓ Branch 1 taken 249912 times.
✓ Branch 2 taken 19 times.
✓ Branch 3 taken 20423 times.
✓ Branch 5 taken 3 times.
✓ Branch 6 taken 16 times.
270354 return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size());
132 }
133
134 20019 bool is_bulk_chunk() { return is_bulk_chunk_; }
135 250523 FileItem *file_item() { return file_item_; }
136 276343 uint64_t offset() { return offset_; }
137 255525 uint64_t size() { return size_; }
138 668579 upload::UploadStreamHandle *upload_handle() { return upload_handle_; }
139 // An active zlib compression stream requires 256kB of memory. Therefore,
140 // we create it only for the absolutely necessary duration and free the space
141 // afterwards.
142 zlib::Compressor *GetCompressor();
143 void ReleaseCompressor();
144
145 630937 shash::ContextPtr hash_ctx() { return hash_ctx_; }
146 743411 shash::Any *hash_ptr() { return &hash_value_; }
147
148
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 255027 times.
255027 void set_size(uint64_t val) { assert(size_ == 0); size_ = val; }
149 250393 void set_upload_handle(upload::UploadStreamHandle *val) {
150
2/4
✓ Branch 0 taken 250394 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 250394 times.
✗ Branch 3 not taken.
250393 assert((upload_handle_ == NULL) && (val != NULL));
151 250394 upload_handle_ = val;
152 250394 }
153
154 private:
155 FileItem *file_item_;
156 uint64_t offset_;
157 /**
158 * The size of a chunk is not defined before the corresponding stop block
159 * has been dispatched.
160 */
161 uint64_t size_;
162 bool is_bulk_chunk_;
163 /**
164 * Deleted by the uploader.
165 */
166 upload::UploadStreamHandle *upload_handle_;
167 UniquePtr<zlib::Compressor> compressor_;
168 shash::ContextPtr hash_ctx_;
169 shash::Any hash_value_;
170 unsigned char hash_ctx_buffer_[shash::kMaxContextSize];
171 };
172
173
174 /**
175 * A block is an item of work in the pipeline. A sequence of data blocks
176 * followed by a stop block constitutes a Chunk. A sequence of Chunks in turn
177 * constitutes a file.
178 * A block that carries data must have a non-zero-length payload.
179 */
180 class BlockItem : SingleCopy {
181 public:
182 enum BlockType {
183 kBlockHollow,
184 kBlockData,
185 kBlockStop,
186 };
187
188 explicit BlockItem(ItemAllocator *allocator);
189 BlockItem(int64_t tag, ItemAllocator *allocator);
190 ~BlockItem();
191
192 2792 static BlockItem *CreateQuitBeacon() {
193
1/2
✓ Branch 2 taken 2792 times.
✗ Branch 3 not taken.
2792 return new BlockItem(NULL);
194 }
195 2363636 bool IsQuitBeacon() {
196 2363636 return type_ == kBlockHollow;
197 }
198
199 void MakeStop();
200 void MakeData(uint32_t capacity);
201 void MakeDataMove(BlockItem *other);
202 void MakeDataCopy(const unsigned char *data, uint32_t size);
203 void SetFileItem(FileItem *item);
204 void SetChunkItem(ChunkItem *item);
205 // Free data and reset to hollow block
206 void Reset();
207
208 uint32_t Write(void *buf, uint32_t count);
209
210 bool IsEmpty() { return size_ == 0; }
211 1295505 bool IsFull() { return size_ == capacity_; }
212
213 526416415 unsigned char *data() { return data_; }
214 524940272 uint32_t capacity() { return capacity_; }
215 3969434 uint32_t size() { return size_; }
216
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 650982 times.
650982 void set_size(uint32_t val) { assert(val <= capacity_); size_ = val; }
217
218 2513834 BlockType type() { return type_; }
219 3481773 int64_t tag() { return tag_; }
220 1183155 FileItem *file_item() { return file_item_; }
221 3294291 ChunkItem *chunk_item() { return chunk_item_; }
222 251728 static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); }
223
224 private:
225 /**
226 * Total capacity of all BlockItem()
227 */
228 static atomic_int64 managed_bytes_;
229
230 // Forget pointer to the data
231 void Discharge();
232
233 ItemAllocator *allocator_;
234 BlockType type_;
235
236 /**
237 * Blocks with the same tag need to be processed sequentially. That is, no
238 * two threads of the same pipeline stage must operate on blocks of the same
239 * tag. The tags roughly correspond to chunks.
240 * Tags can (and should) be set exactly once in the lifetime of a block.
241 */
242 int64_t tag_;
243
244 /**
245 * Can be set exactly once.
246 */
247 FileItem *file_item_;
248 ChunkItem *chunk_item_;
249
250 /**
251 * Managed by ItemAllocator
252 */
253 unsigned char *data_;
254 uint32_t capacity_;
255 uint32_t size_;
256 };
257
258 #endif // CVMFS_INGESTION_ITEM_H_
259