1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#ifndef CVMFS_INGESTION_TASK_CHUNK_H_ |
6 |
|
|
#define CVMFS_INGESTION_TASK_CHUNK_H_ |
7 |
|
|
|
8 |
|
|
#include <stdint.h> |
9 |
|
|
|
10 |
|
|
#include <map> |
11 |
|
|
|
12 |
|
|
#include "atomic.h" |
13 |
|
|
#include "ingestion/item.h" |
14 |
|
|
#include "ingestion/task.h" |
15 |
|
|
#include "ingestion/tube.h" |
16 |
|
|
#include "util/posix.h" |
17 |
|
|
|
18 |
|
|
class ItemAllocator; |
19 |
|
|
|
20 |
✗✗✓ |
182 |
/**
 * Pipeline stage that consumes BlockItems from an input tube.  Per-block work
 * is done in Process(), which is defined outside of this header.
 */
class TaskChunk : public TubeConsumer<BlockItem> {
 public:
  /**
   * Stores the given pointers as they are; nothing is allocated or validated
   * here.
   *
   * \param tube_in    input tube, handed to the TubeConsumer base class
   * \param tubes_out  tube group kept in tubes_out_
   * \param allocator  item allocator kept in allocator_
   */
  TaskChunk(Tube<BlockItem> *tube_in,
            TubeGroup<BlockItem> *tubes_out,
            ItemAllocator *allocator)
    : TubeConsumer<BlockItem>(tube_in),
      tubes_out_(tubes_out),
      allocator_(allocator)
  { }

 protected:
  /**
   * Called with one input block at a time; implemented in the source file.
   */
  virtual void Process(BlockItem *input_block);

 private:
  /**
   * Per-file bookkeeping of the chunk creation progress.
   */
  struct ChunkInfo {
    /**
     * Accumulated size of the file's input blocks that were handled so far.
     */
    uint64_t offset;

    /**
     * Tag shared by all blocks that belong to the current regular chunk.
     */
    int64_t output_tag_chunk;

    /**
     * Unique tag given to the blocks of the corresponding bulk chunk.
     */
    int64_t output_tag_bulk;

    /**
     * Regular chunk currently matching the output block stream; can be NULL.
     */
    ChunkItem *next_chunk;

    /**
     * Chunk covering the whole file that is attached to the file; can be
     * NULL.
     */
    ChunkItem *bulk_chunk;

    /**
     * Fresh state: zero offset, both tags set to -1, and no chunk objects.
     */
    ChunkInfo()
      : offset(0),
        output_tag_chunk(-1),
        output_tag_bulk(-1),
        next_chunk(NULL),
        bulk_chunk(NULL)
    { }
  };

  /**
   * Lookup from the tag of an input block (i.e. from the file) to that
   * file's chunk state.
   */
  typedef std::map<int64_t, ChunkInfo> TagMap;

  /**
   * Tag sequence counter used to annotate BlockItems; it is increased for
   * every new chunk.
   */
  static atomic_int64 tag_seq_;

  /** Output tube group as passed to the constructor. */
  TubeGroup<BlockItem> *tubes_out_;
  /** Item allocator as passed to the constructor. */
  ItemAllocator *allocator_;
  /** Chunk state of the files, keyed by input block tag. */
  TagMap tag_map_;
};
83 |
|
|
|
84 |
|
|
#endif // CVMFS_INGESTION_TASK_CHUNK_H_ |