CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
item.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_ITEM_H_
6 #define CVMFS_INGESTION_ITEM_H_
7 
8 #include <pthread.h>
9 #include <stdint.h>
10 
11 #include <cassert>
12 #include <string>
13 #include <vector>
14 
16 #include "crypto/hash.h"
17 #include "file_chunk.h"
20 #include "util/atomic.h"
21 #include "util/pointer.h"
22 #include "util/single_copy.h"
23 
24 namespace upload {
25 struct UploadStreamHandle;
26 }
27 
28 class ItemAllocator;
29 
35  public:
37  uint64_t min_chunk_size = 4 * 1024 * 1024,
38  uint64_t avg_chunk_size = 8 * 1024 * 1024,
39  uint64_t max_chunk_size = 16 * 1024 * 1024,
43  bool may_have_chunks = true,
44  bool has_legacy_bulk_chunk = false);
45  ~FileItem();
46 
48  std::string quit_marker = std::string(1, kQuitBeaconMarker);
50  return new FileItem(source.Release());
51  }
// Returns true when this item is a quit beacon: path() is exactly one
// character long and that character equals kQuitBeaconMarker.
52  bool IsQuitBeacon() {
53  return (path().length() == 1) && (path()[0] == kQuitBeaconMarker);
54  }
55 
// Path as reported by the underlying source object (source_->GetPath()).
56  std::string path() { return source_->GetPath(); }
// Currently stored size_ value; it is changed through set_size().
57  uint64_t size() { return size_; }
// Current value of the may_have_chunks_ flag.
63  bool may_have_chunks() { return may_have_chunks_; }
65 
// Overwrites the stored size_ with val; no validation is performed here.
66  void set_size(uint64_t val) { size_ = val; }
// Overwrites the may_have_chunks_ flag with val.
67  void set_may_have_chunks(bool val) { may_have_chunks_ = val; }
// Marks the file as fully chunked by atomically incrementing the
// is_fully_chunked_ counter; there is no corresponding reset in this block.
68  void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); }
// True once is_fully_chunked_ is non-zero, i.e. after at least one call to
// set_is_fully_chunked().
69  bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; }
// Atomic snapshot of the in-flight chunk counter.
// NOTE(review): the counter appears to be a signed atomic_int64 that is
// returned as uint64_t here -- confirm it can never be read while negative.
70  uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); }
71 
// Number of entries currently held in chunks_ (chunks_.size()).
// NOTE(review): no locking is visible in this accessor -- confirm whether
// callers must synchronize with RegisterChunk().
72  uint64_t GetNumChunks() { return chunks_.size(); }
74 
// Forwards to source_->Open() and returns its result unchanged.
75  bool Open() { return source_->Open(); }
// Forwards buffer and nbyte to source_->Read() and returns its result
// unchanged. NOTE(review): presumably follows read(2) conventions (bytes
// read, negative on error) -- confirm against the source implementation.
76  ssize_t Read(void *buffer, size_t nbyte) {
77  return source_->Read(buffer, nbyte);
78  }
// Forwards to source_->Close() and returns its result unchanged.
79  bool Close() { return source_->Close(); }
// Forwards the output pointer to source_->GetSize() and returns its result.
// This queries the source; it does not read or update the cached size_.
80  bool GetSize(uint64_t *size) { return source_->GetSize(size); }
81 
82  // Called by ChunkItem constructor, decremented when a chunk is registered
// Atomically increments nchunks_in_fly_, the counter that IsProcessed()
// compares against zero.
83  void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); }
// Declaration only; the definition is not part of this header listing.
// NOTE(review): per the comment on IncNchunksInFly(), registering a chunk
// decrements the in-flight counter, and it presumably appends file_chunk to
// chunks_ -- confirm against the definition.
84  void RegisterChunk(const FileChunk &file_chunk);
// True when the file has been marked fully chunked AND the in-flight chunk
// counter reads zero. The two atomics are read one after the other, not as
// a single atomic snapshot.
85  bool IsProcessed() {
86  return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0);
87  }
88 
89  private:
// Largest uint64_t value (uint64_t(-1)); by its name a sentinel for a size
// that has not been determined. NOTE(review): its users are not visible in
// this listing.
90  static const uint64_t kSizeUnknown = uint64_t(-1);
// Single NUL character; IsQuitBeacon() compares a one-character path against
// it to recognize quit beacon items.
91  static const char kQuitBeaconMarker = '\0';
92 
98 
99  uint64_t size_;
101 
114  pthread_mutex_t lock_;
115 };
116 
117 
125  public:
// Declaration only; the definition is not part of this header listing.
// Takes the owning file item and the chunk's start offset.
// NOTE(review): per the comment on FileItem::IncNchunksInFly(), the
// constructor is expected to bump the file item's in-flight chunk counter --
// confirm against the definition.
126  ChunkItem(FileItem *file_item, uint64_t offset);
127 
// Declaration only; the definition is not part of this header listing.
// NOTE(review): presumably turns this chunk into the bulk chunk (the state
// reported by is_bulk_chunk()) -- confirm against the definition.
128  void MakeBulkChunk();
// True when this is a regular (non-bulk) chunk that starts at offset 0 and
// whose size equals the size recorded in the file item, i.e. the chunk spans
// the whole file.
129  bool IsSolePiece() {
130  return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size());
131  }
132 
// Current value of the is_bulk_chunk_ flag.
133  bool is_bulk_chunk() { return is_bulk_chunk_; }
// Stored offset_ of this chunk (the offset passed at construction,
// presumably relative to the start of the file -- confirm).
135  uint64_t offset() { return offset_; }
// Stored size_ of this chunk; assigned through set_size().
136  uint64_t size() { return size_; }
138  // An active zlib compression stream requires 256kB of memory. Therefore,
139  // we create it only for the absolutely necessary duration and free the space
140  // afterwards.
// NOTE(review): the comment above appears to introduce a GetCompressor()
// declaration that is missing from this listing (the embedded line numbers
// skip from 140 to 142). ReleaseCompressor() is presumably its counterpart
// that frees the compressor -- confirm against the definition.
142  void ReleaseCompressor();
143 
146 
// Assigns the chunk size. The assertion requires size_ to still be 0 on
// entry, so replacing an already non-zero size aborts in builds where
// assertions are enabled (the check disappears under NDEBUG).
147  void set_size(uint64_t val) {
148  assert(size_ == 0);
149  size_ = val;
150  }
152  assert((upload_handle_ == NULL) && (val != NULL));
153  upload_handle_ = val;
154  }
155 
156  private:
158  uint64_t offset_;
163  uint64_t size_;
173 };
174 
175 
183  public:
184  enum BlockType {
188  };
189 
// Declarations only; the definitions are not part of this header listing.
// Both constructors take the ItemAllocator to associate with the block; the
// second additionally takes a tag (see tag()).
// NOTE(review): CreateQuitBeacon() uses the one-argument form and
// IsQuitBeacon() tests for kBlockHollow, so a freshly constructed block is
// presumably hollow -- confirm against the definition.
190  explicit BlockItem(ItemAllocator *allocator);
191  BlockItem(int64_t tag, ItemAllocator *allocator);
192  ~BlockItem();
193 
// Heap-allocates a block through the one-argument constructor with a NULL
// allocator and returns the raw pointer.
// NOTE(review): ownership presumably passes to the caller -- confirm.
194  static BlockItem *CreateQuitBeacon() { return new BlockItem(NULL); }
// True when the block's type is kBlockHollow. The test looks only at type_,
// so every hollow block is reported as a quit beacon.
195  bool IsQuitBeacon() { return type_ == kBlockHollow; }
196 
// Declarations only; the definitions are not part of this header listing.
// NOTE(review): judging by names and signatures these switch the block
// between its BlockType states and attach payload or owner pointers; the
// exact preconditions (e.g. whether the block must be hollow first) need to
// be confirmed against the definitions.
197  void MakeStop();
// Takes the requested buffer capacity in bytes (presumably allocates it).
198  void MakeData(uint32_t capacity);
// Takes another block as the data source (presumably takes over its buffer).
199  void MakeDataMove(BlockItem *other);
// Takes a source buffer and its length in bytes (presumably copied).
200  void MakeDataCopy(const unsigned char *data, uint32_t size);
201  void SetFileItem(FileItem *item);
202  void SetChunkItem(ChunkItem *item);
203  // Free data and reset to hollow block
204  void Reset();
205 
// Declaration only; the definition is not part of this header listing.
// NOTE(review): presumably appends up to count bytes from buf to the block's
// data and returns the number of bytes actually taken -- confirm against the
// definition.
206  uint32_t Write(void *buf, uint32_t count);
207 
// True when no bytes are in use (size_ == 0).
208  bool IsEmpty() { return size_ == 0; }
// True when the used size equals the capacity. A block whose capacity_ and
// size_ are both 0 therefore reports both empty and full.
209  bool IsFull() { return size_ == capacity_; }
210 
// Raw pointer to the block's data buffer (data_); no ownership is
// transferred by this accessor.
211  unsigned char *data() { return data_; }
// Stored capacity_ of the buffer.
212  uint32_t capacity() { return capacity_; }
// Stored size_, i.e. the number of bytes marked as used.
213  uint32_t size() { return size_; }
// Sets the used size. The assertion rejects values larger than capacity_ in
// builds where assertions are enabled (the check disappears under NDEBUG).
214  void set_size(uint32_t val) {
215  assert(val <= capacity_);
216  size_ = val;
217  }
218 
// Current BlockType of this block (type_).
219  BlockType type() { return type_; }
// Stored tag_ value; one constructor overload takes a tag argument.
220  int64_t tag() { return tag_; }
// Atomic read of the static managed_bytes_ counter, which is shared by all
// BlockItem instances. NOTE(review): presumably the total number of data
// bytes currently held by blocks -- confirm against the definitions that
// update it.
223  static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); }
224 
225  private:
230 
231  // Forget pointer to the data
// Declaration only; the definition is not part of this header listing.
// NOTE(review): "forget" suggests the buffer is dropped without being freed
// (e.g. after another block took it over) -- confirm against the definition.
232  void Discharge();
233 
236 
243  int64_t tag_;
244 
250 
254  unsigned char *data_;
255  uint32_t capacity_;
256  uint32_t size_;
257 };
258 
259 #endif // CVMFS_INGESTION_ITEM_H_
FileItem * file_item()
Definition: item.h:134
unsigned char * data_
Definition: item.h:254
const bool has_legacy_bulk_chunk_
Definition: item.h:97
const unsigned kMaxContextSize
Definition: hash.h:76
void RegisterChunk(const FileChunk &file_chunk)
Definition: item.cc:44
void MakeDataMove(BlockItem *other)
Definition: item.cc:163
void set_is_fully_chunked()
Definition: item.h:68
int64_t atomic_int64
Definition: atomic.h:18
uint32_t size_
Definition: item.h:256
void Discharge()
Definition: item.cc:136
uint64_t size_
Definition: item.h:163
bool GetSize(uint64_t *size)
Definition: item.h:80
shash::ContextPtr hash_ctx_
Definition: item.h:170
bool IsSolePiece()
Definition: item.h:129
void SetChunkItem(ChunkItem *item)
Definition: item.cc:204
FileItem(IngestionSource *source, uint64_t min_chunk_size=4 *1024 *1024, uint64_t avg_chunk_size=8 *1024 *1024, uint64_t max_chunk_size=16 *1024 *1024, zlib::Algorithms compression_algorithm=zlib::kZlibDefault, shash::Algorithms hash_algorithm=shash::kSha1, shash::Suffix hash_suffix=shash::kSuffixNone, bool may_have_chunks=true, bool has_legacy_bulk_chunk=false)
Definition: item.cc:17
CVMFS_EXPORT const LogSource source
Definition: exception.h:33
shash::Algorithms hash_algorithm()
Definition: item.h:61
unsigned char hash_ctx_buffer_[shash::kMaxContextSize]
Definition: item.h:172
zlib::Compressor * GetCompressor()
Definition: item.cc:88
void Reset()
Definition: item.cc:193
bool IsQuitBeacon()
Definition: item.h:52
ChunkItem * chunk_item_
Definition: item.h:249
pthread_mutex_t lock_
Definition: item.h:114
shash::Any hash_value_
Definition: item.h:171
bool may_have_chunks()
Definition: item.h:63
BlockItem(ItemAllocator *allocator)
Definition: item.cc:105
assert((mem||(size==0))&&"Out Of Memory")
unsigned char * data()
Definition: item.h:211
void SetFileItem(FileItem *item)
Definition: item.cc:211
Xor32Detector * chunk_detector()
Definition: item.h:58
uint32_t capacity()
Definition: item.h:212
bool is_bulk_chunk_
Definition: item.h:164
uint64_t GetNumChunks()
Definition: item.h:72
bool IsProcessed()
Definition: item.h:85
void MakeBulkChunk()
Definition: item.cc:82
void set_size(uint64_t val)
Definition: item.h:66
void ReleaseCompressor()
Definition: item.cc:97
ChunkItem(FileItem *file_item, uint64_t offset)
Definition: item.cc:65
shash::Any bulk_hash()
Definition: item.h:59
void set_size(uint64_t val)
Definition: item.h:147
bool IsQuitBeacon()
Definition: item.h:195
Xor32Detector chunk_detector_
Definition: item.h:102
Algorithms
Definition: hash.h:41
upload::UploadStreamHandle * upload_handle_
Definition: item.h:168
int32_t atomic_int32
Definition: atomic.h:17
int64_t tag()
Definition: item.h:220
FileChunkList * GetChunksPtr()
Definition: item.h:73
FileChunkList chunks_
Definition: item.h:104
const shash::Suffix hash_suffix_
Definition: item.h:96
Algorithms
Definition: compression.h:44
uint64_t nchunks_in_fly()
Definition: item.h:70
bool is_fully_chunked()
Definition: item.h:69
static FileItem * CreateQuitBeacon()
Definition: item.h:47
BlockType
Definition: item.h:184
void set_size(uint32_t val)
Definition: item.h:214
bool may_have_chunks_
Definition: item.h:100
const shash::Algorithms hash_algorithm_
Definition: item.h:95
bool is_bulk_chunk()
Definition: item.h:133
int64_t tag_
Definition: item.h:243
upload::UploadStreamHandle * upload_handle()
Definition: item.h:137
ChunkItem * chunk_item()
Definition: item.h:222
bool has_legacy_bulk_chunk()
Definition: item.h:64
void set_may_have_chunks(bool val)
Definition: item.h:67
FileItem * file_item()
Definition: item.h:221
const zlib::Algorithms compression_algorithm_
Definition: item.h:94
uint32_t capacity_
Definition: item.h:255
uint32_t size()
Definition: item.h:213
void MakeData(uint32_t capacity)
Definition: item.cc:148
UniquePtr< IngestionSource > source_
Definition: item.h:93
~BlockItem()
Definition: item.cc:129
Definition: item.h:34
atomic_int64 nchunks_in_fly_
Definition: item.h:108
shash::ContextPtr hash_ctx()
Definition: item.h:144
BlockType type_
Definition: item.h:235
uint64_t offset()
Definition: item.h:135
void IncNchunksInFly()
Definition: item.h:83
uint64_t size()
Definition: item.h:57
shash::Suffix hash_suffix()
Definition: item.h:62
char Suffix
Definition: hash.h:111
std::string path()
Definition: item.h:56
uint64_t offset_
Definition: item.h:158
bool IsEmpty()
Definition: item.h:208
static atomic_int64 managed_bytes_
Definition: item.h:229
bool Open()
Definition: item.h:75
bool IsFull()
Definition: item.h:209
FileItem * file_item_
Definition: item.h:248
uint64_t size_
Definition: item.h:99
zlib::Algorithms compression_algorithm()
Definition: item.h:60
static const uint64_t kSizeUnknown
Definition: item.h:90
ItemAllocator * allocator_
Definition: item.h:234
atomic_int32 is_fully_chunked_
Definition: item.h:113
bool Close()
Definition: item.h:79
void set_upload_handle(upload::UploadStreamHandle *val)
Definition: item.h:151
static BlockItem * CreateQuitBeacon()
Definition: item.h:194
BlockType type()
Definition: item.h:219
ssize_t Read(void *buffer, size_t nbyte)
Definition: item.h:76
~FileItem()
Definition: item.cc:42
void MakeDataCopy(const unsigned char *data, uint32_t size)
Definition: item.cc:180
uint32_t Write(void *buf, uint32_t count)
Definition: item.cc:218
shash::Any bulk_hash_
Definition: item.h:103
static const char kQuitBeaconMarker
Definition: item.h:91
void MakeStop()
Definition: item.cc:142
uint64_t size()
Definition: item.h:136
const char kSuffixNone
Definition: hash.h:53
UniquePtr< zlib::Compressor > compressor_
Definition: item.h:169
static uint64_t managed_bytes()
Definition: item.h:223
shash::Any * hash_ptr()
Definition: item.h:145
FileItem * file_item_
Definition: item.h:157
size_t size() const
Definition: bigvector.h:117