CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task_chunk.cc
Go to the documentation of this file.
1 
5 #include "ingestion/task_chunk.h"
6 
7 #include <unistd.h>
8 
9 #include <cassert>
10 
11 #include "util/exception.h"
12 
19 
// Processes one incoming block of a file and forwards its payload to the
// output tubes, split into regular chunks and/or a single bulk chunk.
//
// Per-file state (current chunk, bulk chunk, output tags, running offset) is
// kept in tag_map_, keyed by the tag of the incoming block.  The state is
// created when the first block with a given tag is seen, updated after every
// data block, and erased when the stop block arrives.
//
// @param input_block  block to process; it must reference a file item and
//                     carry a non-negative tag.  The block is deleted by this
//                     function before it returns.
24 void TaskChunk::Process(BlockItem *input_block) {
25  FileItem *file_item = input_block->file_item();
 // The input tag is the key under which this file's chunking state is stored
26  const int64_t input_tag = input_block->tag();
27  assert((file_item != NULL) && (input_tag >= 0));
28 
29  ChunkInfo chunk_info;
30  // Do we see blocks of the file for the first time?
31  if (!tag_map_.Lookup(input_tag, &chunk_info)) {
32  // We may have only regular chunks, only a bulk chunk, or both. We may
33  // end up in a situation where we produced only a single non-bulk chunk.
34  // This needs to be fixed up later in the pipeline by the write task.
35  if (file_item->may_have_chunks()) {
 // First regular chunk starts at file offset 0 and gets its own output tag
36  chunk_info.next_chunk = new ChunkItem(file_item, 0);
37  chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1);
38  if (file_item->has_legacy_bulk_chunk()) {
39  chunk_info.bulk_chunk = new ChunkItem(file_item, 0);
40  }
41  } else {
42  chunk_info.bulk_chunk = new ChunkItem(file_item, 0);
43  }
44 
 // The bulk chunk spans the entire file, so its size is known up front
45  if (chunk_info.bulk_chunk != NULL) {
46  chunk_info.bulk_chunk->MakeBulkChunk();
47  chunk_info.bulk_chunk->set_size(file_item->size());
48  chunk_info.output_tag_bulk = atomic_xadd64(&tag_seq_, 1);
49  }
50  tag_map_.Insert(input_tag, chunk_info);
51  }
52  assert((chunk_info.bulk_chunk != NULL) || (chunk_info.next_chunk != NULL));
53 
 // Every input block yields one output block for the bulk chunk (if there is
 // a bulk chunk); its type is set in the switch below and it is dispatched at
 // the very end of the function.
