| Directory: | cvmfs/ |
|---|---|
| File: | cvmfs/ingestion/ingestion_source.h |
| Date: | 2025-11-09 02:35:23 |
| | Exec | Total | Coverage |
|---|---|---|---|
| Lines: | 64 | 95 | 67.4% |
| Branches: | 17 | 38 | 44.7% |
| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | /** | ||
| 2 | * This file is part of the CernVM File System. | ||
| 3 | */ | ||
| 4 | |||
| 5 | #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
| 6 | #define CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
| 7 | |||
| 8 | #include <fcntl.h> | ||
| 9 | #include <pthread.h> | ||
| 10 | #include <unistd.h> | ||
| 11 | |||
| 12 | #include <algorithm> | ||
| 13 | #include <cassert> | ||
| 14 | #include <cerrno> | ||
| 15 | #include <cstdio> | ||
| 16 | #include <string> | ||
| 17 | |||
| 18 | #include "duplex_libarchive.h" | ||
| 19 | #include "util/concurrency.h" | ||
| 20 | #include "util/logging.h" | ||
| 21 | #include "util/platform.h" | ||
| 22 | #include "util/posix.h" | ||
| 23 | #include "util/single_copy.h" | ||
| 24 | |||
| 25 | /* | ||
| 26 | * The purpose of this class is to add a common interface for objects that are | ||
| 27 | * ingested by the pipeline. Hence the pipeline is able to ingest everything | ||
| 28 | * that implements this interface. | ||
| 29 | * The ownership of new IngestionSource objects is transferred from their | ||
| 30 | * creator directly to the pipeline itself that will take care of deallocating | ||
| 31 | * everything. | ||
| 32 | * The pipeline is multithreaded so it is very likely that the code implemented in | ||
| 33 | * this interface will be called inside a different thread from the one that | ||
| 34 | * originally | ||
| 35 | * allocated the object, hence it is necessary to take extra care in the use of | ||
| 36 | * locks, prefer condition variables. | ||
| 37 | */ | ||
| 38 | class IngestionSource : SingleCopy { | ||
| 39 | public: | ||
| 40 | 14743582 | virtual ~IngestionSource() { } | |
| 41 | virtual std::string GetPath() const = 0; | ||
| 42 | virtual bool IsRealFile() const = 0; | ||
| 43 | virtual bool Open() = 0; | ||
| 44 | virtual ssize_t Read(void *buffer, size_t nbyte) = 0; | ||
| 45 | virtual bool Close() = 0; | ||
| 46 | virtual bool GetSize(uint64_t *size) = 0; | ||
| 47 | }; | ||
| 48 | |||
| 49 | class FileIngestionSource : public IngestionSource { | ||
| 50 | public: | ||
| 51 | 128429 | explicit FileIngestionSource(const std::string &path) | |
| 52 |
1/2✓ Branch 2 taken 128429 times.
✗ Branch 3 not taken.
|
128429 | : path_(path), fd_(-1), stat_obtained_(false) { } |
| 53 | 436890 | ~FileIngestionSource() { } | |
| 54 | |||
| 55 | 237539 | std::string GetPath() const { return path_; } | |
| 56 | ✗ | virtual bool IsRealFile() const { return true; } | |
| 57 | |||
| 58 | 39472 | bool Open() { | |
| 59 | 39472 | fd_ = open(path_.c_str(), O_RDONLY); | |
| 60 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 39501 times.
|
39501 | if (fd_ < 0) { |
| 61 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, | |
| 62 | "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(), | ||
| 63 | ✗ | errno, strerror(errno)); | |
| 64 | ✗ | return false; | |
| 65 | } | ||
| 66 | 39501 | return true; | |
| 67 | } | ||
| 68 | |||
| 69 | 15751715 | ssize_t Read(void *buffer, size_t nbyte) { | |
| 70 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15751715 times.
|
15751715 | assert(fd_ >= 0); |
| 71 | 15751715 | const ssize_t read = SafeRead(fd_, buffer, nbyte); | |
| 72 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 15761778 times.
|
15761778 | if (read < 0) { |
| 73 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s", | |
| 74 | ✗ | path_.c_str(), errno, strerror(errno)); | |
| 75 | } | ||
| 76 | 15757370 | return read; | |
| 77 | } | ||
| 78 | |||
| 79 | 39501 | bool Close() { | |
| 80 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 39501 times.
|
39501 | if (fd_ == -1) |
| 81 | ✗ | return true; | |
| 82 | |||
| 83 | // tell the OS that we are not going to access the file again in the | ||
| 84 | // foreseeable future. | ||
| 85 | 39501 | (void)platform_invalidate_kcache(fd_, 0, 0); | |
| 86 | |||
| 87 | 39501 | const int ret = close(fd_); | |
| 88 | 39501 | fd_ = -1; | |
| 89 | 39501 | return (ret == 0); | |
| 90 | } | ||
| 91 | |||
| 92 | 14034 | bool GetSize(uint64_t *size) { | |
| 93 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 14034 times.
|
14034 | if (stat_obtained_) { |
| 94 | ✗ | *size = stat_.st_size; | |
| 95 | ✗ | return true; | |
| 96 | } | ||
| 97 | 14034 | const int ret = platform_fstat(fd_, &stat_); | |
| 98 |
1/2✓ Branch 0 taken 14063 times.
✗ Branch 1 not taken.
|
14063 | if (ret == 0) { |
| 99 | 14063 | *size = stat_.st_size; | |
| 100 | 14063 | stat_obtained_ = true; | |
| 101 | 14063 | return true; | |
| 102 | } | ||
| 103 | ✗ | return false; | |
| 104 | } | ||
| 105 | |||
| 106 | private: | ||
| 107 | const std::string path_; | ||
| 108 | int fd_; | ||
| 109 | platform_stat64 stat_; | ||
| 110 | bool stat_obtained_; | ||
| 111 | }; | ||
| 112 | |||
| 113 | |||
| 114 | /** | ||
| 115 | * Wraps around existing memory without owning it. | ||
| 116 | */ | ||
| 117 | class MemoryIngestionSource : public IngestionSource { | ||
| 118 | public: | ||
| 119 | 7250071 | MemoryIngestionSource(const std::string &p, const unsigned char *d, | |
| 120 | unsigned s) | ||
| 121 |
1/2✓ Branch 2 taken 7250071 times.
✗ Branch 3 not taken.
|
7250071 | : path_(p), data_(d), size_(s), pos_(0) { } |
| 122 | 28979294 | virtual ~MemoryIngestionSource() { } | |
| 123 | 35987005 | virtual std::string GetPath() const { return path_; } | |
| 124 | ✗ | virtual bool IsRealFile() const { return false; } | |
| 125 | 7245460 | virtual bool Open() { return true; } | |
| 126 | 14457249 | virtual ssize_t Read(void *buffer, size_t nbyte) { | |
| 127 | 14457249 | const size_t remaining = size_ - pos_; | |
| 128 | 14457249 | const size_t size = std::min(remaining, nbyte); | |
| 129 |
2/2✓ Branch 0 taken 7248632 times.
✓ Branch 1 taken 7208588 times.
|
14457220 | if (size > 0) |
| 130 | 7248632 | memcpy(buffer, data_ + pos_, size); | |
| 131 | 14457220 | pos_ += size; | |
| 132 | 14457220 | return static_cast<ssize_t>(size); | |
| 133 | } | ||
| 134 | 7240385 | virtual bool Close() { return true; } | |
| 135 | 7245817 | virtual bool GetSize(uint64_t *size) { | |
| 136 | 7245817 | *size = size_; | |
| 137 | 7245817 | return true; | |
| 138 | } | ||
| 139 | |||
| 140 | private: | ||
| 141 | std::string path_; | ||
| 142 | const unsigned char *data_; | ||
| 143 | unsigned size_; | ||
| 144 | unsigned pos_; | ||
| 145 | }; | ||
| 146 | |||
| 147 | |||
| 148 | /** | ||
| 149 | * Uses an std::string as data buffer | ||
| 150 | */ | ||
| 151 | class StringIngestionSource : public IngestionSource { | ||
| 152 | public: | ||
| 153 | 71 | explicit StringIngestionSource(const std::string &data) | |
| 154 | 142 | : data_(data) | |
| 155 |
2/4✓ Branch 3 taken 71 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 71 times.
✗ Branch 7 not taken.
|
71 | , source_("MEM", reinterpret_cast<const unsigned char *>(data_.data()), |
| 156 |
1/2✓ Branch 2 taken 71 times.
✗ Branch 3 not taken.
|
142 | data_.length()) { } |
| 157 | ✗ | StringIngestionSource(const std::string &data, const std::string &filename) | |
| 158 | ✗ | : data_(data) | |
| 159 | ✗ | , source_(filename, reinterpret_cast<const unsigned char *>(data_.data()), | |
| 160 | ✗ | data_.length()) { } | |
| 161 | ✗ | virtual ~StringIngestionSource() { } | |
| 162 | 93 | virtual std::string GetPath() const { return source_.GetPath(); } | |
| 163 | ✗ | virtual bool IsRealFile() const { return false; } | |
| 164 | 71 | virtual bool Open() { return source_.Open(); } | |
| 165 | 923 | virtual ssize_t Read(void *buffer, size_t nbyte) { | |
| 166 | 923 | return source_.Read(buffer, nbyte); | |
| 167 | } | ||
| 168 | 71 | virtual bool Close() { return source_.Close(); } | |
| 169 | 22 | virtual bool GetSize(uint64_t *size) { return source_.GetSize(size); } | |
| 170 | |||
| 171 | private: | ||
| 172 | std::string data_; | ||
| 173 | MemoryIngestionSource source_; | ||
| 174 | }; | ||
| 175 | |||
| 176 | |||
| 177 | class TarIngestionSource : public IngestionSource { | ||
| 178 | public: | ||
| 179 | 162 | TarIngestionSource(const std::string &path, struct archive *archive, | |
| 180 | struct archive_entry *entry, Signal *read_archive_signal) | ||
| 181 | 324 | : path_(path) | |
| 182 | 162 | , archive_(archive) | |
| 183 |
1/2✓ Branch 2 taken 162 times.
✗ Branch 3 not taken.
|
162 | , read_archive_signal_(read_archive_signal) { |
| 184 |
2/4✓ Branch 1 taken 162 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 162 times.
|
162 | assert(read_archive_signal_->IsSleeping()); |
| 185 |
1/2✓ Branch 1 taken 162 times.
✗ Branch 2 not taken.
|
162 | const struct stat *stat_ = archive_entry_stat(entry); |
| 186 | 162 | size_ = stat_->st_size; | |
| 187 | 162 | } | |
| 188 | |||
| 189 | ✗ | std::string GetPath() const { return path_; } | |
| 190 | ✗ | virtual bool IsRealFile() const { return false; } | |
| 191 | |||
| 192 | ✗ | bool Open() { | |
| 193 | assert(size_ >= 0); | ||
| 194 | ✗ | return true; | |
| 195 | } | ||
| 196 | |||
| 197 | ✗ | ssize_t Read(void *external_buffer, size_t nbytes) { | |
| 198 | ✗ | const ssize_t read = archive_read_data(archive_, external_buffer, nbytes); | |
| 199 | ✗ | if (read < 0) { | |
| 200 | ✗ | errno = archive_errno(archive_); | |
| 201 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, | |
| 202 | "failed to read data from the tar entry: %s (%d)\n %s", | ||
| 203 | ✗ | path_.c_str(), errno, archive_error_string(archive_)); | |
| 204 | } | ||
| 205 | ✗ | return read; | |
| 206 | } | ||
| 207 | |||
| 208 | 162 | bool Close() { | |
| 209 | 162 | read_archive_signal_->Wakeup(); | |
| 210 | 162 | return true; | |
| 211 | } | ||
| 212 | |||
| 213 | ✗ | bool GetSize(uint64_t *size) { | |
| 214 | ✗ | *size = size_; | |
| 215 | ✗ | return true; | |
| 216 | } | ||
| 217 | |||
| 218 | private: | ||
| 219 | std::string path_; | ||
| 220 | struct archive *archive_; | ||
| 221 | uint64_t size_; | ||
| 222 | Signal *read_archive_signal_; | ||
| 223 | }; | ||
| 224 | |||
| 225 | #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
| 226 |