1 |
|
|
/** |
2 |
|
|
* This file is part of the CernVM File System. |
3 |
|
|
*/ |
4 |
|
|
|
5 |
|
|
#include "cvmfs_config.h" |
6 |
|
|
#include "pipeline.h" |
7 |
|
|
|
8 |
|
|
#include <algorithm> |
9 |
|
|
#include <cstdlib> |
10 |
|
|
|
11 |
|
|
#include "ingestion/task_chunk.h" |
12 |
|
|
#include "ingestion/task_compress.h" |
13 |
|
|
#include "ingestion/task_hash.h" |
14 |
|
|
#include "ingestion/task_read.h" |
15 |
|
|
#include "ingestion/task_register.h" |
16 |
|
|
#include "ingestion/task_write.h" |
17 |
|
|
#include "platform.h" |
18 |
|
|
#include "upload_facility.h" |
19 |
|
|
#include "upload_spooler_definition.h" |
20 |
|
|
#include "util/string.h" |
21 |
|
|
#include "util_concurrency.h" |
22 |
|
|
|
23 |
|
87 |
IngestionPipeline::IngestionPipeline( |
24 |
|
|
upload::AbstractUploader *uploader, |
25 |
|
|
const upload::SpoolerDefinition &spooler_definition) |
26 |
|
|
: compression_algorithm_(spooler_definition.compression_alg) |
27 |
|
|
, hash_algorithm_(spooler_definition.hash_algorithm) |
28 |
|
|
, generate_legacy_bulk_chunks_(spooler_definition.generate_legacy_bulk_chunks) |
29 |
|
|
, chunking_enabled_(spooler_definition.use_file_chunking) |
30 |
|
|
, minimal_chunk_size_(spooler_definition.min_file_chunk_size) |
31 |
|
|
, average_chunk_size_(spooler_definition.avg_file_chunk_size) |
32 |
|
|
, maximal_chunk_size_(spooler_definition.max_file_chunk_size) |
33 |
|
|
, spawned_(false) |
34 |
|
|
, uploader_(uploader) |
35 |
|
87 |
, tube_counter_(kMaxFilesInFlight) |
36 |
|
|
{ |
37 |
|
87 |
unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8); |
38 |
|
|
|
39 |
✓✓ |
174 |
for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) { |
40 |
|
87 |
Tube<FileItem> *tube = new Tube<FileItem>(); |
41 |
|
87 |
tubes_register_.TakeTube(tube); |
42 |
|
87 |
TaskRegister *task = new TaskRegister(tube, &tube_counter_); |
43 |
|
87 |
task->RegisterListener(&IngestionPipeline::OnFileProcessed, this); |
44 |
|
87 |
tasks_register_.TakeConsumer(task); |
45 |
|
|
} |
46 |
|
87 |
tubes_register_.Activate(); |
47 |
|
|
|
48 |
✓✓ |
174 |
for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) { |
49 |
|
87 |
Tube<BlockItem> *t = new Tube<BlockItem>(); |
50 |
|
87 |
tubes_write_.TakeTube(t); |
51 |
|
87 |
tasks_write_.TakeConsumer(new TaskWrite(t, &tubes_register_, uploader_)); |
52 |
|
|
} |
53 |
|
87 |
tubes_write_.Activate(); |
54 |
|
|
|
55 |
✓✓ |
261 |
for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) { |
56 |
|
174 |
Tube<BlockItem> *t = new Tube<BlockItem>(); |
57 |
|
174 |
tubes_hash_.TakeTube(t); |
58 |
|
174 |
tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_write_)); |
59 |
|
|
} |
60 |
|
87 |
tubes_hash_.Activate(); |
61 |
|
|
|
62 |
✓✓ |
435 |
for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) { |
63 |
|
348 |
Tube<BlockItem> *t = new Tube<BlockItem>(); |
64 |
|
348 |
tubes_compress_.TakeTube(t); |
65 |
|
|
tasks_compress_.TakeConsumer( |
66 |
|
348 |
new TaskCompress(t, &tubes_hash_, &item_allocator_)); |
67 |
|
|
} |
68 |
|
87 |
tubes_compress_.Activate(); |
69 |
|
|
|
70 |
✓✓ |
174 |
for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) { |
71 |
|
87 |
Tube<BlockItem> *t = new Tube<BlockItem>(); |
72 |
|
87 |
tubes_chunk_.TakeTube(t); |
73 |
|
|
tasks_chunk_.TakeConsumer( |
74 |
|
87 |
new TaskChunk(t, &tubes_compress_, &item_allocator_)); |
75 |
|
|
} |
76 |
|
87 |
tubes_chunk_.Activate(); |
77 |
|
|
|
78 |
|
87 |
uint64_t low = kMemLowWatermark; |
79 |
|
87 |
uint64_t high = kMemHighWatermark; |
80 |
|
87 |
char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB"); |
81 |
✗✓ |
87 |
if (fixed_limit_mb != NULL) { |
82 |
|
|
high = String2Uint64(fixed_limit_mb) * 1024 * 1024; |
83 |
|
|
low = (high * 2) / 3; |
84 |
|
|
} |
85 |
✓✓ |
783 |
for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) { |
86 |
|
|
TaskRead *task_read = |
87 |
|
696 |
new TaskRead(&tube_input_, &tubes_chunk_, &item_allocator_); |
88 |
|
696 |
task_read->SetWatermarks(low, high); |
89 |
|
696 |
tasks_read_.TakeConsumer(task_read); |
90 |
|
|
} |
91 |
|
|
} |
92 |
|
|
|
93 |
|
|
|
94 |
|
110 |
/**
 * Tears the pipeline down.  If Spawn() was never called, no worker group is
 * running and there is nothing to terminate.  Otherwise the groups are
 * stopped in data-flow order, from the read stage to the register stage.
 */
