CernVM-FS  2.9.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task_write.cc
Go to the documentation of this file.
1 
5 #include "cvmfs_config.h"
6 #include "task_write.h"
7 
8 #include <cstdlib>
9 
10 #include "logging.h"
11 #include "upload_facility.h"
12 #include "util/exception.h"
13 
14 
16  const upload::UploaderResults &results,
17  BlockItem *block_item)
18 {
19  if (results.return_code != 0) {
20  PANIC(kLogStderr, "block upload failed (code: %d)", results.return_code);
21  }
22 
23  delete block_item;
24 }
25 
26 
28  const upload::UploaderResults &results,
29  ChunkItem *chunk_item)
30 {
31  if (results.return_code != 0) {
32  PANIC(kLogStderr, "chunk upload failed (code: %d)", results.return_code);
33  }
34 
35  FileItem *file_item = chunk_item->file_item();
36  file_item->RegisterChunk(FileChunk(*chunk_item->hash_ptr(),
37  chunk_item->offset(),
38  chunk_item->size()));
39  delete chunk_item;
40 
41  if (file_item->IsProcessed()) {
42  tubes_out_->DispatchAny(file_item);
43  }
44 }
45 
46 
47 void TaskWrite::Process(BlockItem *input_block) {
48  ChunkItem *chunk_item = input_block->chunk_item();
49 
50  upload::UploadStreamHandle *handle = chunk_item->upload_handle();
51  if (handle == NULL) {
52  // The closure passed here, is called by the AbstractUploader as soon as
53  // it successfully committed the complete chunk
54  handle = uploader_->InitStreamedUpload(
56  &TaskWrite::OnChunkComplete, this, chunk_item));
57  assert(handle != NULL);
58  chunk_item->set_upload_handle(handle);
59  }
60 
61  switch (input_block->type()) {
64  handle,
66  input_block->size(), input_block->data()),
68  &TaskWrite::OnBlockComplete, this, input_block));
69  break;
71  // If there is a sole piece and a legacy bulk chunk, two times the same
72  // chunk is being uploaded. Well. It doesn't hurt.
73  if (chunk_item->IsSolePiece()) {
74  chunk_item->MakeBulkChunk();
75  }
76  uploader_->ScheduleCommit(handle, *chunk_item->hash_ptr());
77  delete input_block;
78  break;
79  default:
80  PANIC(NULL);
81  }
82 }
FileItem * file_item()
Definition: item.h:135
void ScheduleUpload(UploadStreamHandle *handle, UploadBuffer buffer, const CallbackTN *callback=NULL)
void RegisterChunk(const FileChunk &file_chunk)
Definition: item.cc:48
virtual void Process(BlockItem *input_block)
Definition: task_write.cc:47
Tube< ItemT >::Link * DispatchAny(ItemT *item)
Definition: tube.h:266
bool IsSolePiece()
Definition: item.h:130
#define PANIC(...)
Definition: exception.h:26
void ScheduleCommit(UploadStreamHandle *handle, const shash::Any &content_hash)
assert((mem||(size==0))&&"Out Of Memory")
unsigned char * data()
Definition: item.h:213
bool IsProcessed()
Definition: item.h:86
void MakeBulkChunk()
Definition: item.cc:87
upload::AbstractUploader * uploader_
Definition: task_write.h:33
void OnBlockComplete(const upload::UploaderResults &results, BlockItem *block_item)
Definition: task_write.cc:15
upload::UploadStreamHandle * upload_handle()
Definition: item.h:138
ChunkItem * chunk_item()
Definition: item.h:221
uint32_t size()
Definition: item.h:215
Definition: item.h:34
uint64_t offset()
Definition: item.h:136
virtual UploadStreamHandle * InitStreamedUpload(const CallbackTN *callback=NULL)=0
void OnChunkComplete(const upload::UploaderResults &results, ChunkItem *chunk_item)
Definition: task_write.cc:27
void set_upload_handle(upload::UploadStreamHandle *val)
Definition: item.h:149
BlockType type()
Definition: item.h:218
TubeGroup< FileItem > * tubes_out_
Definition: task_write.h:32
uint64_t size()
Definition: item.h:137
shash::Any * hash_ptr()
Definition: item.h:146
static CallbackTN * MakeClosure(typename BoundClosure< UploaderResults, DelegateT, ClosureDataT >::CallbackMethod method, DelegateT *delegate, const ClosureDataT &closure_data)
Definition: async.h:204