CernVM-FS  2.12.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ingestion_source.h
Go to the documentation of this file.
1 
5 #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_
6 #define CVMFS_INGESTION_INGESTION_SOURCE_H_
7 
8 #include <fcntl.h>
9 #include <pthread.h>
10 #include <unistd.h>
11 
12 #include <algorithm>
13 #include <cassert>
14 #include <cerrno>
15 #include <cstdio>
16 #include <string>
17 
18 #include "duplex_libarchive.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/posix.h"
23 #include "util/single_copy.h"
24 
25 /*
26  * The purpose of this class is to add a common interface for objects that are
27  * ingested by the pipeline. Hence the pipeline is able to ingest everything
28  * that implements this interface.
29  * The ownership of new IngestionSource objects is transferred from their creator
30  * directly to the pipeline itself that will take care of deallocating
31  * everything.
32  * The pipeline is multithreaded, so it is very likely that the code implemented
33  * in this interface will be called inside a different thread from the one that
34  * originally
35  * allocated the object; hence it is necessary to take extra care in the use of
36  * locks. Prefer condition variables.
37  */
39  public:
// Virtual destructor so that concrete sources can be destroyed through a
// pointer to this interface.
40  virtual ~IngestionSource() {}
// Path (or name) identifying this source.
41  virtual std::string GetPath() const = 0;
// Whether the source is a real file: the file-backed implementation in this
// header returns true, the memory-, string- and tar-backed ones return false.
42  virtual bool IsRealFile() const = 0;
// Prepares the source for reading; returns false on failure.
43  virtual bool Open() = 0;
// Copies up to nbyte bytes into buffer and returns the number of bytes
// produced. The file- and tar-backed implementations in this header log an
// error and return a negative value on failure.
44  virtual ssize_t Read(void* buffer, size_t nbyte) = 0;
// Releases whatever Open() acquired; returns false on failure.
45  virtual bool Close() = 0;
// Stores the size of the source in *size; returns false if it cannot be
// determined.
46  virtual bool GetSize(uint64_t* size) = 0;
47 };
48 
50  public:
51  explicit FileIngestionSource(const std::string& path)
52  : path_(path), fd_(-1), stat_obtained_(false) {}
54 
55  std::string GetPath() const { return path_; }
56  virtual bool IsRealFile() const { return true; }
57 
// Opens path_ read-only and stores the resulting descriptor in fd_.
// Returns true on success; returns false (leaving the negative value in fd_)
// when open() fails.
58  bool Open() {
59  fd_ = open(path_.c_str(), O_RDONLY);
60  if (fd_ < 0) {
// NOTE(review): the line that starts the logging call is absent from this
// listing; the arguments below are the format string, the path, errno and its
// textual description.
62  "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(),
63  errno, strerror(errno));
64  return false;
65  }
66  return true;
67  }
68 
69  ssize_t Read(void* buffer, size_t nbyte) {
70  assert(fd_ >= 0);
71  ssize_t read = SafeRead(fd_, buffer, nbyte);
72  if (read < 0) {
73  LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s",
74  path_.c_str(), errno, strerror(errno));
75  }
76  return read;
77  }
78 
79  bool Close() {
80  if (fd_ == -1) return true;
81 
82  // tell to the OS that we are not going to access the file again in the
83  // foreaseable future.
84  (void)platform_invalidate_kcache(fd_, 0, 0);
85 
86  int ret = close(fd_);
87  fd_ = -1;
88  return (ret == 0);
89  }
90 
91  bool GetSize(uint64_t* size) {
92  if (stat_obtained_) {
93  *size = stat_.st_size;
94  return true;
95  }
96  int ret = platform_fstat(fd_, &stat_);
97  if (ret == 0) {
98  *size = stat_.st_size;
99  stat_obtained_ = true;
100  return true;
101  }
102  return false;
103  }
104 
105  private:
106  const std::string path_;
107  int fd_;
110 };
111 
112 
117  public:
119  const std::string &p, const unsigned char *d, unsigned s)
120  : path_(p), data_(d), size_(s), pos_(0) {}
122  virtual std::string GetPath() const { return path_; }
123  virtual bool IsRealFile() const { return false; }
124  virtual bool Open() { return true; }
125  virtual ssize_t Read(void* buffer, size_t nbyte) {
126  size_t remaining = size_ - pos_;
127  size_t size = std::min(remaining, nbyte);
128  if (size > 0) memcpy(buffer, data_ + pos_, size);
129  pos_ += size;
130  return static_cast<ssize_t>(size);
131  }
132  virtual bool Close() { return true; }
133  virtual bool GetSize(uint64_t* size) { *size = size_; return true; }
134 
135  private:
136  std::string path_;
137  const unsigned char *data_;
138  unsigned size_;
139  unsigned pos_;
140 };
141 
142 
147  public:
// Copies `data` into data_ and points the wrapped source_ at that private copy
// (data_.data(), not the caller's string), using the fixed path "MEM".
// NOTE(review): data_.length() is passed to a MemoryIngestionSource parameter
// declared as `unsigned`; a string longer than UINT_MAX bytes would be
// truncated -- confirm callers stay below that.
// NOTE(review): source_ reads data_ in its initializer, so data_ must be
// declared before source_ in the class -- confirm the member order.
148  explicit StringIngestionSource(const std::string& data)
149  : data_(data),
150  source_("MEM", reinterpret_cast<const unsigned char*>(data_.data()),
151  data_.length()) {}
// Same as above, but the wrapped source reports `filename` as its path.
152  StringIngestionSource(const std::string& data, const std::string& filename)
153  : data_(data),
154  source_(filename, reinterpret_cast<const unsigned char*>(data_.data()),
155  data_.length()) {}
157  virtual std::string GetPath() const { return source_.GetPath(); }
158  virtual bool IsRealFile() const { return false; }
159  virtual bool Open() { return source_.Open(); }
160  virtual ssize_t Read(void* buffer, size_t nbyte) {
161  return source_.Read(buffer, nbyte);
162  }
163  virtual bool Close() { return source_.Close(); }
164  virtual bool GetSize(uint64_t* size) { return source_.GetSize(size); }
165 
166  private:
167  std::string data_;
169 };
170 
171 
173  public:
// Stores the path, the archive handle and the signal pointer, and records the
// entry size as reported by archive_entry_stat(). Neither `archive` nor
// `read_archive_signal` is copied; only the pointers are kept.
174  TarIngestionSource(const std::string &path, struct archive* archive,
175  struct archive_entry* entry, Signal* read_archive_signal)
176  : path_(path),
177  archive_(archive),
178  read_archive_signal_(read_archive_signal) {
// Despite the trailing underscore, stat_ is a local variable here, not a
// member.
// NOTE(review): the pointer returned by archive_entry_stat() is dereferenced
// without a null check -- confirm it cannot be NULL for entries passed in.
180  const struct stat* stat_ = archive_entry_stat(entry);
181  size_ = stat_->st_size;
182  }
183 
184  std::string GetPath() const { return path_; }
185  virtual bool IsRealFile() const { return false; }
186 
// Nothing needs to be opened: the data is read from the archive handle that
// was supplied at construction, so this always succeeds.
// The former `assert(size_ >= 0)` was removed because size_ is a uint64_t:
// the comparison could never be false and only produced -Wtype-limits
// warnings.
bool Open() {
  return true;
}
191 
// Reads up to nbytes bytes of the tar entry into external_buffer using
// archive_read_data() and returns its result unchanged.
// On a negative result, errno is overwritten with archive_errno() and the
// failure is reported together with archive_error_string().
192  ssize_t Read(void* external_buffer, size_t nbytes) {
193  ssize_t read = archive_read_data(archive_, external_buffer, nbytes);
194  if (read < 0) {
195  errno = archive_errno(archive_);
// NOTE(review): the line that starts the logging call is absent from this
// listing; the arguments below are the format string, the path, errno and the
// libarchive error text.
197  "failed to read data from the tar entry: %s (%d)\n %s",
198  path_.c_str(), errno, archive_error_string(archive_));
199  }
200  return read;
201  }
202 
203  bool Close() {
205  return true;
206  }
207 
208  bool GetSize(uint64_t* size) {
209  *size = size_;
210  return true;
211  }
212 
213  private:
214  std::string path_;
215  struct archive* archive_;
216  uint64_t size_;
218 };
219 
220 #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_
virtual bool GetSize(uint64_t *size)
std::string GetPath() const
virtual bool IsRealFile() const
struct stat64 platform_stat64
const std::string path_
MemoryIngestionSource(const std::string &p, const unsigned char *d, unsigned s)
FileIngestionSource(const std::string &path)
virtual bool IsRealFile() const
void Wakeup()
Definition: concurrency.cc:59
virtual ssize_t Read(void *buffer, size_t nbyte)
virtual bool Close()=0
assert((mem||(size==0))&&"Out Of Memory")
virtual bool GetSize(uint64_t *size)=0
virtual std::string GetPath() const
StringIngestionSource(const std::string &data)
bool IsSleeping()
Definition: concurrency.cc:66
StringIngestionSource(const std::string &data, const std::string &filename)
bool GetSize(uint64_t *size)
struct archive * archive_
virtual bool Open()=0
ssize_t SafeRead(int fd, void *buf, size_t nbyte)
Definition: posix.cc:2045
int platform_invalidate_kcache(const int fd, const off_t offset, const off_t length)
virtual bool IsRealFile() const
const unsigned char * data_
virtual std::string GetPath() const =0
virtual bool IsRealFile() const =0
ssize_t Read(void *external_buffer, size_t nbytes)
TarIngestionSource(const std::string &path, struct archive *archive, struct archive_entry *entry, Signal *read_archive_signal)
platform_stat64 stat_
virtual ssize_t Read(void *buffer, size_t nbyte)=0
ssize_t Read(void *buffer, size_t nbyte)
virtual bool GetSize(uint64_t *size)
virtual bool IsRealFile() const
bool GetSize(uint64_t *size)
virtual ~IngestionSource()
int platform_fstat(int filedes, platform_stat64 *buf)
std::string GetPath() const
static void size_t size
Definition: smalloc.h:54
virtual std::string GetPath() const
MemoryIngestionSource source_
virtual ssize_t Read(void *buffer, size_t nbyte)
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:528