CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ingestion_source.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_
6 #define CVMFS_INGESTION_INGESTION_SOURCE_H_
7 
#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>

#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <string>

#include "duplex_libarchive.h"
#include "util/concurrency.h"
#include "util/logging.h"
#include "util/platform.h"
#include "util/posix.h"
#include "util/single_copy.h"
24 
/*
 * The purpose of this class is to provide a common interface for objects that
 * are ingested by the pipeline, so that the pipeline can ingest anything that
 * implements this interface.
 * Ownership of new IngestionSource objects is transferred from their creator
 * directly to the pipeline, which takes care of deallocating everything.
 * The pipeline is multithreaded, so the code implementing this interface will
 * very likely be called from a different thread than the one that originally
 * allocated the object. Hence extra care is needed in the use of locks;
 * prefer condition variables.
 */
37  */
39  public:
40  virtual ~IngestionSource() { }
41  virtual std::string GetPath() const = 0;
42  virtual bool IsRealFile() const = 0;
43  virtual bool Open() = 0;
44  virtual ssize_t Read(void *buffer, size_t nbyte) = 0;
45  virtual bool Close() = 0;
46  virtual bool GetSize(uint64_t *size) = 0;
47 };
48 
50  public:
51  explicit FileIngestionSource(const std::string &path)
52  : path_(path), fd_(-1), stat_obtained_(false) { }
54 
55  std::string GetPath() const { return path_; }
56  virtual bool IsRealFile() const { return true; }
57 
58  bool Open() {
59  fd_ = open(path_.c_str(), O_RDONLY);
60  if (fd_ < 0) {
62  "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(),
63  errno, strerror(errno));
64  return false;
65  }
66  return true;
67  }
68 
69  ssize_t Read(void *buffer, size_t nbyte) {
70  assert(fd_ >= 0);
71  ssize_t read = SafeRead(fd_, buffer, nbyte);
72  if (read < 0) {
73  LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s",
74  path_.c_str(), errno, strerror(errno));
75  }
76  return read;
77  }
78 
79  bool Close() {
80  if (fd_ == -1)
81  return true;
82 
83  // tell to the OS that we are not going to access the file again in the
84  // foreaseable future.
85  (void)platform_invalidate_kcache(fd_, 0, 0);
86 
87  int ret = close(fd_);
88  fd_ = -1;
89  return (ret == 0);
90  }
91 
92  bool GetSize(uint64_t *size) {
93  if (stat_obtained_) {
94  *size = stat_.st_size;
95  return true;
96  }
97  int ret = platform_fstat(fd_, &stat_);
98  if (ret == 0) {
99  *size = stat_.st_size;
100  stat_obtained_ = true;
101  return true;
102  }
103  return false;
104  }
105 
106  private:
107  const std::string path_;
108  int fd_;
111 };
112 
113 
118  public:
119  MemoryIngestionSource(const std::string &p, const unsigned char *d,
120  unsigned s)
121  : path_(p), data_(d), size_(s), pos_(0) { }
123  virtual std::string GetPath() const { return path_; }
124  virtual bool IsRealFile() const { return false; }
125  virtual bool Open() { return true; }
126  virtual ssize_t Read(void *buffer, size_t nbyte) {
127  size_t remaining = size_ - pos_;
128  size_t size = std::min(remaining, nbyte);
129  if (size > 0)
130  memcpy(buffer, data_ + pos_, size);
131  pos_ += size;
132  return static_cast<ssize_t>(size);
133  }
134  virtual bool Close() { return true; }
135  virtual bool GetSize(uint64_t *size) {
136  *size = size_;
137  return true;
138  }
139 
140  private:
141  std::string path_;
142  const unsigned char *data_;
143  unsigned size_;
144  unsigned pos_;
145 };
146 
147 
152  public:
153  explicit StringIngestionSource(const std::string &data)
154  : data_(data)
155  , source_("MEM", reinterpret_cast<const unsigned char *>(data_.data()),
156  data_.length()) { }
157  StringIngestionSource(const std::string &data, const std::string &filename)
158  : data_(data)
159  , source_(filename, reinterpret_cast<const unsigned char *>(data_.data()),
160  data_.length()) { }
162  virtual std::string GetPath() const { return source_.GetPath(); }
163  virtual bool IsRealFile() const { return false; }
164  virtual bool Open() { return source_.Open(); }
165  virtual ssize_t Read(void *buffer, size_t nbyte) {
166  return source_.Read(buffer, nbyte);
167  }
168  virtual bool Close() { return source_.Close(); }
169  virtual bool GetSize(uint64_t *size) { return source_.GetSize(size); }
170 
171  private:
172  std::string data_;
174 };
175 
176 
178  public:
179  TarIngestionSource(const std::string &path, struct archive *archive,
180  struct archive_entry *entry, Signal *read_archive_signal)
181  : path_(path)
182  , archive_(archive)
183  , read_archive_signal_(read_archive_signal) {
185  const struct stat *stat_ = archive_entry_stat(entry);
186  size_ = stat_->st_size;
187  }
188 
189  std::string GetPath() const { return path_; }
190  virtual bool IsRealFile() const { return false; }
191 
/**
 * No per-entry setup is needed: Read() pulls the entry's data directly from
 * archive_. Always succeeds.
 */
