GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/task_compress.cc Lines: 42 42 100.0 %
Date: 2019-02-03 02:48:13 Branches: 20 24 83.3 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "cvmfs_config.h"
6
#include "task_compress.h"
7
8
#include <cstdlib>
9
10
#include "compression.h"
11
#include "logging.h"
12
#include "smalloc.h"
13
14
15
/**
16
 * The data payload of the blocks is replaced by their compressed counterparts.
17
 * The block tags stay the same.
18
 * TODO(jblomer): avoid memory copy with EchoCompressor
19
 */
20
328438
void TaskCompress::Process(BlockItem *input_block) {
21
328438
  assert(input_block->chunk_item() != NULL);
22
23
328438
  zlib::Compressor *compressor = input_block->chunk_item()->compressor();
24
328438
  const int64_t tag = input_block->tag();
25
328438
  const bool flush = input_block->type() == BlockItem::kBlockStop;
26
328434
  unsigned char *input_data = input_block->data();
27
328434
  size_t remaining_in_input = input_block->size();
28
29
328438
  BlockItem *output_block = NULL;
30
328438
  TagMap::iterator iter_find = tag_map_.find(tag);
31
328430
  if (iter_find == tag_map_.end()) {
32
    // So far unseen chunk, start new stream of compressed blocks
33
1783
    output_block = new BlockItem(tag, allocator_);
34
1783
    output_block->SetFileItem(input_block->file_item());
35
1783
    output_block->SetChunkItem(input_block->chunk_item());
36
1783
    output_block->MakeData(kCompressedBlockSize);
37
1783
    tag_map_[tag] = output_block;
38
  } else {
39
326647
    output_block = iter_find->second;
40
  }
41
42
328430
  bool done = false;
43


600629
  do {
44
600625
    unsigned char *output_data = output_block->data() + output_block->size();
45
600617
    assert(!output_block->IsFull());
46
    size_t remaining_in_output =
47
600617
      output_block->capacity() - output_block->size();
48
49
    done = compressor->Deflate(flush,
50
600621
      &input_data, &remaining_in_input, &output_data, &remaining_in_output);
51
    // remaining_in_output is now number of consumed bytes
52
600573
    output_block->set_size(output_block->size() + remaining_in_output);
53
54
600577
    if (output_block->IsFull()) {
55
542443
      tubes_out_->Dispatch(output_block);
56
542483
      output_block = new BlockItem(tag, allocator_);
57
542491
      output_block->SetFileItem(input_block->file_item());
58
542491
      output_block->SetChunkItem(input_block->chunk_item());
59
542491
      output_block->MakeData(kCompressedBlockSize);
60
542491
      tag_map_[tag] = output_block;
61
    }
62
  } while ((remaining_in_input > 0) || (flush && !done));
63
64
328434
  if (flush) {
65
1783
    if (output_block->size() > 0)
66
1782
      tubes_out_->Dispatch(output_block);
67
    else
68
1
      delete output_block;
69
1783
    tag_map_.erase(tag);
70
71
1783
    BlockItem *stop_block = new BlockItem(tag, allocator_);
72
1783
    stop_block->MakeStop();
73
1783
    stop_block->SetFileItem(input_block->file_item());
74
1783
    stop_block->SetChunkItem(input_block->chunk_item());
75
1783
    tubes_out_->Dispatch(stop_block);
76
  }
77
78
328434
  delete input_block;
79
328442
}