GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/item.h Lines: 60 60 100.0 %
Date: 2019-02-03 02:48:13 Branches: 20 31 64.5 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_INGESTION_ITEM_H_
6
#define CVMFS_INGESTION_ITEM_H_
7
8
#include <pthread.h>
9
#include <stdint.h>
10
11
#include <cassert>
12
#include <string>
13
#include <vector>
14
15
#include "atomic.h"
16
#include "compression.h"
17
#include "file_chunk.h"
18
#include "hash.h"
19
#include "ingestion/chunk_detector.h"
20
#include "ingestion/ingestion_source.h"
21
#include "util/pointer.h"
22
#include "util/single_copy.h"
23
24
namespace upload {
25
struct UploadStreamHandle;
26
}
27
28
class ItemAllocator;
29
30
/**
31
 * Carries the information necessary to compress and checksum a file. During
32
 * processing, the bulk chunk and the chunks_ vector are filled.
33
 */
34
class FileItem : SingleCopy {
35
 public:
36
  explicit FileItem(
37
    IngestionSource* source,
38
    uint64_t min_chunk_size = 4 * 1024 * 1024,
39
    uint64_t avg_chunk_size = 8 * 1024 * 1024,
40
    uint64_t max_chunk_size = 16 * 1024 * 1024,
41
    zlib::Algorithms compression_algorithm = zlib::kZlibDefault,
42
    shash::Algorithms hash_algorithm = shash::kSha1,
43
    shash::Suffix hash_suffix = shash::kSuffixNone,
44
    bool may_have_chunks = true,
45
    bool has_legacy_bulk_chunk = false);
46
  ~FileItem();
47
48
793
  static FileItem *CreateQuitBeacon() {
49
793
    std::string quit_marker = std::string(1, kQuitBeaconMarker);
50
793
    UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker));
51
793
    return new FileItem(source.Release());
52
  }
53
1097
  bool IsQuitBeacon() {
54



1097
    return (path().length() == 1) && (path()[0] == kQuitBeaconMarker);
55
  }
56
57
2339
  std::string path() { return source_->GetPath(); }
58
7030
  uint64_t size() { return size_; }
59
226493
  Xor32Detector *chunk_detector() { return &chunk_detector_; }
60
825
  shash::Any bulk_hash() { return bulk_hash_; }
61
6974
  zlib::Algorithms compression_algorithm() { return compression_algorithm_; }
62
6974
  shash::Algorithms hash_algorithm() { return hash_algorithm_; }
63
447
  shash::Suffix hash_suffix() { return hash_suffix_; }
64
313
  bool may_have_chunks() { return may_have_chunks_; }
65
197
  bool has_legacy_bulk_chunk() { return has_legacy_bulk_chunk_; }
66
67
160
  void set_size(uint64_t val) { size_ = val; }
68
100
  void set_may_have_chunks(bool val) { may_have_chunks_ = val; }
69
157
  void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); }
70
1820
  bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; }
71
154
  uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); }
72
73
300
  uint64_t GetNumChunks() { return chunks_.size(); }
74
149
  FileChunkList *GetChunksPtr() { return &chunks_; }
75
76
155
  bool Open() { return source_->Open(); }
77
216390
  ssize_t Read(void *buffer, size_t nbyte) {
78
216390
    return source_->Read(buffer, nbyte);
79
  }
80
155
  bool Close() { return source_->Close(); }
81
155
  bool GetSize(uint64_t *size) { return source_->GetSize(size); }
82
83
  // Called by ChunkItem constructor, decremented when a chunk is registered
84
6825
  void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); }
85
  void RegisterChunk(const FileChunk &file_chunk);
86
1814
  bool IsProcessed() {
87

1814
    return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0);
88
  }
89
90
 private:
91
  static const uint64_t kSizeUnknown = uint64_t(-1);
92
  static const char kQuitBeaconMarker = '\0';
93
94
  UniquePtr<IngestionSource> source_;
95
  const zlib::Algorithms compression_algorithm_;
96
  const shash::Algorithms hash_algorithm_;
97
  const shash::Suffix hash_suffix_;
98
  const bool has_legacy_bulk_chunk_;
99
100
  uint64_t size_;
101
  bool may_have_chunks_;
102
103
  Xor32Detector chunk_detector_;
104
  shash::Any bulk_hash_;
105
  FileChunkList chunks_;
106
  /**
107
   * Number of chunks created but not yet uploaded and registered
108
   */
109
  atomic_int64 nchunks_in_fly_;
110
  /**
111
   * Switches to true once all of the file has been through the chunking
112
   * stage
113
   */
114
  atomic_int32 is_fully_chunked_;
115
  pthread_mutex_t lock_;
116
};
117
118
119
/**
120
 * A chunk stores the state of compression and hashing contexts as the blocks
121
 * move through the pipeline.  A chunk can be a "bulk chunk" corresponding to
122
 * the processed data of an entire file, or it can be a partial chunk of a
123
 * (large) input file.
124
 */
