GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/pipeline.cc Lines: 132 137 96.4 %
Date: 2019-02-03 02:48:13 Branches: 37 59 62.7 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "cvmfs_config.h"
6
#include "pipeline.h"
7
8
#include <algorithm>
9
#include <cstdlib>
10
11
#include "ingestion/task_chunk.h"
12
#include "ingestion/task_compress.h"
13
#include "ingestion/task_hash.h"
14
#include "ingestion/task_read.h"
15
#include "ingestion/task_register.h"
16
#include "ingestion/task_write.h"
17
#include "platform.h"
18
#include "upload_facility.h"
19
#include "upload_spooler_definition.h"
20
#include "util/string.h"
21
#include "util_concurrency.h"
22
23
87
/**
 * Assembles the file ingestion pipeline; no threads are started here (see
 * Spawn()).
 *
 * Stages are built from the tail to the head so that every task can be
 * handed the already activated tube group of the stage it feeds:
 *   read -> chunk -> compress -> hash -> write -> register
 * Each stage gets nfork_base * kNfork<Stage> tasks, where nfork_base is
 * max(1, number of CPU cores / 8).
 *
 * The read tasks receive memory watermarks that default to kMemLowWatermark
 * and kMemHighWatermark.  Setting _CVMFS_SERVER_PIPELINE_MB overrides the
 * high watermark (given in MiB); the low watermark then becomes 2/3 of it.
 * An override that parses to 0 (NOTE(review): presumably what
 * String2Uint64() returns for empty/non-numeric input — confirm) or that is
 * too large to be converted to bytes without 64-bit overflow is ignored, and
 * the defaults are kept.
 *
 * @param uploader            upload backend handed to the write tasks
 * @param spooler_definition  source of the compression/hash algorithms and
 *                            of the chunking parameters
 */
IngestionPipeline::IngestionPipeline(
  upload::AbstractUploader *uploader,
  const upload::SpoolerDefinition &spooler_definition)
  : compression_algorithm_(spooler_definition.compression_alg)
  , hash_algorithm_(spooler_definition.hash_algorithm)
  , generate_legacy_bulk_chunks_(spooler_definition.generate_legacy_bulk_chunks)
  , chunking_enabled_(spooler_definition.use_file_chunking)
  , minimal_chunk_size_(spooler_definition.min_file_chunk_size)
  , average_chunk_size_(spooler_definition.avg_file_chunk_size)
  , maximal_chunk_size_(spooler_definition.max_file_chunk_size)
  , spawned_(false)
  , uploader_(uploader)
  , tube_counter_(kMaxFilesInFlight)
{
  unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);

  // Register stage: finished files are reported through OnFileProcessed().
  for (unsigned i = 0; i < nfork_base * kNforkRegister; ++i) {
    Tube<FileItem> *tube = new Tube<FileItem>();
    tubes_register_.TakeTube(tube);
    TaskRegister *task = new TaskRegister(tube, &tube_counter_);
    task->RegisterListener(&IngestionPipeline::OnFileProcessed, this);
    tasks_register_.TakeConsumer(task);
  }
  tubes_register_.Activate();

  // Write stage: feeds the register stage.
  for (unsigned i = 0; i < nfork_base * kNforkWrite; ++i) {
    Tube<BlockItem> *t = new Tube<BlockItem>();
    tubes_write_.TakeTube(t);
    tasks_write_.TakeConsumer(new TaskWrite(t, &tubes_register_, uploader_));
  }
  tubes_write_.Activate();

  // Hash stage: feeds the write stage.
  for (unsigned i = 0; i < nfork_base * kNforkHash; ++i) {
    Tube<BlockItem> *t = new Tube<BlockItem>();
    tubes_hash_.TakeTube(t);
    tasks_hash_.TakeConsumer(new TaskHash(t, &tubes_write_));
  }
  tubes_hash_.Activate();

  // Compress stage: feeds the hash stage.
  for (unsigned i = 0; i < nfork_base * kNforkCompress; ++i) {
    Tube<BlockItem> *t = new Tube<BlockItem>();
    tubes_compress_.TakeTube(t);
    tasks_compress_.TakeConsumer(
      new TaskCompress(t, &tubes_hash_, &item_allocator_));
  }
  tubes_compress_.Activate();

  // Chunk stage: feeds the compress stage.
  for (unsigned i = 0; i < nfork_base * kNforkChunk; ++i) {
    Tube<BlockItem> *t = new Tube<BlockItem>();
    tubes_chunk_.TakeTube(t);
    tasks_chunk_.TakeConsumer(
      new TaskChunk(t, &tubes_compress_, &item_allocator_));
  }
  tubes_chunk_.Activate();

  uint64_t low = kMemLowWatermark;
  uint64_t high = kMemHighWatermark;
  const char *fixed_limit_mb = getenv("_CVMFS_SERVER_PIPELINE_MB");
  if (fixed_limit_mb != NULL) {
    const uint64_t limit_mb = String2Uint64(fixed_limit_mb);
    // Largest limit for which both the conversion to bytes and the low
    // watermark computation (high * 2) stay within 64 bits.
    const uint64_t max_limit_mb =
      static_cast<uint64_t>(-1) / (2 * 1024 * 1024);
    // A zero high watermark (and low watermark) is never usable, and an
    // overflowing one would silently wrap around; keep the defaults then.
    if ((limit_mb > 0) && (limit_mb <= max_limit_mb)) {
      high = limit_mb * 1024 * 1024;
      low = (high * 2) / 3;
    }
  }

  // Read stage: consumes tube_input_ (filled by Process()) and feeds the
  // chunk stage.
  for (unsigned i = 0; i < nfork_base * kNforkRead; ++i) {
    TaskRead *task_read =
      new TaskRead(&tube_input_, &tubes_chunk_, &item_allocator_);
    task_read->SetWatermarks(low, high);
    tasks_read_.TakeConsumer(task_read);
  }
}
92
93
94
110
/**
 * Shuts the pipeline down.  If Spawn() was never called there are no running
 * tasks and nothing needs to be done.  Otherwise the task groups are
 * terminated from the head of the pipeline (read) towards its tail
 * (register).
 */
