GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/pipeline.h Lines: 8 8 100.0 %
Date: 2019-02-03 02:48:13 Branches: 2 5 40.0 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_INGESTION_PIPELINE_H_
6
#define CVMFS_INGESTION_PIPELINE_H_
7
8
#include <string>
9
10
#include "compression.h"
11
#include "hash.h"
12
#include "ingestion/item.h"
13
#include "ingestion/item_mem.h"
14
#include "ingestion/task.h"
15
#include "ingestion/tube.h"
16
#include "upload_spooler_result.h"
17
#include "util_concurrency.h"
18
19
// Forward declarations.  Within this header both types are only used through
// a pointer (upload::AbstractUploader *) or a const reference
// (const upload::SpoolerDefinition &), so no full definition is required.
namespace upload {
20
class AbstractUploader;
21
struct SpoolerDefinition;
22
}
23
24
/**
 * Declares a pipeline whose members are tubes and tube consumer groups for
 * the stages read, chunk, compress, hash, write and register.  The class is
 * an Observable<upload::SpoolerResult>.  Only declarations are visible in
 * this header; how the stages are wired together and when observers are
 * notified has to be looked up in the implementation file.
 */
class IngestionPipeline : public Observable<upload::SpoolerResult> {
25
 public:
26
  /**
   * @param uploader  uploader handle; a pointer of the same type is kept in
   *                  uploader_ (ownership is not visible here -- TODO confirm)
   * @param spooler_definition  presumably the source of the const
   *                  configuration members below -- confirm in the
   *                  implementation
   */
  explicit IngestionPipeline(
27
    upload::AbstractUploader *uploader,
28
    const upload::SpoolerDefinition &spooler_definition);
29
  ~IngestionPipeline();
30
31
  // Presumably starts the stage tasks (see spawned_) -- confirm.
  void Spawn();
32
  // Hands a source to the pipeline.  hash_suffix defaults to
  // shash::kSuffixNone.  Whether the pipeline takes ownership of source is
  // not visible here -- TODO confirm.
  void Process(IngestionSource* source, bool allow_chunking,
33
               shash::Suffix hash_suffix = shash::kSuffixNone);
34
  // Presumably blocks until all files handed to Process() are done -- confirm.
  void WaitFor();
35
36
  // Receives a finished upload::SpoolerResult; presumably forwarded to the
  // observers of this pipeline -- confirm.
  void OnFileProcessed(const upload::SpoolerResult &spooler_result);
37
38
 private:
39
  // NOTE(review): 786 is not 3/4 of the 1024 used for the high watermark
  // (that would be 768; ScrubbingPipeline uses 384 and 512).  Possibly a
  // digit swap -- confirm the intended value.  Unit is presumably bytes.
  static const uint64_t kMemLowWatermark = 786 * 1024 * 1024;
40
  // 2^30; still fits into the int arithmetic of the initializer.
  static const uint64_t kMemHighWatermark = 1024 * 1024 * 1024;
41
  // Presumably the limit enforced through tube_counter_ -- confirm.
  static const unsigned kMaxFilesInFlight = 8000;
42
  // kNfork*: presumably the number of parallel tasks per stage -- confirm.
  static const unsigned kNforkRegister = 1;
43
  static const unsigned kNforkWrite = 1;
44
  static const unsigned kNforkHash = 2;
45
  static const unsigned kNforkCompress = 4;
46
  static const unsigned kNforkChunk = 1;
47
  static const unsigned kNforkRead = 8;
48
49
  // Configuration fixed at construction time (all members are const).
  const zlib::Algorithms compression_algorithm_;
50
  const shash::Algorithms hash_algorithm_;
51
  const bool generate_legacy_bulk_chunks_;
52
  const bool chunking_enabled_;
53
  const size_t minimal_chunk_size_;
54
  const size_t average_chunk_size_;
55
  const size_t maximal_chunk_size_;
56
57
  bool spawned_;
58
  upload::AbstractUploader *uploader_;
59
  // TODO(jblomer): a semaphore would be faster!
60
  Tube<FileItem> tube_counter_;
61
  Tube<FileItem> tube_input_;
62
63
  // One tube group (tubes_*) and one consumer group (tasks_*) per stage.
  // The read stage has no tube group of its own here; presumably it consumes
  // from tube_input_ -- confirm.
  TubeConsumerGroup<FileItem> tasks_read_;
64
65
  TubeGroup<BlockItem> tubes_chunk_;
66
  TubeConsumerGroup<BlockItem> tasks_chunk_;
67
68
  TubeGroup<BlockItem> tubes_compress_;
69
  TubeConsumerGroup<BlockItem> tasks_compress_;
70
71
  TubeGroup<BlockItem> tubes_hash_;
72
  TubeConsumerGroup<BlockItem> tasks_hash_;
73
74
  TubeGroup<BlockItem> tubes_write_;
75
  TubeConsumerGroup<BlockItem> tasks_write_;
76
77
  // The register stage works on FileItem again, not on BlockItem.
  TubeGroup<FileItem> tubes_register_;
78
  TubeConsumerGroup<FileItem> tasks_register_;
79
80
  ItemAllocator item_allocator_;
81
};  // class IngestionPipeline
82
83
84
3
/**
 * Plain pair of a path and a hash.  It is the payload type of the
 * Observable<ScrubbingResult> base used by TaskScrubbingCallback and
 * ScrubbingPipeline in this header.
 */
