GCC Code Coverage Report | |||||||||||||||||||||
|
|||||||||||||||||||||
Line | Branch | Exec | Source |
1 |
/** |
||
2 |
* This file is part of the CernVM File System. |
||
3 |
*/ |
||
4 |
|||
5 |
#ifndef CVMFS_INGESTION_ITEM_H_ |
||
6 |
#define CVMFS_INGESTION_ITEM_H_ |
||
7 |
|||
8 |
#include <pthread.h> |
||
9 |
#include <stdint.h> |
||
10 |
|||
11 |
#include <cassert> |
||
12 |
#include <string> |
||
13 |
#include <vector> |
||
14 |
|||
15 |
#include "atomic.h" |
||
16 |
#include "compression.h" |
||
17 |
#include "file_chunk.h" |
||
18 |
#include "hash.h" |
||
19 |
#include "ingestion/chunk_detector.h" |
||
20 |
#include "ingestion/ingestion_source.h" |
||
21 |
#include "util/pointer.h" |
||
22 |
#include "util/single_copy.h" |
||
23 |
|||
24 |
namespace upload { |
||
25 |
struct UploadStreamHandle; |
||
26 |
} |
||
27 |
|||
28 |
class ItemAllocator; |
||
29 |
|||
30 |
/** |
||
31 |
* Carries the information necessary to compress and checksum a file. During |
||
32 |
* processing, the bulk chunk and the chunks_ vector are filled. |
||
33 |
*/ |
||
34 |
class FileItem : SingleCopy { |
||
35 |
public: |
||
36 |
explicit FileItem( |
||
37 |
IngestionSource* source, |
||
38 |
uint64_t min_chunk_size = 4 * 1024 * 1024, |
||
39 |
uint64_t avg_chunk_size = 8 * 1024 * 1024, |
||
40 |
uint64_t max_chunk_size = 16 * 1024 * 1024, |
||
41 |
zlib::Algorithms compression_algorithm = zlib::kZlibDefault, |
||
42 |
shash::Algorithms hash_algorithm = shash::kSha1, |
||
43 |
shash::Suffix hash_suffix = shash::kSuffixNone, |
||
44 |
bool may_have_chunks = true, |
||
45 |
bool has_legacy_bulk_chunk = false); |
||
46 |
~FileItem(); |
||
47 |
|||
48 |
793 |
static FileItem *CreateQuitBeacon() { |
|
49 |
793 |
std::string quit_marker = std::string(1, kQuitBeaconMarker); |
|
50 |
793 |
UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker)); |
|
51 |
793 |
return new FileItem(source.Release()); |
|
52 |
} |
||
53 |
1097 |
bool IsQuitBeacon() { |
|
54 |
✓✓✓✗ ✓✓✗✗ ✗✗✗✓ ✗ |
1097 |
return (path().length() == 1) && (path()[0] == kQuitBeaconMarker); |
55 |
} |
||
56 |
|||
57 |
2339 |
std::string path() { return source_->GetPath(); } |
|
58 |
7030 |
uint64_t size() { return size_; } |
|
59 |
226493 |
Xor32Detector *chunk_detector() { return &chunk_detector_; } |
|
60 |
825 |
shash::Any bulk_hash() { return bulk_hash_; } |
|
61 |
6974 |
zlib::Algorithms compression_algorithm() { return compression_algorithm_; } |
|
62 |
6974 |
shash::Algorithms hash_algorithm() { return hash_algorithm_; } |
|
63 |
447 |
shash::Suffix hash_suffix() { return hash_suffix_; } |
|
64 |
313 |
bool may_have_chunks() { return may_have_chunks_; } |
|
65 |
197 |
bool has_legacy_bulk_chunk() { return has_legacy_bulk_chunk_; } |
|
66 |
|||
67 |
160 |
void set_size(uint64_t val) { size_ = val; } |
|
68 |
100 |
void set_may_have_chunks(bool val) { may_have_chunks_ = val; } |
|
69 |
157 |
void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); } |
|
70 |
1820 |
bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; } |
|
71 |
154 |
uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); } |
|
72 |
|||
73 |
300 |
uint64_t GetNumChunks() { return chunks_.size(); } |
|
74 |
149 |
FileChunkList *GetChunksPtr() { return &chunks_; } |
|
75 |
|||
76 |
155 |
bool Open() { return source_->Open(); } |
|
77 |
216390 |
ssize_t Read(void *buffer, size_t nbyte) { |
|
78 |
216390 |
return source_->Read(buffer, nbyte); |
|
79 |
} |
||
80 |
155 |
bool Close() { return source_->Close(); } |
|
81 |
155 |
bool GetSize(uint64_t *size) { return source_->GetSize(size); } |
|
82 |
|||
83 |
// Called by ChunkItem constructor, decremented when a chunk is registered |
||
84 |
6825 |
void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); } |
|
85 |
void RegisterChunk(const FileChunk &file_chunk); |
||
86 |
1814 |
bool IsProcessed() { |
|
87 |
✓✓✓✓ |
1814 |
return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0); |
88 |
} |
||
89 |
|||
90 |
private: |
||
91 |
static const uint64_t kSizeUnknown = uint64_t(-1); |
||
92 |
static const char kQuitBeaconMarker = '\0'; |
||
93 |
|||
94 |
UniquePtr<IngestionSource> source_; |
||
95 |
const zlib::Algorithms compression_algorithm_; |
||
96 |
const shash::Algorithms hash_algorithm_; |
||
97 |
const shash::Suffix hash_suffix_; |
||
98 |
const bool has_legacy_bulk_chunk_; |
||
99 |
|||
100 |
uint64_t size_; |
||
101 |
bool may_have_chunks_; |
||
102 |
|||
103 |
Xor32Detector chunk_detector_; |
||
104 |
shash::Any bulk_hash_; |
||
105 |
FileChunkList chunks_; |
||
106 |
/** |
||
107 |
* Number of chunks created but not yet uploaded and registered |
||
108 |
*/ |
||
109 |
atomic_int64 nchunks_in_fly_; |
||
110 |
/** |
||
111 |
* Switches to true once all of the file has been through the chunking |
||
112 |
* stage |
||
113 |
*/ |
||
114 |
atomic_int32 is_fully_chunked_; |
||
115 |
pthread_mutex_t lock_; |
||
116 |
}; |
||
117 |
|||
118 |
|||
119 |
/** |
||
120 |
* A chunk stores the state of compression and hashing contexts as the blocks |
||
121 |
* move through the pipeline. A chunk can be a "bulk chunk" corresponding to |
||
122 |
* the processed data of an entire file, or it can be a partial chunk of a |
||
123 |
* (large) input file. |
||
124 |
*/ |
||
125 |
6825 |
class ChunkItem : SingleCopy { |
|
126 |
public: |
||
127 |
ChunkItem(FileItem *file_item, uint64_t offset); |
||
128 |
|||
129 |
void MakeBulkChunk(); |
||
130 |
21816 |
bool IsSolePiece() { |
|
131 |
✓✓✓✓ ✓✓ |
21816 |
return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size()); |
132 |
} |
||
133 |
|||
134 |
20019 |
bool is_bulk_chunk() { return is_bulk_chunk_; } |
|
135 |
1815 |
FileItem *file_item() { return file_item_; } |
|
136 |
29932 |
uint64_t offset() { return offset_; } |
|
137 |
6817 |
uint64_t size() { return size_; } |
|
138 |
578818 |
upload::UploadStreamHandle *upload_handle() { return upload_handle_; } |
|
139 |
328438 |
zlib::Compressor *compressor() { return compressor_.weak_ref(); } |
|
140 |
546017 |
shash::ContextPtr hash_ctx() { return hash_ctx_; } |
|
141 |
5451 |
shash::Any *hash_ptr() { return &hash_value_; } |
|
142 |
|||
143 |
✗✓ | 6788 |
void set_size(uint64_t val) { assert(size_ == 0); size_ = val; } |
144 |
1814 |
void set_upload_handle(upload::UploadStreamHandle *val) { |
|
145 |
✓✗✗✓ |
1814 |
assert((upload_handle_ == NULL) && (val != NULL)); |
146 |
1814 |
upload_handle_ = val; |
|
147 |
1814 |
} |
|
148 |
|||
149 |
private: |
||
150 |
FileItem *file_item_; |
||
151 |
uint64_t offset_; |
||
152 |
/** |
||
153 |
* The size of a chunk is not defined before the corresponding stop block |
||
154 |
* has been dispatched. |
||
155 |
*/ |
||
156 |
uint64_t size_; |
||
157 |
bool is_bulk_chunk_; |
||
158 |
/** |
||
159 |
* Deleted by the uploader. |
||
160 |
*/ |
||
161 |
upload::UploadStreamHandle *upload_handle_; |
||
162 |
UniquePtr<zlib::Compressor> compressor_; |
||
163 |
shash::ContextPtr hash_ctx_; |
||
164 |
UniquePtr<void> hash_ctx_buffer_; |
||
165 |
shash::Any hash_value_; |
||
166 |
}; |
||
167 |
|||
168 |
|||
169 |
/** |
||
170 |
* A block is an item of work in the pipeline. A sequence of data blocks |
||
171 |
* followed by a stop block constitutes a Chunk. A sequence of Chunks in turn |
||
172 |
* build constitute a file. |
||
173 |
* A block that carries data must have a non-zero-length payload. |
||
174 |
*/ |
||
175 |
class BlockItem : SingleCopy { |
||
176 |
public: |
||
177 |
enum BlockType { |
||
178 |
kBlockHollow, |
||
179 |
kBlockData, |
||
180 |
kBlockStop, |
||
181 |
}; |
||
182 |
|||
183 |
explicit BlockItem(ItemAllocator *allocator); |
||
184 |
BlockItem(int64_t tag, ItemAllocator *allocator); |
||
185 |
~BlockItem(); |
||
186 |
|||
187 |
708 |
static BlockItem *CreateQuitBeacon() { |
|
188 |
708 |
return new BlockItem(NULL); |
|
189 |
} |
||
190 |
1680256 |
bool IsQuitBeacon() { |
|
191 |
1680256 |
return type_ == kBlockHollow; |
|
192 |
} |
||
193 |
|||
194 |
void MakeStop(); |
||
195 |
void MakeData(uint32_t capacity); |
||
196 |
void MakeDataMove(BlockItem *other); |
||
197 |
void MakeDataCopy(const unsigned char *data, uint32_t size); |
||
198 |
void SetFileItem(FileItem *item); |
||
199 |
void SetChunkItem(ChunkItem *item); |
||
200 |
// Free data and reset to hollow block |
||
201 |
void Reset(); |
||
202 |
|||
203 |
uint32_t Write(void *buf, uint32_t count); |
||
204 |
|||
205 |
bool IsEmpty() { return size_ == 0; } |
||
206 |
1201134 |
bool IsFull() { return size_ == capacity_; } |
|
207 |
|||
208 |
526701201 |
unsigned char *data() { return data_; } |
|
209 |
524891911 |
uint32_t capacity() { return capacity_; } |
|
210 |
4229115 |
uint32_t size() { return size_; } |
|
211 |
✗✓ | 602216 |
void set_size(uint32_t val) { assert(val <= capacity_); size_ = val; } |
212 |
|||
213 |
1802058 |
BlockType type() { return type_; } |
|
214 |
1448682 |
int64_t tag() { return tag_; } |
|
215 |
772477 |
FileItem *file_item() { return file_item_; } |
|
216 |
2392473 |
ChunkItem *chunk_item() { return chunk_item_; } |
|
217 |
6935 |
static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); } |
|
218 |
|||
219 |
private: |
||
220 |
// Forget pointer to the data |
||
221 |
void Discharge(); |
||
222 |
|||
223 |
/** |
||
224 |
* Total capacity of all BlockItem() |
||
225 |
*/ |
||
226 |
static atomic_int64 managed_bytes_; |
||
227 |
|||
228 |
ItemAllocator *allocator_; |
||
229 |
BlockType type_; |
||
230 |
|||
231 |
/** |
||
232 |
* Blocks with the same tag need to be processed sequentially. That is, no |
||
233 |
* two threads of the same pipeline stage must operate on blocks of the same |
||
234 |
* tag. The tags roughly correspond to chunks. |
||
235 |
* Tags can (and should) be set exactly once in the life time of a block. |
||
236 |
*/ |
||
237 |
int64_t tag_; |
||
238 |
|||
239 |
/** |
||
240 |
* Can be set exactly once. |
||
241 |
*/ |
||
242 |
FileItem *file_item_; |
||
243 |
ChunkItem *chunk_item_; |
||
244 |
|||
245 |
/** |
||
246 |
* Managed by ItemAllocator |
||
247 |
*/ |
||
248 |
unsigned char *data_; |
||
249 |
uint32_t capacity_; |
||
250 |
uint32_t size_; |
||
251 |
}; |
||
252 |
|||
253 |
#endif // CVMFS_INGESTION_ITEM_H_ |
Generated by: GCOVR (Version 4.1) |