CernVM-FS  2.10.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
item.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_ITEM_H_
6 #define CVMFS_INGESTION_ITEM_H_
7 
8 #include <pthread.h>
9 #include <stdint.h>
10 
11 #include <cassert>
12 #include <string>
13 #include <vector>
14 
15 #include "atomic.h"
16 #include "compression.h"
17 #include "file_chunk.h"
18 #include "hash.h"
21 #include "util/pointer.h"
22 #include "util/single_copy.h"
23 
24 namespace upload {
25 struct UploadStreamHandle;
26 }
27 
28 class ItemAllocator;
29 
35  public:
36  explicit FileItem(
37  IngestionSource* source,
38  uint64_t min_chunk_size = 4 * 1024 * 1024,
39  uint64_t avg_chunk_size = 8 * 1024 * 1024,
40  uint64_t max_chunk_size = 16 * 1024 * 1024,
44  bool may_have_chunks = true,
45  bool has_legacy_bulk_chunk = false);
46  ~FileItem();
47 
49  std::string quit_marker = std::string(1, kQuitBeaconMarker);
50  UniquePtr<FileIngestionSource> source(new FileIngestionSource(quit_marker));
51  return new FileItem(source.Release());
52  }
// Returns true when this item's path is exactly one character long and that
// character equals kQuitBeaconMarker.
// Note: path() is evaluated twice; each call goes through source_->GetPath().
53  bool IsQuitBeacon() {
54  return (path().length() == 1) && (path()[0] == kQuitBeaconMarker);
55  }
56 
// Path as reported by the ingestion source (forwards to source_->GetPath()).
57  std::string path() { return source_->GetPath(); }
// Currently stored size_ value.
// NOTE(review): presumably kSizeUnknown until set_size() is called -- confirm
// against the constructor in item.cc.
58  uint64_t size() { return size_; }
// Current value of the may_have_chunks_ flag.
64  bool may_have_chunks() { return may_have_chunks_; }
66 
// Plain, unsynchronized assignments to size_ and may_have_chunks_.
67  void set_size(uint64_t val) { size_ = val; }
68  void set_may_have_chunks(bool val) { may_have_chunks_ = val; }
// Raises the "fully chunked" flag by atomically incrementing
// is_fully_chunked_; the flag reads as set whenever the counter is non-zero.
69  void set_is_fully_chunked() { atomic_inc32(&is_fully_chunked_); }
70  bool is_fully_chunked() { return atomic_read32(&is_fully_chunked_) != 0; }
// Atomic read of the in-flight chunk counter nchunks_in_fly_.
71  uint64_t nchunks_in_fly() { return atomic_read64(&nchunks_in_fly_); }
72 
// Number of entries currently held in the chunks_ list.
73  uint64_t GetNumChunks() { return chunks_.size(); }
75 
// Thin forwarding wrappers: each call is passed unchanged to the ingestion
// source held in source_, and the source's return value is handed back as is.
76  bool Open() { return source_->Open(); }
// Forwards the read request for up to nbyte bytes into buffer to the source.
77  ssize_t Read(void *buffer, size_t nbyte) {
78  return source_->Read(buffer, nbyte);
79  }
80  bool Close() { return source_->Close(); }
// Forwards the out-parameter size to the source's GetSize(); it does not read
// or update the size_ member of this item.
81  bool GetSize(uint64_t *size) { return source_->GetSize(size); }
82 
// Atomically increments the in-flight chunk counter nchunks_in_fly_.
83  // Called by ChunkItem constructor, decremented when a chunk is registered
84  void IncNchunksInFly() { atomic_inc64(&nchunks_in_fly_); }
// Declared here and defined out of line.
// NOTE(review): presumably records file_chunk in chunks_ and decrements
// nchunks_in_fly_ -- confirm against the definition in item.cc.
85  void RegisterChunk(const FileChunk &file_chunk);
// Returns true once the item has been flagged as fully chunked and the
// in-flight chunk counter reads zero. The two atomic reads are separate
// operations, not a single combined snapshot.
86  bool IsProcessed() {
87  return is_fully_chunked() && (atomic_read64(&nchunks_in_fly_) == 0);
88  }
89 
90  private:
91  static const uint64_t kSizeUnknown = uint64_t(-1);
92  static const char kQuitBeaconMarker = '\0';
93 
99 
100  uint64_t size_;
102 
115  pthread_mutex_t lock_;
116 };
117 
118 
126  public:
127  ChunkItem(FileItem *file_item, uint64_t offset);
128 
129  void MakeBulkChunk();
// Returns true when this chunk is not a bulk chunk, starts at offset 0 and has
// the same size as its owning file item, i.e. it spans the entire file.
130  bool IsSolePiece() {
131  return !is_bulk_chunk_ && (offset_ == 0) && (size_ == file_item_->size());
132  }
133 
// Plain accessors returning the stored is_bulk_chunk_, offset_ and size_.
134  bool is_bulk_chunk() { return is_bulk_chunk_; }
136  uint64_t offset() { return offset_; }
137  uint64_t size() { return size_; }
139  // An active zlib compression stream requires 256kB of memory. Therefore,
140  // we create it only for the absolutely necessary duration and free the space
141  // afterwards.
143  void ReleaseCompressor();
144 
147 
// Assigns the chunk size. The assert requires size_ to still be 0 on entry, so
// overwriting a non-zero size trips the assertion in builds with asserts
// enabled.
148  void set_size(uint64_t val) { assert(size_ == 0); size_ = val; }
150  assert((upload_handle_ == NULL) && (val != NULL));
151  upload_handle_ = val;
152  }
153 
154  private:
156  uint64_t offset_;
161  uint64_t size_;
171 };
172 
173 
181  public:
182  enum BlockType {
186  };
187 
188  explicit BlockItem(ItemAllocator *allocator);
189  BlockItem(int64_t tag, ItemAllocator *allocator);
190  ~BlockItem();
191 
193  return new BlockItem(NULL);
194  }
// Returns true when the block's type_ is kBlockHollow; a hollow block is what
// this class treats as the quit beacon.
195  bool IsQuitBeacon() {
196  return type_ == kBlockHollow;
197  }
198 
199  void MakeStop();
200  void MakeData(uint32_t capacity);
201  void MakeDataMove(BlockItem *other);
202  void MakeDataCopy(const unsigned char *data, uint32_t size);
203  void SetFileItem(FileItem *item);
204  void SetChunkItem(ChunkItem *item);
205  // Free data and reset to hollow block
206  void Reset();
207 
208  uint32_t Write(void *buf, uint32_t count);
209 
// IsEmpty: true when size_ is 0. IsFull: true when size_ equals capacity_.
// Both are true at once for a block whose capacity_ is 0.
210  bool IsEmpty() { return size_ == 0; }
211  bool IsFull() { return size_ == capacity_; }
212 
// Raw accessors for the data_ pointer, capacity_ and size_ of this block.
213  unsigned char *data() { return data_; }
214  uint32_t capacity() { return capacity_; }
215  uint32_t size() { return size_; }
// Sets size_; the assert rejects values larger than capacity_ in builds with
// asserts enabled.
216  void set_size(uint32_t val) { assert(val <= capacity_); size_ = val; }
217 
// Accessors for the block's type_ and tag_.
218  BlockType type() { return type_; }
219  int64_t tag() { return tag_; }
// Atomic read of the class-wide (static) managed_bytes_ counter.
// NOTE(review): presumably the total number of data bytes currently held by
// all BlockItem instances -- confirm against item.cc.
222  static uint64_t managed_bytes() { return atomic_read64(&managed_bytes_); }
223 
224  private:
229 
230  // Forget pointer to the data
231  void Discharge();
232 
235 
242  int64_t tag_;
243 
249 
253  unsigned char *data_;
254  uint32_t capacity_;
255  uint32_t size_;
256 };
257 
258 #endif // CVMFS_INGESTION_ITEM_H_
FileItem * file_item()
Definition: item.h:135
unsigned char * data_
Definition: item.h:253
const bool has_legacy_bulk_chunk_
Definition: item.h:98
const unsigned kMaxContextSize
Definition: hash.h:76
void RegisterChunk(const FileChunk &file_chunk)
Definition: item.cc:48
void MakeDataMove(BlockItem *other)
Definition: item.cc:172
void set_is_fully_chunked()
Definition: item.h:69
int64_t atomic_int64
Definition: atomic.h:18
uint32_t size_
Definition: item.h:255
void Discharge()
Definition: item.cc:145
uint64_t size_
Definition: item.h:161
bool GetSize(uint64_t *size)
Definition: item.h:81
shash::ContextPtr hash_ctx_
Definition: item.h:168
bool IsSolePiece()
Definition: item.h:130
void SetChunkItem(ChunkItem *item)
Definition: item.cc:216
FileItem(IngestionSource *source, uint64_t min_chunk_size=4 *1024 *1024, uint64_t avg_chunk_size=8 *1024 *1024, uint64_t max_chunk_size=16 *1024 *1024, zlib::Algorithms compression_algorithm=zlib::kZlibDefault, shash::Algorithms hash_algorithm=shash::kSha1, shash::Suffix hash_suffix=shash::kSuffixNone, bool may_have_chunks=true, bool has_legacy_bulk_chunk=false)
Definition: item.cc:17
shash::Algorithms hash_algorithm()
Definition: item.h:62
unsigned char hash_ctx_buffer_[shash::kMaxContextSize]
Definition: item.h:170
zlib::Compressor * GetCompressor()
Definition: item.cc:93
void Reset()
Definition: item.cc:205
bool IsQuitBeacon()
Definition: item.h:53
ChunkItem * chunk_item_
Definition: item.h:248
pthread_mutex_t lock_
Definition: item.h:115
shash::Any hash_value_
Definition: item.h:169
bool may_have_chunks()
Definition: item.h:64
BlockItem(ItemAllocator *allocator)
Definition: item.cc:112
assert((mem||(size==0))&&"Out Of Memory")
unsigned char * data()
Definition: item.h:213
void SetFileItem(FileItem *item)
Definition: item.cc:223
Xor32Detector * chunk_detector()
Definition: item.h:59
uint32_t capacity()
Definition: item.h:214
bool is_bulk_chunk_
Definition: item.h:162
uint64_t GetNumChunks()
Definition: item.h:73
bool IsProcessed()
Definition: item.h:86
void MakeBulkChunk()
Definition: item.cc:87
void set_size(uint64_t val)
Definition: item.h:67
void ReleaseCompressor()
Definition: item.cc:102
ChunkItem(FileItem *file_item, uint64_t offset)
Definition: item.cc:69
shash::Any bulk_hash()
Definition: item.h:60
void set_size(uint64_t val)
Definition: item.h:148
bool IsQuitBeacon()
Definition: item.h:195
Xor32Detector chunk_detector_
Definition: item.h:103
shash::Algorithms
Definition: hash.h:40
upload::UploadStreamHandle * upload_handle_
Definition: item.h:166
int32_t atomic_int32
Definition: atomic.h:17
int64_t tag()
Definition: item.h:219
FileChunkList * GetChunksPtr()
Definition: item.h:74
FileChunkList chunks_
Definition: item.h:105
const shash::Suffix hash_suffix_
Definition: item.h:97
zlib::Algorithms
Definition: compression.h:44
uint64_t nchunks_in_fly()
Definition: item.h:71
bool is_fully_chunked()
Definition: item.h:70
static FileItem * CreateQuitBeacon()
Definition: item.h:48
BlockType
Definition: item.h:182
void set_size(uint32_t val)
Definition: item.h:216
bool may_have_chunks_
Definition: item.h:101
const shash::Algorithms hash_algorithm_
Definition: item.h:96
bool is_bulk_chunk()
Definition: item.h:134
int64_t tag_
Definition: item.h:242
upload::UploadStreamHandle * upload_handle()
Definition: item.h:138
ChunkItem * chunk_item()
Definition: item.h:221
bool has_legacy_bulk_chunk()
Definition: item.h:65
void set_may_have_chunks(bool val)
Definition: item.h:68
FileItem * file_item()
Definition: item.h:220
const zlib::Algorithms compression_algorithm_
Definition: item.h:95
uint32_t capacity_
Definition: item.h:254
uint32_t size()
Definition: item.h:215
void MakeData(uint32_t capacity)
Definition: item.cc:157
UniquePtr< IngestionSource > source_
Definition: item.h:94
~BlockItem()
Definition: item.cc:138
class FileItem
Definition: item.h:34
atomic_int64 nchunks_in_fly_
Definition: item.h:109
shash::ContextPtr hash_ctx()
Definition: item.h:145
BlockType type_
Definition: item.h:234
uint64_t offset()
Definition: item.h:136
void IncNchunksInFly()
Definition: item.h:84
uint64_t size()
Definition: item.h:58
shash::Suffix hash_suffix()
Definition: item.h:63
char Suffix
Definition: hash.h:113
std::string path()
Definition: item.h:57
uint64_t offset_
Definition: item.h:156
bool IsEmpty()
Definition: item.h:210
static atomic_int64 managed_bytes_
Definition: item.h:228
bool Open()
Definition: item.h:76
bool IsFull()
Definition: item.h:211
FileItem * file_item_
Definition: item.h:247
uint64_t size_
Definition: item.h:100
zlib::Algorithms compression_algorithm()
Definition: item.h:61
static const uint64_t kSizeUnknown
Definition: item.h:91
ItemAllocator * allocator_
Definition: item.h:233
atomic_int32 is_fully_chunked_
Definition: item.h:114
bool Close()
Definition: item.h:80
void set_upload_handle(upload::UploadStreamHandle *val)
Definition: item.h:149
static BlockItem * CreateQuitBeacon()
Definition: item.h:192
BlockType type()
Definition: item.h:218
ssize_t Read(void *buffer, size_t nbyte)
Definition: item.h:77
~FileItem()
Definition: item.cc:44
void MakeDataCopy(const unsigned char *data, uint32_t size)
Definition: item.cc:189
uint32_t Write(void *buf, uint32_t count)
Definition: item.cc:230
shash::Any bulk_hash_
Definition: item.h:104
static const char kQuitBeaconMarker
Definition: item.h:92
void MakeStop()
Definition: item.cc:151
uint64_t size()
Definition: item.h:137
const char kSuffixNone
Definition: hash.h:52
UniquePtr< zlib::Compressor > compressor_
Definition: item.h:167
static uint64_t managed_bytes()
Definition: item.h:222
shash::Any * hash_ptr()
Definition: item.h:146
FileItem * file_item_
Definition: item.h:155
size_t size() const
Definition: bigvector.h:100