CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task_compress.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "task_compress.h"
7 
8 #include <cstdlib>
9 
10 #include "compression.h"
11 #include "util/logging.h"
12 #include "util/smalloc.h"
13 
14 
20 void TaskCompress::Process(BlockItem *input_block) {
21  assert(input_block->chunk_item() != NULL);
22 
23  zlib::Compressor *compressor = input_block->chunk_item()->GetCompressor();
24  const int64_t tag = input_block->tag();
25  const bool flush = input_block->type() == BlockItem::kBlockStop;
26  unsigned char *input_data = input_block->data();
27  size_t remaining_in_input = input_block->size();
28 
29  BlockItem *output_block = NULL;
30  if (!tag_map_.Lookup(tag, &output_block)) {
31  // So far unseen chunk, start new stream of compressed blocks
32  output_block = new BlockItem(tag, allocator_);
33  output_block->SetFileItem(input_block->file_item());
34  output_block->SetChunkItem(input_block->chunk_item());
35  output_block->MakeData(kCompressedBlockSize);
36  tag_map_.Insert(tag, output_block);
37  }
38 
39  bool done = false;
40  do {
41  unsigned char *output_data = output_block->data() + output_block->size();
42  assert(!output_block->IsFull());
43  size_t remaining_in_output =
44  output_block->capacity() - output_block->size();
45 
46  done = compressor->Deflate(flush,
47  &input_data, &remaining_in_input, &output_data, &remaining_in_output);
48  // remaining_in_output is now number of consumed bytes
49  output_block->set_size(output_block->size() + remaining_in_output);
50 
51  if (output_block->IsFull()) {
52  tubes_out_->Dispatch(output_block);
53  output_block = new BlockItem(tag, allocator_);
54  output_block->SetFileItem(input_block->file_item());
55  output_block->SetChunkItem(input_block->chunk_item());
56  output_block->MakeData(kCompressedBlockSize);
57  tag_map_.Insert(tag, output_block);
58  }
59  } while ((remaining_in_input > 0) || (flush && !done));
60 
61  if (flush) {
62  input_block->chunk_item()->ReleaseCompressor();
63 
64  if (output_block->size() > 0)
65  tubes_out_->Dispatch(output_block);
66  else
67  delete output_block;
68  tag_map_.Erase(tag);
69 
70  BlockItem *stop_block = new BlockItem(tag, allocator_);
71  stop_block->MakeStop();
72  stop_block->SetFileItem(input_block->file_item());
73  stop_block->SetChunkItem(input_block->chunk_item());
74  tubes_out_->Dispatch(stop_block);
75  }
76 
77  delete input_block;
78 }
virtual void Process(BlockItem *input_block)
void SetChunkItem(ChunkItem *item)
Definition: item.cc:216
zlib::Compressor * GetCompressor()
Definition: item.cc:93
assert((mem||(size==0))&&"Out Of Memory")
unsigned char * data()
Definition: item.h:213
void SetFileItem(FileItem *item)
Definition: item.cc:223
uint32_t capacity()
Definition: item.h:214
TagMap tag_map_
Definition: task_compress.h:49
void ReleaseCompressor()
Definition: item.cc:102
int64_t tag()
Definition: item.h:219
ItemAllocator * allocator_
Definition: task_compress.h:48
void set_size(uint32_t val)
Definition: item.h:216
ChunkItem * chunk_item()
Definition: item.h:221
FileItem * file_item()
Definition: item.h:220
uint32_t size()
Definition: item.h:215
Tube< ItemT >::Link * Dispatch(ItemT *item)
Definition: tube.h:271
void MakeData(uint32_t capacity)
Definition: item.cc:157
void Insert(const Key &key, const Value &value)
Definition: smallhash.h:109
static const unsigned kCompressedBlockSize
Definition: task_compress.h:20
bool IsFull()
Definition: item.h:211
bool Erase(const Key &key)
Definition: smallhash.h:115
BlockType type()
Definition: item.h:218
bool Lookup(const Key &key, Value *value) const
Definition: smallhash.h:73
void MakeStop()
Definition: item.cc:151
TubeGroup< BlockItem > * tubes_out_
Definition: task_compress.h:47