IngestionPipeline::~IngestionPipeline() {
  if (!spawned_)
    return;

  // Head of the pipeline first ...
  tasks_read_.Terminate();
  tasks_chunk_.Terminate();
  tasks_compress_.Terminate();
  tasks_hash_.Terminate();
  tasks_write_.Terminate();
  // ... and the final stage last.
  tasks_register_.Terminate();
}
104
105
106
149
void IngestionPipeline::OnFileProcessed(
107
  const upload::SpoolerResult &spooler_result)
108
{
109
149
  NotifyListeners(spooler_result);
110
149
}
111
112
113
149
void IngestionPipeline::Process(
114
  IngestionSource* source,
115
  bool allow_chunking,
116
  shash::Suffix hash_suffix)
117
{
118
  FileItem *file_item = new FileItem(
119
    source,
120
    minimal_chunk_size_,
121
    average_chunk_size_,
122
    maximal_chunk_size_,
123
    compression_algorithm_,
124
    hash_algorithm_,
125
    hash_suffix,
126
    allow_chunking && chunking_enabled_,
127

149
    generate_legacy_bulk_chunks_);
128
149
  tube_counter_.Enqueue(file_item);
129
149
  tube_input_.Enqueue(file_item);
130
149
}
131
132
133
87
void IngestionPipeline::Spawn() {
134
87
  tasks_register_.Spawn();
135
87
  tasks_write_.Spawn();
136
87
  tasks_hash_.Spawn();
137
87
  tasks_compress_.Spawn();
138
87
  tasks_chunk_.Spawn();
139
87
  tasks_read_.Spawn();
140
87
  spawned_ = true;
141
87
}
142
143
144
102
void IngestionPipeline::WaitFor() {
145
102
  tube_counter_.Wait();
146
102
}
147
148
149
//------------------------------------------------------------------------------
150
151
152
1
/**
 * Final stage of the scrubbing pipeline.
 *
 * Every block must belong to a file with a non-empty path and to a bulk
 * chunk.  Data blocks are simply discarded.  A stop block carries the
 * finished bulk chunk: its hash is reported to the listeners as a
 * ScrubbingResult together with the file's path.  The block, chunk and file
 * items are then freed, and the file is popped from the in-flight counter.
 * Any other block type aborts the process.
 *
 * @param block_item  block produced by the hash stage; always deleted here
 */
void TaskScrubbingCallback::Process(BlockItem *block_item) {
  FileItem *file_item = block_item->file_item();
  assert(file_item != NULL);
  assert(!file_item->path().empty());
  ChunkItem *chunk_item = block_item->chunk_item();
  assert(chunk_item != NULL);
  assert(chunk_item->is_bulk_chunk());

  switch (block_item->type()) {
    case BlockItem::kBlockStop:
      // End of file: the chunk's hash must be set at this point.
      assert(!chunk_item->hash_ptr()->IsNull());
      NotifyListeners(
        ScrubbingResult(file_item->path(), *chunk_item->hash_ptr()));
      delete block_item;
      delete chunk_item;
      delete file_item;
      tube_counter_->Pop();
      break;

    case BlockItem::kBlockData:
      // Payload was only needed for hashing; nothing to report.
      delete block_item;
      break;

    default:
      abort();
  }
}
179
180
181
//------------------------------------------------------------------------------
182
183
184
1
/**
 * Assembles the scrubbing pipeline; no threads are started here (see
 * Spawn()):
 *   read -> chunk -> hash -> scrubbing callback
 * Stages are built from the tail so that every task receives the activated
 * tube group of the stage it feeds.  The read tasks always use
 * kMemLowWatermark / kMemHighWatermark; no environment override is consulted
 * here.
 */