struct ScrubbingResult {
85
1
  // Leaves path and hash default-constructed.
  ScrubbingResult() { }
86
1
  // Copies p into path and h into hash.
  ScrubbingResult(const std::string &p, const shash::Any &h)
87
1
    : path(p), hash(h) { }
88
  std::string path;
89
  shash::Any hash;
90
};
91
92
93
/**
 * Tube consumer for BlockItem objects that is at the same time an
 * Observable<ScrubbingResult>.  Process() is only declared here; presumably
 * it turns finished items into ScrubbingResult notifications -- confirm in
 * the implementation file.
 */
class TaskScrubbingCallback
94
  : public TubeConsumer<BlockItem>
95
  , public Observable<ScrubbingResult>
96
2
{
97
 public:
98
1
  /**
   * @param tube_in  passed on to the TubeConsumer<BlockItem> base
   * @param tube_counter  stored unchanged in tube_counter_; the pointer is
   *                      only copied, the constructor does not touch the tube
   */
  TaskScrubbingCallback(Tube<BlockItem> *tube_in,
99
                        Tube<FileItem> *tube_counter)
100
    : TubeConsumer<BlockItem>(tube_in)
101
1
    , tube_counter_(tube_counter)
102
1
  { }
103
104
 protected:
105
  // Per-item hook of this consumer; defined outside this header.
  virtual void Process(BlockItem *block_item);
106
107
 private:
108
  // Set from the constructor argument; presumably the same counter tube the
  // owning pipeline uses to limit files in flight -- confirm.
  Tube<FileItem> *tube_counter_;
109
};
110
111
112
/**
 * Declares a pipeline with the stages read, chunk, hash and scrubbing
 * callback (see the tube and task members below).  The class is an
 * Observable<ScrubbingResult>.  Unlike IngestionPipeline it has no compress,
 * write or register stage members and its constructor takes no arguments.
 * Only declarations are visible in this header.
 */
class ScrubbingPipeline : public Observable<ScrubbingResult> {
113
 public:
114
  ScrubbingPipeline();
115
  ~ScrubbingPipeline();
116
117
  // Presumably starts the stage tasks (see spawned_) -- confirm.
  void Spawn();
118
  // Hands a source to the pipeline.  Hash algorithm and suffix are given per
  // call and have no default values.  Whether the pipeline takes ownership of
  // source is not visible here -- TODO confirm.
  void Process(IngestionSource* source,
119
               shash::Algorithms hash_algorithm,
120
               shash::Suffix hash_suffix);
121
  // Presumably blocks until all files handed to Process() are done -- confirm.
  void WaitFor();
122
123
  // Receives a finished ScrubbingResult; presumably forwarded to the
  // observers of this pipeline -- confirm.
  void OnFileProcessed(const ScrubbingResult &scrubbing_result);
124
125
 private:
126
  // Low watermark is 3/4 of the high watermark; unit is presumably bytes.
  static const uint64_t kMemLowWatermark = 384 * 1024 * 1024;
127
  static const uint64_t kMemHighWatermark = 512 * 1024 * 1024;
128
  // Presumably the limit enforced through tube_counter_ -- confirm.
  static const unsigned kMaxFilesInFlight = 8000;
129
  // kNfork*: presumably the number of parallel tasks per stage -- confirm.
  static const unsigned kNforkScrubbingCallback = 1;
130
  static const unsigned kNforkHash = 2;
131
  static const unsigned kNforkChunk = 1;
132
  static const unsigned kNforkRead = 8;
133
134
  bool spawned_;
135
  Tube<FileItem> tube_input_;
136
  // TODO(jblomer): a semaphore would be faster!
137
  Tube<FileItem> tube_counter_;
138
139
  // One tube group (tubes_*) and one consumer group (tasks_*) per stage.
  // The read stage has no tube group of its own here; presumably it consumes
  // from tube_input_ -- confirm.
  TubeConsumerGroup<FileItem> tasks_read_;
140
141
  TubeGroup<BlockItem> tubes_chunk_;
142
  TubeConsumerGroup<BlockItem> tasks_chunk_;
143
144
  TubeGroup<BlockItem> tubes_hash_;
145
  TubeConsumerGroup<BlockItem> tasks_hash_;
146
147
  TubeGroup<BlockItem> tubes_scrubbing_callback_;
148
  TubeConsumerGroup<BlockItem> tasks_scrubbing_callback_;
149
150
  ItemAllocator item_allocator_;
151
};
152
153
#endif  // CVMFS_INGESTION_PIPELINE_H_