GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/task_chunk.cc Lines: 85 86 98.8 %
Date: 2019-02-03 02:48:13 Branches: 43 55 78.2 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include <unistd.h>
6
7
#include <cassert>
8
9
#include "ingestion/task_chunk.h"
10
11
/**
12
 * The tags from the read stage in the pipeline and the tags given in the
13
 * chunking stage can safely overlap.  Nevertheless, debugging might be easier
14
 * if they don't.  So let's start with a high number.
 *
 * The start value 2 << 28 equals 2^29 (536870912).  The counter is advanced
 * with atomic_xadd64() whenever Process() draws a new output tag for a bulk
 * chunk or a regular chunk.
15
 */
16
atomic_int64 TaskChunk::tag_seq_ = 2 << 28;
17
18
/**
19
 * Consumes the stream of input blocks and produces new output blocks according
20
 * to cut marks.  The output blocks correspond to chunks.
 *
 * Per-file state (ChunkInfo) is kept in tag_map_ under the tag of the input
 * block.  It is created when a tag is seen for the first time, written back
 * after every data block, and erased when the stop block arrives.
 *
 * Output blocks are handed to tubes_out_.  Blocks for regular chunks are
 * dispatched while the input block is processed; the block for the bulk chunk
 * (if there is one) is dispatched last.  The input block is deleted before the
 * function returns.
 *
 * The input block must carry a file item and a non-negative tag (asserted).
 * Block types other than kBlockStop and kBlockData abort the process.
21
 */
22
226394
void TaskChunk::Process(BlockItem *input_block) {
23
226394
  FileItem *file_item = input_block->file_item();
24
226394
  int64_t input_tag = input_block->tag();
25

226394
  assert((file_item != NULL) && (input_tag >= 0));
26
27
226394
  // Working copy of the per-tag state; it is stored back into tag_map_ below
  ChunkInfo chunk_info;
28
  // Do we see blocks of the file for the first time?
29
226394
  TagMap::const_iterator iter_find = tag_map_.find(input_tag);
30
226394
  if (iter_find == tag_map_.end()) {
31
    // We may have only regular chunks, only a bulk chunk, or both.  We may
32
    // end up in a situation where we produced only a single non-bulk chunk.
33
    // This needs to be fixed up later in the pipeline by the write task.
34
155
    if (file_item->may_have_chunks()) {
35
48
      // Open the first regular chunk at file offset 0 with a fresh output tag
      chunk_info.next_chunk = new ChunkItem(file_item, 0);
36
48
      chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1);
37
48
      if (file_item->has_legacy_bulk_chunk()) {
38
41
        chunk_info.bulk_chunk = new ChunkItem(file_item, 0);
39
      }
40
    } else {
41
107
      // No chunking for this file: only a bulk chunk is produced
      chunk_info.bulk_chunk = new ChunkItem(file_item, 0);
42
    }
43
44
155
    if (chunk_info.bulk_chunk != NULL) {
45
148
      // The bulk chunk is sized to the full file right away and gets its own
      // output tag, separate from the tag(s) of the regular chunks
      chunk_info.bulk_chunk->MakeBulkChunk();
46
148
      chunk_info.bulk_chunk->set_size(file_item->size());
47
148
      chunk_info.output_tag_bulk = atomic_xadd64(&tag_seq_, 1);
48
    }
49
155
    tag_map_[input_tag] = chunk_info;
50
  } else {
51
226239
    // Known tag: continue with the state saved by an earlier block
    chunk_info = iter_find->second;
52
  }
53

226394
  // At least one of the two chunk kinds has to be active for the file
  assert((chunk_info.bulk_chunk != NULL) || (chunk_info.next_chunk != NULL));
54
55
226394
  // If there is a bulk chunk, every input block results in exactly one output
  // block for it; that block is dispatched at the very end of this function
  BlockItem *output_block_bulk = NULL;
56
226394
  if (chunk_info.bulk_chunk != NULL) {
57
215347
    output_block_bulk = new BlockItem(chunk_info.output_tag_bulk, allocator_);
58
215347
    output_block_bulk->SetFileItem(file_item);
59
215347
    output_block_bulk->SetChunkItem(chunk_info.bulk_chunk);
60
  }
61
62
226394
  ChunkDetector *chunk_detector = file_item->chunk_detector();