ScrubbingPipeline::ScrubbingPipeline()
  : spawned_(false)
  , tube_counter_(kMaxFilesInFlight)
{
  const unsigned nfork_base = std::max(1U, GetNumberOfCpuCores() / 8);

  // Callback stage: reports finished files through OnFileProcessed().
  const unsigned n_callback_tasks = nfork_base * kNforkScrubbingCallback;
  for (unsigned idx = 0; idx < n_callback_tasks; ++idx) {
    Tube<BlockItem> *callback_tube = new Tube<BlockItem>();
    tubes_scrubbing_callback_.TakeTube(callback_tube);
    TaskScrubbingCallback *callback_task =
      new TaskScrubbingCallback(callback_tube, &tube_counter_);
    callback_task->RegisterListener(&ScrubbingPipeline::OnFileProcessed, this);
    tasks_scrubbing_callback_.TakeConsumer(callback_task);
  }
  tubes_scrubbing_callback_.Activate();

  // Hash stage: feeds the callback stage.
  const unsigned n_hash_tasks = nfork_base * kNforkHash;
  for (unsigned idx = 0; idx < n_hash_tasks; ++idx) {
    Tube<BlockItem> *hash_tube = new Tube<BlockItem>();
    tubes_hash_.TakeTube(hash_tube);
    tasks_hash_.TakeConsumer(
      new TaskHash(hash_tube, &tubes_scrubbing_callback_));
  }
  tubes_hash_.Activate();

  // Chunk stage: feeds the hash stage.
  const unsigned n_chunk_tasks = nfork_base * kNforkChunk;
  for (unsigned idx = 0; idx < n_chunk_tasks; ++idx) {
    Tube<BlockItem> *chunk_tube = new Tube<BlockItem>();
    tubes_chunk_.TakeTube(chunk_tube);
    tasks_chunk_.TakeConsumer(
      new TaskChunk(chunk_tube, &tubes_hash_, &item_allocator_));
  }
  tubes_chunk_.Activate();

  // Read stage: consumes tube_input_ (filled by Process()).
  const unsigned n_read_tasks = nfork_base * kNforkRead;
  for (unsigned idx = 0; idx < n_read_tasks; ++idx) {
    TaskRead *reader =
      new TaskRead(&tube_input_, &tubes_chunk_, &item_allocator_);
    reader->SetWatermarks(kMemLowWatermark, kMemHighWatermark);
    tasks_read_.TakeConsumer(reader);
  }
}
222
223
224
2
/**
 * Shuts the scrubbing pipeline down.  Without a prior Spawn() there is
 * nothing to terminate.  Otherwise the task groups are terminated from the
 * head (read) towards the tail (scrubbing callback).
 */
ScrubbingPipeline::~ScrubbingPipeline() {
  if (!spawned_)
    return;

  tasks_read_.Terminate();
  tasks_chunk_.Terminate();
  tasks_hash_.Terminate();
  tasks_scrubbing_callback_.Terminate();
}
232
233
234
1
void ScrubbingPipeline::OnFileProcessed(
235
  const ScrubbingResult &scrubbing_result)
236
{
237
1
  NotifyListeners(scrubbing_result);
238
1
}
239
240
241
1
void ScrubbingPipeline::Process(
242
  IngestionSource *source,
243
  shash::Algorithms hash_algorithm,
244
  shash::Suffix hash_suffix)
245
{
246
  FileItem *file_item = new FileItem(
247
    source,
248
    0, 0, 0,
249
    zlib::kNoCompression,
250
    hash_algorithm,
251
    hash_suffix,
252
    false,  /* may_have_chunks */
253
1
    true  /* hash_legacy_bulk_chunk */);
254
1
  tube_counter_.Enqueue(file_item);
255
1
  tube_input_.Enqueue(file_item);
256
1
}
257
258
259
1
void ScrubbingPipeline::Spawn() {
260
1
  tasks_scrubbing_callback_.Spawn();
261
1
  tasks_hash_.Spawn();
262
1
  tasks_chunk_.Spawn();
263
1
  tasks_read_.Spawn();
264
1
  spawned_ = true;
265
1
}
266
267
268
1
void ScrubbingPipeline::WaitFor() {
269
1
  tube_counter_.Wait();
270
1
}