1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#include "cvmfs_config.h" |
6 |
|
|
#include "task_compress.h" |
7 |
|
|
|
8 |
|
|
#include <cstdlib> |
9 |
|
|
|
10 |
|
|
#include "compression.h" |
11 |
|
|
#include "logging.h" |
12 |
|
|
#include "smalloc.h" |
13 |
|
|
|
14 |
|
|
|
15 |
|
|
/** |
16 |
|
|
* The data payload of the blocks is replaced by their compressed counterparts. |
17 |
|
|
* The block tags stay the same. |
18 |
|
|
* TODO(jblomer): avoid memory copy with EchoCompressor |
19 |
|
|
*/ |
20 |
|
328438 |
void TaskCompress::Process(BlockItem *input_block) { |
21 |
✗✓ |
328438 |
assert(input_block->chunk_item() != NULL); |
22 |
|
|
|
23 |
|
328438 |
zlib::Compressor *compressor = input_block->chunk_item()->compressor(); |
24 |
|
328438 |
const int64_t tag = input_block->tag(); |
25 |
|
328438 |
const bool flush = input_block->type() == BlockItem::kBlockStop; |
26 |
|
328434 |
unsigned char *input_data = input_block->data(); |
27 |
|
328434 |
size_t remaining_in_input = input_block->size(); |
28 |
|
|
|
29 |
|
328438 |
BlockItem *output_block = NULL; |
30 |
|
328438 |
TagMap::iterator iter_find = tag_map_.find(tag); |
31 |
✓✓ |
328430 |
if (iter_find == tag_map_.end()) { |
32 |
|
|
// So far unseen chunk, start new stream of compressed blocks |
33 |
|
1783 |
output_block = new BlockItem(tag, allocator_); |
34 |
|
1783 |
output_block->SetFileItem(input_block->file_item()); |
35 |
|
1783 |
output_block->SetChunkItem(input_block->chunk_item()); |
36 |
|
1783 |
output_block->MakeData(kCompressedBlockSize); |
37 |
|
1783 |
tag_map_[tag] = output_block; |
38 |
|
|
} else { |
39 |
|
326647 |
output_block = iter_find->second; |
40 |
|
|
} |
41 |
|
|
|
42 |
|
328430 |
bool done = false; |
43 |
✓✓✓✓ ✓✓✓✓
|
600629 |
do { |
44 |
|
600625 |
unsigned char *output_data = output_block->data() + output_block->size(); |
45 |
✗✓ |
600617 |
assert(!output_block->IsFull()); |
46 |
|
|
size_t remaining_in_output = |
47 |
|
600617 |
output_block->capacity() - output_block->size(); |
48 |
|
|
|
49 |
|
|
done = compressor->Deflate(flush, |
50 |
|
600621 |
&input_data, &remaining_in_input, &output_data, &remaining_in_output); |
51 |
|
|
// remaining_in_output is now number of consumed bytes |
52 |
|
600573 |
output_block->set_size(output_block->size() + remaining_in_output); |
53 |
|
|
|
54 |
✓✓ |
600577 |
if (output_block->IsFull()) { |
55 |
|
542443 |
tubes_out_->Dispatch(output_block); |
56 |
|
542483 |
output_block = new BlockItem(tag, allocator_); |
57 |
|
542491 |
output_block->SetFileItem(input_block->file_item()); |
58 |
|
542491 |
output_block->SetChunkItem(input_block->chunk_item()); |
59 |
|
542491 |
output_block->MakeData(kCompressedBlockSize); |
60 |
|
542491 |
tag_map_[tag] = output_block; |
61 |
|
|
} |
62 |
|
|
} while ((remaining_in_input > 0) || (flush && !done)); |
63 |
|
|
|
64 |
✓✓ |
328434 |
if (flush) { |
65 |
✓✓ |
1783 |
if (output_block->size() > 0) |
66 |
|
1782 |
tubes_out_->Dispatch(output_block); |
67 |
|
|
else |
68 |
✓✗ |
1 |
delete output_block; |
69 |
|
1783 |
tag_map_.erase(tag); |
70 |
|
|
|
71 |
|
1783 |
BlockItem *stop_block = new BlockItem(tag, allocator_); |
72 |
|
1783 |
stop_block->MakeStop(); |
73 |
|
1783 |
stop_block->SetFileItem(input_block->file_item()); |
74 |
|
1783 |
stop_block->SetChunkItem(input_block->chunk_item()); |
75 |
|
1783 |
tubes_out_->Dispatch(stop_block); |
76 |
|
|
} |
77 |
|
|
|
78 |
✓✗ |
328434 |
delete input_block; |
79 |
|
328442 |
} |