125
6825
class ChunkItem : SingleCopy {
126
 public:
127
  ChunkItem(FileItem *file_item, uint64_t offset);
128
129
  void MakeBulkChunk();
130
21816
  bool IsSolePiece() {
131

21816
    return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size());
132
  }
133
134
20019
  bool is_bulk_chunk() { return is_bulk_chunk_; }
135
1815
  FileItem *file_item() { return file_item_; }
136
29932
  uint64_t offset() { return offset_; }
137
6817
  uint64_t size() { return size_; }
138
578818
  upload::UploadStreamHandle *upload_handle() { return upload_handle_; }
139
328438
  zlib::Compressor *compressor() { return compressor_.weak_ref(); }
140
546017
  shash::ContextPtr hash_ctx() { return hash_ctx_; }
141
5451
  shash::Any *hash_ptr() { return &hash_value_; }
142
143
6788
  void set_size(uint64_t val) { assert(size_ == 0); size_ = val; }
144
1814
  void set_upload_handle(upload::UploadStreamHandle *val) {
145

1814
    assert((upload_handle_ == NULL) && (val != NULL));
146
1814
    upload_handle_ = val;
147
1814
  }
148
149
 private:
150
  FileItem *file_item_;
151
  uint64_t offset_;
152
  /**
153
   * The size of a chunk is not defined before the corresponding stop block
154
   * has been dispatched.
155
   */
156
  uint64_t size_;
157
  bool is_bulk_chunk_;
158
  /**
159
   * Deleted by the uploader.
160
   */
161
  upload::UploadStreamHandle *upload_handle_;
162
  UniquePtr<zlib::Compressor> compressor_;
163
  shash::ContextPtr hash_ctx_;
164
  UniquePtr<void> hash_ctx_buffer_;
165
  shash::Any hash_value_;
166
};
167
168
169
/**
170
 * A block is an item of work in the pipeline.  A sequence of data blocks
171
 * followed by a stop block constitutes a Chunk.  A sequence of Chunks in turn
172
 * constitutes a file.
173
 * A block that carries data must have a non-zero-length payload.
174
 */
175
class BlockItem : SingleCopy {
176
 public:
177
  enum BlockType {
178
    kBlockHollow,
179
    kBlockData,
180
    kBlockStop,
181
  };
182
183
  explicit BlockItem(ItemAllocator *allocator);
184
  BlockItem(int64_t tag, ItemAllocator *allocator);
185
  ~BlockItem();
186
187
708
  static BlockItem *CreateQuitBeacon() {
188
708
    return new BlockItem(NULL);
189
  }
190
1680256
  bool IsQuitBeacon() {
191
1680256
    return type_ == kBlockHollow;
192
  }
193
194
  void MakeStop();
195
  void MakeData(uint32_t capacity);
196
  void MakeDataMove(BlockItem *other);
197
  void MakeDataCopy(const unsigned char *data, uint32_t size);
198
  void SetFileItem(FileItem *item);
199
  void SetChunkItem(ChunkItem *item);
200
  // Free data and reset to hollow block
201
  void Reset();
202
203
  uint32_t Write(void *buf, uint32_t count);
204
205
  bool IsEmpty() { return size_ == 0; }
206
1201134
  bool IsFull() { return size_ == capacity_; }
207
208
526701201
  unsigned char *data() { return data_; }
209
524891911
  uint32_t capacity() { return capacity_; }
210
4229115
  uint32_t size() { return size_; }
211
602216
  void set_size(uint32_t val) { assert(val <= capacity_); size_ = val; }
212
213
1802058
  BlockType type() { return type_; }
214
1448682
  int64_t tag() { return tag_; }
215
772477
  FileItem *file_item() { return file_item_; }
216
2392473
  ChunkItem *chunk_item() { return chunk_item_; }
217
6935
  static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); }
218
219
 private:
220
  // Forget pointer to the data
221
  void Discharge();
222
223
  /**
224
   * Total capacity of all BlockItem()
225
   */
226
  static atomic_int64 managed_bytes_;
227
228
  ItemAllocator *allocator_;
229
  BlockType type_;
230
231
  /**
232
   * Blocks with the same tag need to be processed sequentially.  That is, no
233
   * two threads of the same pipeline stage must operate on blocks of the same
234
   * tag.  The tags roughly correspond to chunks.
235
   * Tags can (and should) be set exactly once in the lifetime of a block.
236
   */
237
  int64_t tag_;
238
239
  /**
240
   * Can be set exactly once.
241
   */
242
  FileItem *file_item_;
243
  ChunkItem *chunk_item_;
244
245
  /**
246
   * Managed by ItemAllocator
247
   */
248
  unsigned char *data_;
249
  uint32_t capacity_;
250
  uint32_t size_;
251
};
252
253
#endif  // CVMFS_INGESTION_ITEM_H_