CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task_chunk.cc
Go to the documentation of this file.
1 
5 #include <unistd.h>
6 
7 #include <cassert>
8 
9 #include "ingestion/task_chunk.h"
10 #include "util/exception.h"
11 
18 
23 void TaskChunk::Process(BlockItem *input_block) {
24  FileItem *file_item = input_block->file_item();
25  int64_t input_tag = input_block->tag();
26  assert((file_item != NULL) && (input_tag >= 0));
27 
28  ChunkInfo chunk_info;
29  // Do we see blocks of the file for the first time?
30  if (!tag_map_.Lookup(input_tag, &chunk_info)) {
31  // We may have only regular chunks, only a bulk chunk, or both. We may
32  // end up in a situation where we produced only a single non-bulk chunk.
33  // This needs to be fixed up later in the pipeline by the write task.
34  if (file_item->may_have_chunks()) {
35  chunk_info.next_chunk = new ChunkItem(file_item, 0);
36  chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1);
37  if (file_item->has_legacy_bulk_chunk()) {
38  chunk_info.bulk_chunk = new ChunkItem(file_item, 0);
39  }
40  } else {
41  chunk_info.bulk_chunk = new ChunkItem(file_item, 0);
42  }
43 
44  if (chunk_info.bulk_chunk != NULL) {
45  chunk_info.bulk_chunk->MakeBulkChunk();
46  chunk_info.bulk_chunk->set_size(file_item->size());
47  chunk_info.output_tag_bulk = atomic_xadd64(&tag_seq_, 1);
48  }
49  tag_map_.Insert(input_tag, chunk_info);
50  }
51  assert((chunk_info.bulk_chunk != NULL) || (chunk_info.next_chunk != NULL));
52 
53  BlockItem *output_block_bulk = NULL;
54  if (chunk_info.bulk_chunk != NULL) {
55  output_block_bulk = new BlockItem(chunk_info.output_tag_bulk, allocator_);
56  output_block_bulk->SetFileItem(file_item);
57  output_block_bulk->SetChunkItem(chunk_info.bulk_chunk);
58  }
59 
60  ChunkDetector *chunk_detector = file_item->chunk_detector();
61  switch (input_block->type()) {
63  // End of the file, no more new chunks
64  file_item->set_is_fully_chunked();
65  if (output_block_bulk) output_block_bulk->MakeStop();
66  if (chunk_info.next_chunk != NULL) {
67  assert(file_item->size() >= chunk_info.next_chunk->offset());
68  chunk_info.next_chunk->set_size(
69  file_item->size() - chunk_info.next_chunk->offset());
70  BlockItem *block_stop =
71  new BlockItem(chunk_info.output_tag_chunk, allocator_);
72  block_stop->SetFileItem(file_item);
73  block_stop->SetChunkItem(chunk_info.next_chunk);
74  block_stop->MakeStop();
75  tubes_out_->Dispatch(block_stop);
76  }
77  tag_map_.Erase(input_tag);
78  break;
79 
81  if (output_block_bulk) {
82  if (chunk_info.next_chunk != NULL) {
83  // Reserve zero-copy for the regular chunk
84  output_block_bulk->MakeDataCopy(input_block->data(),
85  input_block->size());
86  } else {
87  // There is only the bulk chunk, zero copy
88  output_block_bulk->MakeDataMove(input_block);
89  }
90  }
91 
92  if (chunk_info.next_chunk != NULL) {
93  unsigned offset_in_block = 0;
94  uint64_t cut_mark = 0;
95  while ((cut_mark = chunk_detector->FindNextCutMark(input_block)) != 0) {
96  assert(cut_mark >= chunk_info.offset + offset_in_block);
97  uint64_t cut_mark_in_block = cut_mark - chunk_info.offset;
98  assert(cut_mark_in_block >= offset_in_block);
99  assert(cut_mark_in_block <= input_block->size());
100  unsigned tail_size = cut_mark_in_block - offset_in_block;
101 
102  if (tail_size > 0) {
103  BlockItem *block_tail =
104  new BlockItem(chunk_info.output_tag_chunk, allocator_);
105  block_tail->SetFileItem(file_item);
106  block_tail->SetChunkItem(chunk_info.next_chunk);
107  block_tail->MakeDataCopy(input_block->data() + offset_in_block,
108  tail_size);
109  tubes_out_->Dispatch(block_tail);
110  }
111 
112  assert(cut_mark >= chunk_info.next_chunk->offset());
113  // If the cut mark happens to at the end of file, let the final
114  // incoming stop block schedule dispatch of the chunk stop block
115  if (cut_mark < file_item->size()) {
116  chunk_info.next_chunk->set_size(
117  cut_mark - chunk_info.next_chunk->offset());
118  BlockItem *block_stop =
119  new BlockItem(chunk_info.output_tag_chunk, allocator_);
120  block_stop->SetFileItem(file_item);
121  block_stop->SetChunkItem(chunk_info.next_chunk);
122  block_stop->MakeStop();
123  tubes_out_->Dispatch(block_stop);
124 
125  chunk_info.next_chunk = new ChunkItem(file_item, cut_mark);
126  chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1);
127  }
128  offset_in_block = cut_mark_in_block;
129  }
130  chunk_info.offset += offset_in_block;
131 
132  assert(input_block->size() >= offset_in_block);
133  unsigned tail_size = input_block->size() - offset_in_block;
134  if (tail_size > 0) {
135  BlockItem *block_tail =
136  new BlockItem(chunk_info.output_tag_chunk, allocator_);
137  block_tail->SetFileItem(file_item);
138  block_tail->SetChunkItem(chunk_info.next_chunk);
139  block_tail->MakeDataCopy(input_block->data() + offset_in_block,
140  tail_size);
141  tubes_out_->Dispatch(block_tail);
142  chunk_info.offset += tail_size;
143  }
144 
145  // Delete data from incoming block
146  input_block->Reset();
147  }
148 
149  tag_map_.Insert(input_tag, chunk_info);
150  break;
151 
152  default:
153  PANIC(NULL);
154  }
155 
156  delete input_block;
157  if (output_block_bulk) tubes_out_->Dispatch(output_block_bulk);
158 }
void MakeDataMove(BlockItem *other)
Definition: item.cc:172
void set_is_fully_chunked()
Definition: item.h:69
int64_t atomic_int64
Definition: atomic.h:18
ItemAllocator * allocator_
Definition: task_chunk.h:88
void SetChunkItem(ChunkItem *item)
Definition: item.cc:216
#define PANIC(...)
Definition: exception.h:29
void Reset()
Definition: item.cc:205
bool may_have_chunks()
Definition: item.h:64
assert((mem||(size==0))&&"Out Of Memory")
unsigned char * data()
Definition: item.h:213
void SetFileItem(FileItem *item)
Definition: item.cc:223
Xor32Detector * chunk_detector()
Definition: item.h:59
void MakeBulkChunk()
Definition: item.cc:87
virtual void Process(BlockItem *input_block)
Definition: task_chunk.cc:23
TagMap tag_map_
Definition: task_chunk.h:89
void set_size(uint64_t val)
Definition: item.h:148
int64_t output_tag_bulk
Definition: task_chunk.h:64
ChunkItem * next_chunk
Definition: task_chunk.h:69
ChunkItem * bulk_chunk
Definition: task_chunk.h:73
int64_t tag()
Definition: item.h:219
int64_t output_tag_chunk
Definition: task_chunk.h:60
static atomic_int64 tag_seq_
Definition: task_chunk.h:85
bool has_legacy_bulk_chunk()
Definition: item.h:65
FileItem * file_item()
Definition: item.h:220
uint32_t size()
Definition: item.h:215
Tube< ItemT >::Link * Dispatch(ItemT *item)
Definition: tube.h:271
void Insert(const Key &key, const Value &value)
Definition: smallhash.h:109
uint64_t FindNextCutMark(BlockItem *block)
Definition: item.h:34
uint64_t offset()
Definition: item.h:136
TubeGroup< BlockItem > * tubes_out_
Definition: task_chunk.h:87
uint64_t size()
Definition: item.h:58
bool Erase(const Key &key)
Definition: smallhash.h:115
BlockType type()
Definition: item.h:218
void MakeDataCopy(const unsigned char *data, uint32_t size)
Definition: item.cc:189
bool Lookup(const Key &key, Value *value) const
Definition: smallhash.h:73
void MakeStop()
Definition: item.cc:151
static void size_t size
Definition: smalloc.h:54