63
226394
  switch (input_block->type()) {
64
    case BlockItem::kBlockStop:
65
      // End of the file, no more new chunks
66
155
      file_item->set_is_fully_chunked();
67
155
      if (output_block_bulk) output_block_bulk->MakeStop();
68
155
      if (chunk_info.next_chunk != NULL) {
69
48
        // Close the still open regular chunk: it extends from its offset to
        // the end of the file.  Then send the stop block for it.
        assert(file_item->size() >= chunk_info.next_chunk->offset());
70
        chunk_info.next_chunk->set_size(
71
48
          file_item->size() - chunk_info.next_chunk->offset());
72
        BlockItem *block_stop =
73
48
          new BlockItem(chunk_info.output_tag_chunk, allocator_);
74
48
        block_stop->SetFileItem(file_item);
75
48
        block_stop->SetChunkItem(chunk_info.next_chunk);
76
48
        block_stop->MakeStop();
77
48
        tubes_out_->Dispatch(block_stop);
78
      }
79
155
      // The per-file state is not needed anymore after the stop block
      tag_map_.erase(input_tag);
80
155
      break;
81
82
    case BlockItem::kBlockData:
83
226239
      if (output_block_bulk) {
84
215199
        if (chunk_info.next_chunk != NULL) {
85
          // Reserve zero-copy for the regular chunk
          // (input_block->data() is still read below to fill the blocks of
          // the regular chunk, so the bulk block must not take the data away.
          // NOTE(review): the regular chunk path below uses MakeDataCopy, too.)
86
          output_block_bulk->MakeDataCopy(input_block->data(),
87
108544
                                          input_block->size());
88
        } else {
89
          // There is only the bulk chunk, zero copy
90
106655
          output_block_bulk->MakeDataMove(input_block);
91
        }
92
      }
93
94
226239
      if (chunk_info.next_chunk != NULL) {
95
119584
        // Position within input_block up to which the data has already been
        // handed over to a chunk
        unsigned offset_in_block = 0;
96
119584
        uint64_t cut_mark = 0;
97
245761
        // A return value of 0 from FindNextCutMark() ends the loop.  Non-zero
        // cut marks are handled as file offsets: they are compared to
        // chunk_info.offset (sum of bytes consumed from previous blocks, see
        // the updates below) and to file_item->size().
        while ((cut_mark = chunk_detector->FindNextCutMark(input_block)) != 0) {
98
6593
          assert(cut_mark >= chunk_info.offset + offset_in_block);
99
6593
          // Translate the cut mark into a position relative to the block start
          uint64_t cut_mark_in_block = cut_mark - chunk_info.offset;
100
6593
          assert(cut_mark_in_block >= offset_in_block);
101
6593
          assert(cut_mark_in_block <= input_block->size());
102
6593
          // Bytes between the previous cut (or the block start) and this cut
          // mark; they still belong to the currently open chunk
          unsigned tail_size = cut_mark_in_block - offset_in_block;
103
104
6593
          if (tail_size > 0) {
105
            BlockItem *block_tail =
106
6593
              new BlockItem(chunk_info.output_tag_chunk, allocator_);
107
6593
            block_tail->SetFileItem(file_item);
108
6593
            block_tail->SetChunkItem(chunk_info.next_chunk);
109
            block_tail->MakeDataCopy(input_block->data() + offset_in_block,
110
6593
                                     tail_size);
111
6593
            tubes_out_->Dispatch(block_tail);
112
          }
113
114
6593
          assert(cut_mark >= chunk_info.next_chunk->offset());
115
          // If the cut mark happens to be at the end of file, let the final
116
          // incoming stop block schedule dispatch of the chunk stop block
117
6593
          if (cut_mark < file_item->size()) {
            // Finalize the open chunk at the cut mark and send its stop block
118
            chunk_info.next_chunk->set_size(
119
6592
              cut_mark - chunk_info.next_chunk->offset());
120
            BlockItem *block_stop =
121
6592
              new BlockItem(chunk_info.output_tag_chunk, allocator_);
122
6592
            block_stop->SetFileItem(file_item);
123
6592
            block_stop->SetChunkItem(chunk_info.next_chunk);
124
6592
            block_stop->MakeStop();
125
6592
            tubes_out_->Dispatch(block_stop);
126
127
6592
            // Open the next chunk at the cut mark under a fresh output tag
            chunk_info.next_chunk = new ChunkItem(file_item, cut_mark);
128
6592
            chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1);
129
          }
130
6593
          offset_in_block = cut_mark_in_block;
131
        }
132
119584
        // Account for the bytes consumed up to the last cut mark in this block
        chunk_info.offset += offset_in_block;
133
134
119584
        assert(input_block->size() >= offset_in_block);
135
119584
        // What is left of the block after the last cut mark goes to the
        // currently open chunk
        unsigned tail_size = input_block->size() - offset_in_block;
136
119584
        if (tail_size > 0) {
137
          BlockItem *block_tail =
138
119370
            new BlockItem(chunk_info.output_tag_chunk, allocator_);
139
119370
          block_tail->SetFileItem(file_item);
140
119370
          block_tail->SetChunkItem(chunk_info.next_chunk);
141
          block_tail->MakeDataCopy(input_block->data() + offset_in_block,
142
119370
                                   tail_size);
143
119370
          tubes_out_->Dispatch(block_tail);
144
119370
          chunk_info.offset += tail_size;
145
        }
146
147
        // Delete data from incoming block
148
119584
        input_block->Reset();
149
      }
150
151
226239
      // Save the updated state (offset, open chunk, output tag) for the next
      // block that arrives with this tag
      tag_map_[input_tag] = chunk_info;
152
226239
      break;
153
154
    default:
155
      // Any other block type is not handled by this task
      abort();
156
  }
157
158
226394
  // The input block is consumed here in all cases; the bulk block goes out
  // after all regular chunk blocks produced from this input block
  delete input_block;
159
226394
  if (output_block_bulk) tubes_out_->Dispatch(output_block_bulk);
160
226394
}