GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/ingestion/ingestion_source.h Lines: 28 55 50.9 %
Date: 2019-02-03 02:48:13 Branches: 8 24 33.3 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_
6
#define CVMFS_INGESTION_INGESTION_SOURCE_H_
7
8
#include <fcntl.h>
9
#include <pthread.h>
10
#include <unistd.h>
11
12
#include <cassert>
13
#include <cerrno>
14
#include <cstdio>
15
#include <string>
16
17
#include "duplex_libarchive.h"
18
#include "logging.h"
19
#include "platform.h"
20
#include "util/posix.h"
21
#include "util/single_copy.h"
22
#include "util_concurrency.h"
23
24
/*
25
 * The purpose of this class is to add a common interface for object that are
26
 * ingested by the pipeline. Hence the pipeline is able to ingest everything
27
 * that implements this interface.
28
 * The ownership of new IngestionSource objects is transfered from their creator
29
 * directly to the pipeline itself that will take care of deallocating
30
 * everything.
31
 * The pipeline is multithreaded so it is very likely that the code implement in
32
 * this interface will be called inside a different thread from the one that
33
 * originally
34
 * allocated the object, hence is necessary to take extra care in the use of
35
 * locks, prefer conditional variables.
36
 */
37
958
class IngestionSource : SingleCopy {
38
 public:
39
958
  virtual ~IngestionSource() {}
40
  virtual std::string GetPath() const = 0;
41
  virtual bool Open() = 0;
42
  virtual ssize_t Read(void* buffer, size_t nbyte) = 0;
43
  virtual bool Close() = 0;
44
  virtual bool GetSize(uint64_t* size) = 0;
45
};
46
47
class FileIngestionSource : public IngestionSource {
48
 public:
49
958
  explicit FileIngestionSource(const std::string& path)
50
958
      : path_(path), fd_(-1), stat_obtained_(false) {}
51
1916
  ~FileIngestionSource() {}
52
53
2339
  std::string GetPath() const { return path_; }
54
55
155
  bool Open() {
56
155
    fd_ = open(path_.c_str(), O_RDONLY);
57
155
    if (fd_ < 0) {
58
      LogCvmfs(kLogCvmfs, kLogStderr,
59
               "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(),
60
               errno, strerror(errno));
61
      return false;
62
    }
63
155
    return true;
64
  }
65
66
216390
  ssize_t Read(void* buffer, size_t nbyte) {
67
216390
    assert(fd_ >= 0);
68
216390
    ssize_t read = SafeRead(fd_, buffer, nbyte);
69
216390
    if (read < 0) {
70
      LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s",
71
               fd_, path_.c_str(), errno, strerror(errno));
72
    }
73
216390
    return read;
74
  }
75
76
155
  bool Close() {
77
155
    if (fd_ == -1) return true;
78
79
    // tell to the OS that we are not going to access the file again in the
80
    // foreaseable future.
81
155
    (void)platform_invalidate_kcache(fd_, 0, 0);
82
83
155
    int ret = close(fd_);
84
155
    fd_ = -1;
85
155
    return (ret == 0);
86
  }
87
88
155
  bool GetSize(uint64_t* size) {
89
155
    if (stat_obtained_) {
90
      *size = stat_.st_size;
91
      return true;
92
    }
93
155
    int ret = platform_fstat(fd_, &stat_);
94
155
    if (ret == 0) {
95
155
      *size = stat_.st_size;
96
155
      stat_obtained_ = true;
97
155
      return true;
98
    }
99
    return false;
100
  }
101
102
 private:
103
  const std::string path_;
104
  int fd_;
105
  platform_stat64 stat_;
106
  bool stat_obtained_;
107
};
108
109
class TarIngestionSource : public IngestionSource {
110
 public:
111
  TarIngestionSource(const std::string path, struct archive* archive,
112
                     struct archive_entry* entry, Signal* read_archive_signal)
113
      : path_(path),
114
        archive_(archive),
115
        read_archive_signal_(read_archive_signal) {
116
    assert(read_archive_signal_->IsSleeping());
117
    const struct stat* stat_ = archive_entry_stat(entry);
118
    size_ = stat_->st_size;
119
  }
120
121
  std::string GetPath() const { return path_; }
122
123
  bool Open() {
124
    assert(size_ >= 0);
125
    return true;
126
  }
127
128
  ssize_t Read(void* external_buffer, size_t nbytes) {
129
    ssize_t read = archive_read_data(archive_, external_buffer, nbytes);
130
    if (read < 0) {
131
      errno = archive_errno(archive_);
132
      LogCvmfs(kLogCvmfs, kLogStderr,
133
               "failed to read data from the tar entry: %s (%d)\n %s",
134
               path_.c_str(), errno, archive_error_string(archive_));
135
    }
136
    return read;
137
  }
138
139
  bool Close() {
140
    read_archive_signal_->Wakeup();
141
    return true;
142
  }
143
144
  bool GetSize(uint64_t* size) {
145
    *size = size_;
146
    return true;
147
  }
148
149
 private:
150
  std::string path_;
151
  struct archive* archive_;
152
  uint64_t size_;
153
  Signal* read_archive_signal_;
154
};
155
156
#endif  // CVMFS_INGESTION_INGESTION_SOURCE_H_