bool Open() {
  // size_ is a uint64_t, so a non-negativity check on it would always be true
  // (and trips -Wtype-limits); it therefore has nothing to assert here.
  return true;
}
196 
197  ssize_t Read(void *external_buffer, size_t nbytes) {
198  ssize_t read = archive_read_data(archive_, external_buffer, nbytes);
199  if (read < 0) {
200  errno = archive_errno(archive_);
202  "failed to read data from the tar entry: %s (%d)\n %s",
203  path_.c_str(), errno, archive_error_string(archive_));
204  }
205  return read;
206  }
207 
208  bool Close() {
210  return true;
211  }
212 
213  bool GetSize(uint64_t *size) {
214  *size = size_;
215  return true;
216  }
217 
218  private:
219  std::string path_;
220  struct archive *archive_;
221  uint64_t size_;
223 };
224 
225 #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_
virtual bool GetSize(uint64_t *size)
std::string GetPath() const
virtual bool IsRealFile() const
struct stat64 platform_stat64
const std::string path_
MemoryIngestionSource(const std::string &p, const unsigned char *d, unsigned s)
FileIngestionSource(const std::string &path)
virtual bool IsRealFile() const
void Wakeup()
Definition: concurrency.cc:60
virtual ssize_t Read(void *buffer, size_t nbyte)
virtual bool Close()=0
assert((mem||(size==0))&&"Out Of Memory")
virtual bool GetSize(uint64_t *size)=0
virtual std::string GetPath() const
StringIngestionSource(const std::string &data)
bool IsSleeping()
Definition: concurrency.cc:67
StringIngestionSource(const std::string &data, const std::string &filename)
bool GetSize(uint64_t *size)
struct archive * archive_
virtual bool Open()=0
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
Definition: posix.cc:2093
int platform_invalidate_kcache(const int fd, const off_t offset, const off_t length)
virtual bool IsRealFile() const
const unsigned char * data_
virtual std::string GetPath() const =0
virtual bool IsRealFile() const =0
ssize_t Read(void *external_buffer, size_t nbytes)
TarIngestionSource(const std::string &path, struct archive *archive, struct archive_entry *entry, Signal *read_archive_signal)
platform_stat64 stat_
virtual ssize_t Read(void *buffer, size_t nbyte)=0
ssize_t Read(void *buffer, size_t nbyte)
virtual bool GetSize(uint64_t *size)
virtual bool IsRealFile() const
bool GetSize(uint64_t *size)
virtual ~IngestionSource()
int platform_fstat(int filedes, platform_stat64 *buf)
std::string GetPath() const
static void size_t size
Definition: smalloc.h:54
virtual std::string GetPath() const
MemoryIngestionSource source_
virtual ssize_t Read(void *buffer, size_t nbyte)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545