IngestionPipeline::~IngestionPipeline() {
  if (!spawned_)
    return;

  tasks_read_.Terminate();
  tasks_chunk_.Terminate();
  tasks_compress_.Terminate();
  tasks_hash_.Terminate();
  tasks_write_.Terminate();
  tasks_register_.Terminate();
}
104 |
|
|
|
105 |
|
|
|
106 |
|
149 |
void IngestionPipeline::OnFileProcessed( |
107 |
|
|
const upload::SpoolerResult &spooler_result) |
108 |
|
|
{ |
109 |
|
149 |
NotifyListeners(spooler_result); |
110 |
|
149 |
} |
111 |
|
|
|
112 |
|
|
|
113 |
|
149 |
/**
 * Submits one file to the ingestion pipeline.
 *
 * @param source          where the file content is read from
 * @param allow_chunking  caller-side permission to chunk this file; it only
 *                        takes effect if chunking is also enabled in the
 *                        spooler definition
 * @param hash_suffix     suffix attached to the resulting content hashes
 */
void IngestionPipeline::Process(
  IngestionSource* source,
  bool allow_chunking,
  shash::Suffix hash_suffix)
{
  const bool may_chunk = allow_chunking && chunking_enabled_;
  FileItem *item = new FileItem(source,
                                minimal_chunk_size_,
                                average_chunk_size_,
                                maximal_chunk_size_,
                                compression_algorithm_,
                                hash_algorithm_,
                                hash_suffix,
                                may_chunk,
                                generate_legacy_bulk_chunks_);
  // Register the item with tube_counter_ (the counter WaitFor() blocks on)
  // before handing it to the read stage
  tube_counter_.Enqueue(item);
  tube_input_.Enqueue(item);
}
131 |
|
|
|
132 |
|
|
|
133 |
|
87 |
/**
 * Starts the worker threads of all stages.  Startup goes from the last stage
 * (register) back to the first one (read).  The pipeline is then marked as
 * spawned so that the destructor terminates the workers.
 */
