GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/task_write.cc Lines: 28 33 84.8 %
Date: 2019-02-03 02:48:13 Branches: 14 21 66.7 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#include "cvmfs_config.h"
6
#include "task_write.h"
7
8
#include <cstdlib>
9
10
#include "logging.h"
11
#include "upload_facility.h"
12
13
14
577004
void TaskWrite::OnBlockComplete(
15
  const upload::UploaderResults &results,
16
  BlockItem *block_item)
17
{
18
577004
  if (results.return_code != 0) {
19
    LogCvmfs(kLogSpooler, kLogStderr, "block upload failed (code: %d)",
20
      results.return_code);
21
    abort();
22
  }
23
24
577004
  delete block_item;
25
577004
}
26
27
28
1814
void TaskWrite::OnChunkComplete(
29
  const upload::UploaderResults &results,
30
  ChunkItem *chunk_item)
31
{
32
1814
  if (results.return_code != 0) {
33
    LogCvmfs(kLogSpooler, kLogStderr, "chunk upload failed (code: %d)",
34
             results.return_code);
35
    abort();
36
  }
37
38
1814
  FileItem *file_item = chunk_item->file_item();
39
  file_item->RegisterChunk(FileChunk(*chunk_item->hash_ptr(),
40
                                     chunk_item->offset(),
41
1814
                                     chunk_item->size()));
42
1814
  delete chunk_item;
43
44
1814
  if (file_item->IsProcessed()) {
45
151
    tubes_out_->DispatchAny(file_item);
46
  }
47
1814
}
48
49
50
578818
/**
 * Hands one block over to the uploader.
 *
 * If the chunk the block belongs to has no upload handle yet, a streamed
 * upload is opened for it (with OnChunkComplete() as its closure) and the
 * handle is stored on the chunk item.  Data blocks are then scheduled for
 * upload with OnBlockComplete() as their closure; a stop block schedules the
 * commit of the chunk and is deleted right away.  Any other block type aborts
 * the process.
 *
 * @param input_block  block to process; for a data block it is passed on to
 *                     the OnBlockComplete() closure, for a stop block it is
 *                     deleted here
 */
void TaskWrite::Process(BlockItem *input_block) {
  ChunkItem *chunk = input_block->chunk_item();

  upload::UploadStreamHandle *handle = chunk->upload_handle();
  if (!handle) {
    // No upload stream for this chunk yet: open one.  The uploader calls the
    // closure given here as soon as it has successfully committed the
    // complete chunk.
    handle = uploader_->InitStreamedUpload(
      upload::AbstractUploader::MakeClosure(
        &TaskWrite::OnChunkComplete, this, chunk));
    assert(handle != NULL);
    chunk->set_upload_handle(handle);
  }

  switch (input_block->type()) {
    case BlockItem::kBlockStop: {
      // A sole piece is additionally turned into a bulk chunk.  If a legacy
      // bulk chunk exists as well, the same chunk ends up being uploaded
      // twice, which is harmless.
      if (chunk->IsSolePiece())
        chunk->MakeBulkChunk();
      uploader_->ScheduleCommit(handle, *chunk->hash_ptr());
      delete input_block;
      break;
    }

    case BlockItem::kBlockData: {
      // The block itself travels with the closure and is freed in
      // OnBlockComplete().
      uploader_->ScheduleUpload(
        handle,
        upload::AbstractUploader::UploadBuffer(
          input_block->size(), input_block->data()),
        upload::AbstractUploader::MakeClosure(
          &TaskWrite::OnBlockComplete, this, input_block));
      break;
    }

    default:
      abort();
  }
}