Directory: | cvmfs/ |
---|---|
File: | cvmfs/ingestion/ingestion_source.h |
Date: | 2025-02-02 02:34:22 |
Exec | Total | Coverage | |
---|---|---|---|
Lines: | 61 | 91 | 67.0% |
Branches: | 17 | 38 | 44.7% |
Line | Branch | Exec | Source |
---|---|---|---|
1 | /** | ||
2 | * This file is part of the CernVM File System. | ||
3 | */ | ||
4 | |||
5 | #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
6 | #define CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
7 | |||
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>

#include <algorithm>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <string>

#include "duplex_libarchive.h"
#include "util/concurrency.h"
#include "util/logging.h"
#include "util/platform.h"
#include "util/posix.h"
#include "util/single_copy.h"
24 | |||
25 | /* | ||
26 | * The purpose of this class is to add a common interface for object that are | ||
27 | * ingested by the pipeline. Hence the pipeline is able to ingest everything | ||
28 | * that implements this interface. | ||
29 | * The ownership of new IngestionSource objects is transferred from their creator | ||
30 | * directly to the pipeline itself that will take care of deallocating | ||
31 | * everything. | ||
32 | * The pipeline is multithreaded so it is very likely that the code implement in | ||
33 | * this interface will be called inside a different thread from the one that | ||
34 | * originally | ||
35 | * allocated the object, hence is necessary to take extra care in the use of | ||
36 | * locks, prefer conditional variables. | ||
37 | */ | ||
38 | class IngestionSource : SingleCopy { | ||
39 | public: | ||
40 | 508180 | virtual ~IngestionSource() {} | |
41 | virtual std::string GetPath() const = 0; | ||
42 | virtual bool IsRealFile() const = 0; | ||
43 | virtual bool Open() = 0; | ||
44 | virtual ssize_t Read(void* buffer, size_t nbyte) = 0; | ||
45 | virtual bool Close() = 0; | ||
46 | virtual bool GetSize(uint64_t* size) = 0; | ||
47 | }; | ||
48 | |||
49 | class FileIngestionSource : public IngestionSource { | ||
50 | public: | ||
51 | 4293 | explicit FileIngestionSource(const std::string& path) | |
52 |
1/2✓ Branch 2 taken 4293 times.
✗ Branch 3 not taken.
|
4293 | : path_(path), fd_(-1), stat_obtained_(false) {} |
53 | 15026 | ~FileIngestionSource() {} | |
54 | |||
55 | 8246 | std::string GetPath() const { return path_; } | |
56 | ✗ | virtual bool IsRealFile() const { return true; } | |
57 | |||
58 | 1121 | bool Open() { | |
59 | 1121 | fd_ = open(path_.c_str(), O_RDONLY); | |
60 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1121 times.
|
1121 | if (fd_ < 0) { |
61 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, | |
62 | "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(), | ||
63 | ✗ | errno, strerror(errno)); | |
64 | ✗ | return false; | |
65 | } | ||
66 | 1121 | return true; | |
67 | } | ||
68 | |||
69 | 452706 | ssize_t Read(void* buffer, size_t nbyte) { | |
70 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 452706 times.
|
452706 | assert(fd_ >= 0); |
71 | 452706 | ssize_t read = SafeRead(fd_, buffer, nbyte); | |
72 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 453516 times.
|
453516 | if (read < 0) { |
73 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s", | |
74 | ✗ | path_.c_str(), errno, strerror(errno)); | |
75 | } | ||
76 | 453426 | return read; | |
77 | } | ||
78 | |||
79 | 1121 | bool Close() { | |
80 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 1121 times.
|
1121 | if (fd_ == -1) return true; |
81 | |||
82 | // tell to the OS that we are not going to access the file again in the | ||
83 | // foreaseable future. | ||
84 | 1121 | (void)platform_invalidate_kcache(fd_, 0, 0); | |
85 | |||
86 | 1121 | int ret = close(fd_); | |
87 | 1121 | fd_ = -1; | |
88 | 1121 | return (ret == 0); | |
89 | } | ||
90 | |||
91 | 593 | bool GetSize(uint64_t* size) { | |
92 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 593 times.
|
593 | if (stat_obtained_) { |
93 | ✗ | *size = stat_.st_size; | |
94 | ✗ | return true; | |
95 | } | ||
96 | 593 | int ret = platform_fstat(fd_, &stat_); | |
97 |
1/2✓ Branch 0 taken 593 times.
✗ Branch 1 not taken.
|
593 | if (ret == 0) { |
98 | 593 | *size = stat_.st_size; | |
99 | 593 | stat_obtained_ = true; | |
100 | 593 | return true; | |
101 | } | ||
102 | ✗ | return false; | |
103 | } | ||
104 | |||
105 | private: | ||
106 | const std::string path_; | ||
107 | int fd_; | ||
108 | platform_stat64 stat_; | ||
109 | bool stat_obtained_; | ||
110 | }; | ||
111 | |||
112 | |||
113 | /** | ||
114 | * Wraps around existing memory without owning it. | ||
115 | */ | ||
116 | class MemoryIngestionSource : public IngestionSource { | ||
117 | public: | ||
118 | 250002 | MemoryIngestionSource( | |
119 | const std::string &p, const unsigned char *d, unsigned s) | ||
120 |
1/2✓ Branch 2 taken 250002 times.
✗ Branch 3 not taken.
|
250002 | : path_(p), data_(d), size_(s), pos_(0) {} |
121 | 999248 | virtual ~MemoryIngestionSource() {} | |
122 | 1240630 | virtual std::string GetPath() const { return path_; } | |
123 | ✗ | virtual bool IsRealFile() const { return false; } | |
124 | 249883 | virtual bool Open() { return true; } | |
125 | 498911 | virtual ssize_t Read(void* buffer, size_t nbyte) { | |
126 | 498911 | size_t remaining = size_ - pos_; | |
127 | 498911 | size_t size = std::min(remaining, nbyte); | |
128 |
2/2✓ Branch 0 taken 249966 times.
✓ Branch 1 taken 248742 times.
|
498708 | if (size > 0) memcpy(buffer, data_ + pos_, size); |
129 | 498708 | pos_ += size; | |
130 | 498708 | return static_cast<ssize_t>(size); | |
131 | } | ||
132 | 249658 | virtual bool Close() { return true; } | |
133 | 249912 | virtual bool GetSize(uint64_t* size) { *size = size_; return true; } | |
134 | |||
135 | private: | ||
136 | std::string path_; | ||
137 | const unsigned char *data_; | ||
138 | unsigned size_; | ||
139 | unsigned pos_; | ||
140 | }; | ||
141 | |||
142 | |||
143 | /** | ||
144 | * Uses an std::string as data buffer | ||
145 | */ | ||
146 | class StringIngestionSource : public IngestionSource { | ||
147 | public: | ||
148 | 2 | explicit StringIngestionSource(const std::string& data) | |
149 | 4 | : data_(data), | |
150 |
2/4✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
|
2 | source_("MEM", reinterpret_cast<const unsigned char*>(data_.data()), |
151 |
1/2✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
|
4 | data_.length()) {} |
152 | ✗ | StringIngestionSource(const std::string& data, const std::string& filename) | |
153 | ✗ | : data_(data), | |
154 | ✗ | source_(filename, reinterpret_cast<const unsigned char*>(data_.data()), | |
155 | ✗ | data_.length()) {} | |
156 | ✗ | virtual ~StringIngestionSource() {} | |
157 | 3 | virtual std::string GetPath() const { return source_.GetPath(); } | |
158 | ✗ | virtual bool IsRealFile() const { return false; } | |
159 | 2 | virtual bool Open() { return source_.Open(); } | |
160 | 26 | virtual ssize_t Read(void* buffer, size_t nbyte) { | |
161 | 26 | return source_.Read(buffer, nbyte); | |
162 | } | ||
163 | 2 | virtual bool Close() { return source_.Close(); } | |
164 | 1 | virtual bool GetSize(uint64_t* size) { return source_.GetSize(size); } | |
165 | |||
166 | private: | ||
167 | std::string data_; | ||
168 | MemoryIngestionSource source_; | ||
169 | }; | ||
170 | |||
171 | |||
172 | class TarIngestionSource : public IngestionSource { | ||
173 | public: | ||
174 | 9 | TarIngestionSource(const std::string &path, struct archive* archive, | |
175 | struct archive_entry* entry, Signal* read_archive_signal) | ||
176 | 18 | : path_(path), | |
177 | 9 | archive_(archive), | |
178 |
1/2✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
|
9 | read_archive_signal_(read_archive_signal) { |
179 |
2/4✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 9 times.
|
9 | assert(read_archive_signal_->IsSleeping()); |
180 |
1/2✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
|
9 | const struct stat* stat_ = archive_entry_stat(entry); |
181 | 9 | size_ = stat_->st_size; | |
182 | 9 | } | |
183 | |||
184 | ✗ | std::string GetPath() const { return path_; } | |
185 | ✗ | virtual bool IsRealFile() const { return false; } | |
186 | |||
187 | ✗ | bool Open() { | |
188 | assert(size_ >= 0); | ||
189 | ✗ | return true; | |
190 | } | ||
191 | |||
192 | ✗ | ssize_t Read(void* external_buffer, size_t nbytes) { | |
193 | ✗ | ssize_t read = archive_read_data(archive_, external_buffer, nbytes); | |
194 | ✗ | if (read < 0) { | |
195 | ✗ | errno = archive_errno(archive_); | |
196 | ✗ | LogCvmfs(kLogCvmfs, kLogStderr, | |
197 | "failed to read data from the tar entry: %s (%d)\n %s", | ||
198 | ✗ | path_.c_str(), errno, archive_error_string(archive_)); | |
199 | } | ||
200 | ✗ | return read; | |
201 | } | ||
202 | |||
203 | 9 | bool Close() { | |
204 | 9 | read_archive_signal_->Wakeup(); | |
205 | 9 | return true; | |
206 | } | ||
207 | |||
208 | ✗ | bool GetSize(uint64_t* size) { | |
209 | ✗ | *size = size_; | |
210 | ✗ | return true; | |
211 | } | ||
212 | |||
213 | private: | ||
214 | std::string path_; | ||
215 | struct archive* archive_; | ||
216 | uint64_t size_; | ||
217 | Signal* read_archive_signal_; | ||
218 | }; | ||
219 | |||
220 | #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_ | ||
221 |