void IngestionPipeline::Spawn() {
  // downstream consumers first ...
  tasks_register_.Spawn();
  tasks_write_.Spawn();
  tasks_hash_.Spawn();
  tasks_compress_.Spawn();
  tasks_chunk_.Spawn();
  // ... and the input readers last
  tasks_read_.Spawn();

  spawned_ = true;
}
142 |
|
|
|
143 |
|
|
|
144 |
|
102 |
/**
 * Blocks on tube_counter_.  Every file submitted via Process() is enqueued
 * there, so this presumably returns once all in-flight files have been
 * processed.
 */
void IngestionPipeline::WaitFor() {
  tube_counter_.Wait();
}
147 |
|
|
|
148 |
|
|
|
149 |
|
|
//------------------------------------------------------------------------------ |
150 |
|
|
|
151 |
|
|
|
152 |
|
1 |
/**
 * Terminal stage of the scrubbing pipeline.
 *
 * - A kBlockData block is simply freed.
 * - A kBlockStop block ends a file.  The (non-null) hash of its bulk chunk is
 *   published to the listeners together with the file path.  Then the block,
 *   the chunk and the file item are deleted and one entry is popped from the
 *   in-flight counter.
 * - Any other block type aborts the process.
 */
void TaskScrubbingCallback::Process(BlockItem *block_item) {
  FileItem *owning_file = block_item->file_item();
  assert(owning_file != NULL);
  assert(!owning_file->path().empty());
  ChunkItem *bulk_chunk = block_item->chunk_item();
  assert(bulk_chunk != NULL);
  assert(bulk_chunk->is_bulk_chunk());

  if (block_item->type() == BlockItem::kBlockData) {
    delete block_item;
    return;
  }

  if (block_item->type() != BlockItem::kBlockStop)
    abort();

  assert(!bulk_chunk->hash_ptr()->IsNull());
  NotifyListeners(ScrubbingResult(owning_file->path(),
                                  *bulk_chunk->hash_ptr()));
  delete block_item;
  delete bulk_chunk;
  delete owning_file;
  tube_counter_->Pop();
}
179 |
|
|
|
180 |
|
|
|
181 |
|
|
//------------------------------------------------------------------------------ |
182 |
|
|
|
183 |
|
|
|
184 |
|
1 |
/**
 * Wires up the scrubbing pipeline
 *
 *   read -> chunk -> hash -> scrubbing callback
 *
 * It re-hashes files without compressing or writing them.  Each stage gets
 * max(1, cores / 8) times its kNfork* constant worker tasks.
 *
 * Stages are built from the last one to the first one.  Each stage's tube
 * group is activated before the tasks of the preceding stage are created
 * with a pointer to it.  The read watermarks are the fixed kMemLowWatermark /
 * kMemHighWatermark values.
 */
ScrubbingPipeline::ScrubbingPipeline()
  : spawned_(false)
  , tube_counter_(kMaxFilesInFlight)
{
  const unsigned base_workers = std::max(1U, GetNumberOfCpuCores() / 8);

  // Last stage: publishes the final hash of each file to the listeners
  const unsigned n_callbacks = base_workers * kNforkScrubbingCallback;
  for (unsigned w = 0; w < n_callbacks; ++w) {
    Tube<BlockItem> *callback_tube = new Tube<BlockItem>();
    tubes_scrubbing_callback_.TakeTube(callback_tube);
    TaskScrubbingCallback *callback =
      new TaskScrubbingCallback(callback_tube, &tube_counter_);
    callback->RegisterListener(&ScrubbingPipeline::OnFileProcessed, this);
    tasks_scrubbing_callback_.TakeConsumer(callback);
  }
  tubes_scrubbing_callback_.Activate();

  // Hashing stage
  const unsigned n_hashers = base_workers * kNforkHash;
  for (unsigned w = 0; w < n_hashers; ++w) {
    Tube<BlockItem> *hash_tube = new Tube<BlockItem>();
    tubes_hash_.TakeTube(hash_tube);
    tasks_hash_.TakeConsumer(
      new TaskHash(hash_tube, &tubes_scrubbing_callback_));
  }
  tubes_hash_.Activate();

  // Chunking stage
  const unsigned n_chunkers = base_workers * kNforkChunk;
  for (unsigned w = 0; w < n_chunkers; ++w) {
    Tube<BlockItem> *chunk_tube = new Tube<BlockItem>();
    tubes_chunk_.TakeTube(chunk_tube);
    tasks_chunk_.TakeConsumer(
      new TaskChunk(chunk_tube, &tubes_hash_, &item_allocator_));
  }
  tubes_chunk_.Activate();

  // Reading stage: every reader consumes from the single input tube
  const unsigned n_readers = base_workers * kNforkRead;
  for (unsigned w = 0; w < n_readers; ++w) {
    TaskRead *reader =
      new TaskRead(&tube_input_, &tubes_chunk_, &item_allocator_);
    reader->SetWatermarks(kMemLowWatermark, kMemHighWatermark);
    tasks_read_.TakeConsumer(reader);
  }
}
222 |
|
|
|
223 |
|
|
|
224 |
|
2 |
/**
 * Terminates the worker groups started by Spawn(), in data-flow order from
 * the read stage to the scrubbing callback stage.  Nothing happens if the
 * pipeline was never spawned.
 */
