Directory: | cvmfs/ |
---|---|
File: | cvmfs/ingestion/ingestion_source.h |
Date: | 2025-06-22 02:36:02 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 64 | 95 | 67.4% |
Branches: | 17 | 38 | 44.7% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
6 | #define CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
7 | |||
8 | #include <fcntl.h> | ||
9 | #include <pthread.h> | ||
10 | #include <unistd.h> | ||
11 | |||
12 | #include <algorithm> | ||
13 | #include <cassert> | ||
14 | #include <cerrno> | ||
15 | #include <cstdio> | ||
16 | #include <string> | ||
17 | |||
18 | #include "duplex_libarchive.h" | ||
19 | #include "util/concurrency.h" | ||
20 | #include "util/logging.h" | ||
21 | #include "util/platform.h" | ||
22 | #include "util/posix.h" | ||
23 | #include "util/single_copy.h" | ||
24 | |||
25 | /* | ||
26 | * The purpose of this class is to add a common interface for objects that are | ||
27 | * ingested by the pipeline. Hence the pipeline is able to ingest everything | ||
28 | * that implements this interface. | ||
29 | * The ownership of new IngestionSource objects is transferred from their | ||
30 | * creator directly to the pipeline itself that will take care of deallocating | ||
31 | * everything. | ||
32 | * The pipeline is multithreaded so it is very likely that the code implemented in | ||
33 | * this interface will be called inside a different thread from the one that | ||
34 | * originally | ||
35 | * allocated the object, hence it is necessary to take extra care in the use of | ||
36 | * locks; prefer condition variables. | ||
37 | */ | ||
38 | class IngestionSource : SingleCopy { | ||
39 | public: | ||
40 | 727758 | virtual ~IngestionSource() { } | |
41 | virtual std::string GetPath() const = 0; | ||
42 | virtual bool IsRealFile() const = 0; | ||
43 | virtual bool Open() = 0; | ||
44 | virtual ssize_t Read(void *buffer, size_t nbyte) = 0; | ||
45 | virtual bool Close() = 0; | ||
46 | virtual bool GetSize(uint64_t *size) = 0; | ||
47 | }; | ||
48 | |||
49 | class FileIngestionSource : public IngestionSource { | ||
50 | public: | ||
51 | 114760 | explicit FileIngestionSource(const std::string &path) | |
52 |
1/2✓ Branch 2 taken 114760 times.
✗ Branch 3 not taken.
|
114760 | : path_(path), fd_(-1), stat_obtained_(false) { } |
53 | 391366 | ~FileIngestionSource() { } | |
54 | |||
55 | 208650 | std::string GetPath() const { return path_; } | |
56 | ✗ | virtual bool IsRealFile() const { return true; } | |
57 | |||
58 | 34280 | bool Open() { | |
59 | 34280 | fd_ = open(path_.c_str(), O_RDONLY); | |
60 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 34280 times.
|
34280 | if (fd_ < 0) { |
61 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, | |
62 | "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(), | ||
63 | ✗ | errno, strerror(errno)); | |
64 | ✗ | return false; | |
65 | } | ||
66 | 34280 | return true; | |
67 | } | ||
68 | |||
69 | 12454642 | ssize_t Read(void *buffer, size_t nbyte) { | |
70 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12454642 times.
|
12454642 | assert(fd_ >= 0); |
71 | 12454642 | const ssize_t read = SafeRead(fd_, buffer, nbyte); | |
72 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 12455083 times.
|
12455083 | if (read < 0) { |
73 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s", | |
74 | ✗ | path_.c_str(), errno, strerror(errno)); | |
75 | } | ||
76 | 12455082 | return read; | |
77 | } | ||
78 | |||
79 | 34280 | bool Close() { | |
80 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 34280 times.
|
34280 | if (fd_ == -1) |
81 | ✗ | return true; | |
82 | |||
83 | // tell the OS that we are not going to access the file again in the | ||
84 | // foreseeable future. | ||
85 | 34280 | (void)platform_invalidate_kcache(fd_, 0, 0); | |
86 | |||
87 | 34280 | const int ret = close(fd_); | |
88 | 34279 | fd_ = -1; | |
89 | 34279 | return (ret == 0); | |
90 | } | ||
91 | |||
92 | 9031 | bool GetSize(uint64_t *size) { | |
93 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 9031 times.
|
9031 | if (stat_obtained_) { |
94 | ✗ | *size = stat_.st_size; | |
95 | ✗ | return true; | |
96 | } | ||
97 | 9031 | const int ret = platform_fstat(fd_, &stat_); | |
98 |
1/2✓ Branch 0 taken 9031 times.
✗ Branch 1 not taken.
|
9031 | if (ret == 0) { |
99 | 9031 | *size = stat_.st_size; | |
100 | 9031 | stat_obtained_ = true; | |
101 | 9031 | return true; | |
102 | } | ||
103 | ✗ | return false; | |
104 | } | ||
105 | |||
106 | private: | ||
107 | const std::string path_; | ||
108 | int fd_; | ||
109 | platform_stat64 stat_; | ||
110 | bool stat_obtained_; | ||
111 | }; | ||
112 | |||
113 | |||
114 | /** | ||
115 | * Wraps around existing memory without owning it. | ||
116 | */ | ||
117 | class MemoryIngestionSource : public IngestionSource { | ||
118 | public: | ||
119 | 250062 | MemoryIngestionSource(const std::string &p, const unsigned char *d, | |
120 | unsigned s) | ||
121 |
1/2✓ Branch 2 taken 250062 times.
✗ Branch 3 not taken.
|
250062 | : path_(p), data_(d), size_(s), pos_(0) { } |
122 | 999232 | virtual ~MemoryIngestionSource() { } | |
123 | 1240789 | virtual std::string GetPath() const { return path_; } | |
124 | ✗ | virtual bool IsRealFile() const { return false; } | |
125 | 249901 | virtual bool Open() { return true; } | |
126 | 499559 | virtual ssize_t Read(void *buffer, size_t nbyte) { | |
127 | 499559 | const size_t remaining = size_ - pos_; | |
128 | 499559 | const size_t size = std::min(remaining, nbyte); | |
129 |
2/2✓ Branch 0 taken 250705 times.
✓ Branch 1 taken 248648 times.
|
499353 | if (size > 0) |
130 | 250705 | memcpy(buffer, data_ + pos_, size); | |
131 | 499353 | pos_ += size; | |
132 | 499353 | return static_cast<ssize_t>(size); | |
133 | } | ||
134 | 249671 | virtual bool Close() { return true; } | |
135 | 249828 | virtual bool GetSize(uint64_t *size) { | |
136 | 249828 | *size = size_; | |
137 | 249828 | return true; | |
138 | } | ||
139 | |||
140 | private: | ||
141 | std::string path_; | ||
142 | const unsigned char *data_; | ||
143 | unsigned size_; | ||
144 | unsigned pos_; | ||
145 | }; | ||
146 | |||
147 | |||
148 | /** | ||
149 | * Uses an std::string as data buffer | ||
150 | */ | ||
151 | class StringIngestionSource : public IngestionSource { | ||
152 | public: | ||
153 | 62 | explicit StringIngestionSource(const std::string &data) | |
154 | 124 | : data_(data) | |
155 |
2/4✓ Branch 3 taken 62 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 62 times.
✗ Branch 7 not taken.
|
62 | , source_("MEM", reinterpret_cast<const unsigned char *>(data_.data()), |
156 |
1/2✓ Branch 2 taken 62 times.
✗ Branch 3 not taken.
|
124 | data_.length()) { } |
157 | ✗ | StringIngestionSource(const std::string &data, const std::string &filename) | |
158 | ✗ | : data_(data) | |
159 | ✗ | , source_(filename, reinterpret_cast<const unsigned char *>(data_.data()), | |
160 | ✗ | data_.length()) { } | |
161 | ✗ | virtual ~StringIngestionSource() { } | |
162 | 76 | virtual std::string GetPath() const { return source_.GetPath(); } | |
163 | ✗ | virtual bool IsRealFile() const { return false; } | |
164 | 62 | virtual bool Open() { return source_.Open(); } | |
165 | 806 | virtual ssize_t Read(void *buffer, size_t nbyte) { | |
166 | 806 | return source_.Read(buffer, nbyte); | |
167 | } | ||
168 | 62 | virtual bool Close() { return source_.Close(); } | |
169 | 14 | virtual bool GetSize(uint64_t *size) { return source_.GetSize(size); } | |
170 | |||
171 | private: | ||
172 | std::string data_; | ||
173 | MemoryIngestionSource source_; | ||
174 | }; | ||
175 | |||
176 | |||
177 | class TarIngestionSource : public IngestionSource { | ||
178 | public: | ||
179 | 126 | TarIngestionSource(const std::string &path, struct archive *archive, | |
180 | struct archive_entry *entry, Signal *read_archive_signal) | ||
181 | 252 | : path_(path) | |
182 | 126 | , archive_(archive) | |
183 |
1/2✓ Branch 2 taken 126 times.
✗ Branch 3 not taken.
|
126 | , read_archive_signal_(read_archive_signal) { |
184 |
2/4✓ Branch 1 taken 126 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 126 times.
|
126 | assert(read_archive_signal_->IsSleeping()); |
185 |
1/2✓ Branch 1 taken 126 times.
✗ Branch 2 not taken.
|
126 | const struct stat *stat_ = archive_entry_stat(entry); |
186 | 126 | size_ = stat_->st_size; | |
187 | 126 | } | |
188 | |||
189 | ✗ | std::string GetPath() const { return path_; } | |
190 | ✗ | virtual bool IsRealFile() const { return false; } | |
191 | |||
192 | ✗ | bool Open() { | |
193 | assert(size_ >= 0); | ||
194 | ✗ | return true; | |
195 | } | ||
196 | |||
197 | ✗ | ssize_t Read(void *external_buffer, size_t nbytes) { | |
198 | ✗ | const ssize_t read = archive_read_data(archive_, external_buffer, nbytes); | |
199 | ✗ | if (read < 0) { | |
200 | ✗ | errno = archive_errno(archive_); | |
201 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, | |
202 | "failed to read data from the tar entry: %s (%d)\n %s", | ||
203 | ✗ | path_.c_str(), errno, archive_error_string(archive_)); | |
204 | } | ||
205 | ✗ | return read; | |
206 | } | ||
207 | |||
208 | 126 | bool Close() { | |
209 | 126 | read_archive_signal_->Wakeup(); | |
210 | 126 | return true; | |
211 | } | ||
212 | |||
213 | ✗ | bool GetSize(uint64_t *size) { | |
214 | ✗ | *size = size_; | |
215 | ✗ | return true; | |
216 | } | ||
217 | |||
218 | private: | ||
219 | std::string path_; | ||
220 | struct archive *archive_; | ||
221 | uint64_t size_; | ||
222 | Signal *read_archive_signal_; | ||
223 | }; | ||
224 | |||
225 | #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
226 |