CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task_write.cc
Go to the documentation of this file.
1 
6 #include "task_write.h"
7 
8 #include <cstdlib>
9 
10 #include "upload_facility.h"
11 #include "util/exception.h"
12 #include "util/logging.h"
13 
14 
16  BlockItem *block_item) {
17  if (results.return_code != 0) {
18  PANIC(kLogStderr, "block upload failed (code: %d)", results.return_code);
19  }
20 
21  delete block_item;
22 }
23 
24 
26  ChunkItem *chunk_item) {
27  if (results.return_code != 0) {
28  PANIC(kLogStderr, "chunk upload failed (code: %d)", results.return_code);
29  }
30 
31  FileItem *file_item = chunk_item->file_item();
32  file_item->RegisterChunk(FileChunk(
33  *chunk_item->hash_ptr(), chunk_item->offset(), chunk_item->size()));
34  delete chunk_item;
35 
36  if (file_item->IsProcessed()) {
37  tubes_out_->DispatchAny(file_item);
38  }
39 }
40 
41 
42 void TaskWrite::Process(BlockItem *input_block) {
43  ChunkItem *chunk_item = input_block->chunk_item();
44 
45  upload::UploadStreamHandle *handle = chunk_item->upload_handle();
46  if (handle == NULL) {
47  // The closure passed here, is called by the AbstractUploader as soon as
48  // it successfully committed the complete chunk
49  handle = uploader_->InitStreamedUpload(
51  &TaskWrite::OnChunkComplete, this, chunk_item));
52  assert(handle != NULL);
53  chunk_item->set_upload_handle(handle);
54  }
55 
56  switch (input_block->type()) {
59  handle,
61  input_block->data()),
63  &TaskWrite::OnBlockComplete, this, input_block));
64  break;
66  // If there is a sole piece and a legacy bulk chunk, two times the same
67  // chunk is being uploaded. Well. It doesn't hurt.
68  if (chunk_item->IsSolePiece()) {
69  chunk_item->MakeBulkChunk();
70  }
71  uploader_->ScheduleCommit(handle, *chunk_item->hash_ptr());
72  delete input_block;
73  break;
74  default:
75  PANIC(NULL);
76  }
77 }
FileItem * file_item()
Definition: item.h:134
void ScheduleUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback=NULL)
void RegisterChunk(const FileChunk &file_chunk)
Definition: item.cc:44
virtual void Process(BlockItem *input_block)
Definition: task_write.cc:42
Tube< ItemT >::Link * DispatchAny(ItemT *item)
Definition: tube.h:278
bool IsSolePiece()
Definition: item.h:129
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback)=0
#define PANIC(...)
Definition: exception.h:29
void ScheduleCommit(UploadStreamHandle *handle, const shash::Any &content_hash)
assert((mem||(size==0))&&"Out Of Memory")
unsigned char * data()
Definition: item.h:211
bool IsProcessed()
Definition: item.h:85
void MakeBulkChunk()
Definition: item.cc:82
upload::AbstractUploader * uploader_
Definition: task_write.h:32
void OnBlockComplete(const upload::UploaderResults &results, BlockItem *block_item)
Definition: task_write.cc:15
upload::UploadStreamHandle * upload_handle()
Definition: item.h:137
ChunkItem * chunk_item()
Definition: item.h:222
uint32_t size()
Definition: item.h:213
Definition: item.h:34
uint64_t offset()
Definition: item.h:135
void OnChunkComplete(const upload::UploaderResults &results, ChunkItem *chunk_item)
Definition: task_write.cc:25
void set_upload_handle(upload::UploadStreamHandle *val)
Definition: item.h:151
BlockType type()
Definition: item.h:219
TubeGroup< FileItem > * tubes_out_
Definition: task_write.h:31
uint64_t size()
Definition: item.h:136
shash::Any * hash_ptr()
Definition: item.h:145
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
Definition: async.h:197