54  BlockItem *output_block_bulk = NULL;
55  if (chunk_info.bulk_chunk != NULL) {
56  output_block_bulk = new BlockItem(chunk_info.output_tag_bulk, allocator_);
57  output_block_bulk->SetFileItem(file_item);
58  output_block_bulk->SetChunkItem(chunk_info.bulk_chunk);
59  }
60 
61  ChunkDetector *chunk_detector = file_item->chunk_detector();
62  switch (input_block->type()) {
 // NOTE(review): this listing skips one source line here (numbering jumps
 // from 62 to 64); presumably it is the `case` label for the stop-block
 // type -- confirm against the original file.
64  // End of the file, no more new chunks
65  file_item->set_is_fully_chunked();
66  if (output_block_bulk)
67  output_block_bulk->MakeStop();
68  if (chunk_info.next_chunk != NULL) {
 // The last regular chunk covers everything from its offset to end of file
69  assert(file_item->size() >= chunk_info.next_chunk->offset());
70  chunk_info.next_chunk->set_size(file_item->size()
71  - chunk_info.next_chunk->offset());
72  BlockItem *block_stop = new BlockItem(chunk_info.output_tag_chunk,
73  allocator_);
74  block_stop->SetFileItem(file_item);
75  block_stop->SetChunkItem(chunk_info.next_chunk);
76  block_stop->MakeStop();
77  tubes_out_->Dispatch(block_stop);
78  }
 // The file is complete, drop its chunking state
79  tag_map_.Erase(input_tag);
80  break;
81 
 // NOTE(review): this listing skips one source line here (numbering jumps
 // from 81 to 83); presumably it is the `case` label for the data-block
 // type -- confirm against the original file.
83  if (output_block_bulk) {
84  if (chunk_info.next_chunk != NULL) {
85  // Reserve zero-copy for the regular chunk
86  output_block_bulk->MakeDataCopy(input_block->data(),
87  input_block->size());
88  } else {
89  // There is only the bulk chunk, zero copy
90  output_block_bulk->MakeDataMove(input_block);
91  }
92  }
93 
94  if (chunk_info.next_chunk != NULL) {
 // offset_in_block: number of bytes of input_block already handed out
95  unsigned offset_in_block = 0;
96  uint64_t cut_mark = 0;
 // Iterate over all cut marks found in this block; a return value of 0
 // ends the loop.  The cut mark is compared against file offsets, and
 // chunk_info.offset is subtracted to obtain the position in the block.
97  while ((cut_mark = chunk_detector->FindNextCutMark(input_block)) != 0) {
98  assert(cut_mark >= chunk_info.offset + offset_in_block);
99  const uint64_t cut_mark_in_block = cut_mark - chunk_info.offset;
100  assert(cut_mark_in_block >= offset_in_block);
101  assert(cut_mark_in_block <= input_block->size());
102  const unsigned tail_size = cut_mark_in_block - offset_in_block;
103 
 // Bytes up to the cut mark still belong to the current chunk
104  if (tail_size > 0) {
105  BlockItem *block_tail = new BlockItem(chunk_info.output_tag_chunk,
106  allocator_);
107  block_tail->SetFileItem(file_item);
108  block_tail->SetChunkItem(chunk_info.next_chunk);
109  block_tail->MakeDataCopy(input_block->data() + offset_in_block,
110  tail_size);
111  tubes_out_->Dispatch(block_tail);
112  }
113 
114  assert(cut_mark >= chunk_info.next_chunk->offset());
115  // If the cut mark happens to be at the end of the file, let the final
116  // incoming stop block schedule dispatch of the chunk stop block
117  if (cut_mark < file_item->size()) {
 // Close the current chunk at the cut mark ...
118  chunk_info.next_chunk->set_size(cut_mark
119  - chunk_info.next_chunk->offset());
120  BlockItem *block_stop = new BlockItem(chunk_info.output_tag_chunk,
121  allocator_);
122  block_stop->SetFileItem(file_item);
123  block_stop->SetChunkItem(chunk_info.next_chunk);
124  block_stop->MakeStop();
125  tubes_out_->Dispatch(block_stop);
126 
 // ... and open the next chunk there under a fresh output tag
127  chunk_info.next_chunk = new ChunkItem(file_item, cut_mark);
128  chunk_info.output_tag_chunk = atomic_xadd64(&tag_seq_, 1);
129  }
130  offset_in_block = cut_mark_in_block;
131  }
 // Account for the bytes consumed up to the last cut mark; the remainder
 // of the block is added below, so that chunk_info.offset advances by
 // input_block->size() in total.
132  chunk_info.offset += offset_in_block;
133 
 // Whatever follows the last cut mark goes to the currently open chunk
134  assert(input_block->size() >= offset_in_block);
135  const unsigned tail_size = input_block->size() - offset_in_block;
136  if (tail_size > 0) {
137  BlockItem *block_tail = new BlockItem(chunk_info.output_tag_chunk,
138  allocator_);
139  block_tail->SetFileItem(file_item);
140  block_tail->SetChunkItem(chunk_info.next_chunk);
141  block_tail->MakeDataCopy(input_block->data() + offset_in_block,
142  tail_size);
143  tubes_out_->Dispatch(block_tail);
144  chunk_info.offset += tail_size;
145  }
146 
147  // Delete data from incoming block
148  input_block->Reset();
149  }
150 
 // chunk_info is a local copy; write the updated state back under the same
 // key.  NOTE(review): relies on Insert() replacing the value of an
 // existing key -- confirm in smallhash.h.
151  tag_map_.Insert(input_tag, chunk_info);
152  break;
153 
 // Any other block type is unexpected here
154  default:
155  PANIC(NULL);
156  }
157 
 // The input block is released first; the bulk output block is dispatched
 // last, after all regular chunk blocks derived from this input block.
158  delete input_block;
159  if (output_block_bulk)
160  tubes_out_->Dispatch(output_block_bulk);
161 }
void MakeDataMove(BlockItem *other)
Definition: item.cc:163
void set_is_fully_chunked()
Definition: item.h:68
int64_t atomic_int64
Definition: atomic.h:18
ItemAllocator * allocator_
Definition: task_chunk.h:86
void SetChunkItem(ChunkItem *item)
Definition: item.cc:204
#define PANIC(...)
Definition: exception.h:29
void Reset()
Definition: item.cc:193
bool may_have_chunks()
Definition: item.h:63
assert((mem||(size==0))&&"Out Of Memory")
unsigned char * data()
Definition: item.h:211
void SetFileItem(FileItem *item)
Definition: item.cc:211
Xor32Detector * chunk_detector()
Definition: item.h:58
void MakeBulkChunk()
Definition: item.cc:82
virtual void Process(BlockItem *input_block)
Definition: task_chunk.cc:24
TagMap tag_map_
Definition: task_chunk.h:87
void set_size(uint64_t val)
Definition: item.h:147
int64_t output_tag_bulk
Definition: task_chunk.h:62
ChunkItem * next_chunk
Definition: task_chunk.h:67
ChunkItem * bulk_chunk
Definition: task_chunk.h:71
int64_t tag()
Definition: item.h:220
int64_t output_tag_chunk
Definition: task_chunk.h:58
static atomic_int64 tag_seq_
Definition: task_chunk.h:83
bool has_legacy_bulk_chunk()
Definition: item.h:64
FileItem * file_item()
Definition: item.h:221
uint32_t size()
Definition: item.h:213
Tube< ItemT >::Link * Dispatch(ItemT *item)
Definition: tube.h:268
void Insert(const Key &key, const Value &value)
Definition: smallhash.h:106
uint64_t FindNextCutMark(BlockItem *block)
Definition: item.h:34
uint64_t offset()
Definition: item.h:135
TubeGroup< BlockItem > * tubes_out_
Definition: task_chunk.h:85
uint64_t size()
Definition: item.h:57
bool Erase(const Key &key)
Definition: smallhash.h:112
BlockType type()
Definition: item.h:219
void MakeDataCopy(const unsigned char *data, uint32_t size)
Definition: item.cc:180
bool Lookup(const Key &key, Value *value) const
Definition: smallhash.h:70
void MakeStop()
Definition: item.cc:142
static void size_t size
Definition: smalloc.h:54