ScrubbingPipeline::~ScrubbingPipeline() {
  if (!spawned_)
    return;

  tasks_read_.Terminate();
  tasks_chunk_.Terminate();
  tasks_hash_.Terminate();
  tasks_scrubbing_callback_.Terminate();
}
232 |
|
|
|
233 |
|
|
|
234 |
|
1 |
void ScrubbingPipeline::OnFileProcessed( |
235 |
|
|
const ScrubbingResult &scrubbing_result) |
236 |
|
|
{ |
237 |
|
1 |
NotifyListeners(scrubbing_result); |
238 |
|
1 |
} |
239 |
|
|
|
240 |
|
|
|
241 |
|
1 |
/**
 * Submits one file for scrubbing, i.e. for re-computing its content hash.
 *
 * The file item is created with all chunk sizes set to 0, no compression and
 * chunking disabled, but with the legacy bulk hash requested.
 *
 * @param source          where the file content is read from
 * @param hash_algorithm  algorithm used to hash the content
 * @param hash_suffix     suffix attached to the resulting hash
 */
void ScrubbingPipeline::Process(
  IngestionSource *source,
  shash::Algorithms hash_algorithm,
  shash::Suffix hash_suffix)
{
  const bool kMayHaveChunks = false;
  const bool kHashLegacyBulkChunk = true;
  FileItem *item = new FileItem(source,
                                0,  // minimal chunk size
                                0,  // average chunk size
                                0,  // maximal chunk size
                                zlib::kNoCompression,
                                hash_algorithm,
                                hash_suffix,
                                kMayHaveChunks,
                                kHashLegacyBulkChunk);
  // Register with the in-flight counter (awaited by WaitFor()) before the
  // read stage can see the item
  tube_counter_.Enqueue(item);
  tube_input_.Enqueue(item);
}
257 |
|
|
|
258 |
|
|
|
259 |
|
1 |
/**
 * Starts the worker threads, from the last stage (scrubbing callback) back
 * to the first one (read).  The pipeline is then marked as spawned so that
 * the destructor terminates the workers.
 */
void ScrubbingPipeline::Spawn() {
  // consumers at the end of the pipeline first ...
  tasks_scrubbing_callback_.Spawn();
  tasks_hash_.Spawn();
  tasks_chunk_.Spawn();
  // ... producers at the front last
  tasks_read_.Spawn();

  spawned_ = true;
}
266 |
|
|
|
267 |
|
|
|
268 |
|
1 |
/**
 * Blocks on tube_counter_.  Every file submitted via Process() is enqueued
 * there and popped by the scrubbing callback stage once its hash has been
 * published.
 */
void ScrubbingPipeline::WaitFor() {
  tube_counter_.Wait();
}