GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/ingestion_source.h
Date: 2025-09-28 02:35:26
Exec Total Coverage
Lines: 64 95 67.4%
Branches: 17 38 44.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_
6 #define CVMFS_INGESTION_INGESTION_SOURCE_H_
7
8 #include <fcntl.h>
9 #include <pthread.h>
10 #include <unistd.h>
11
12 #include <algorithm>
13 #include <cassert>
14 #include <cerrno>
15 #include <cstdio>
16 #include <string>
17
18 #include "duplex_libarchive.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/posix.h"
23 #include "util/single_copy.h"
24
25 /*
26 * The purpose of this class is to add a common interface for objects that are
27 * ingested by the pipeline. Hence the pipeline is able to ingest everything
28 * that implements this interface.
29 * The ownership of new IngestionSource objects is transferred from their
30 * creator directly to the pipeline itself that will take care of deallocating
31 * everything.
32 * The pipeline is multithreaded so it is very likely that the code implemented in
33 * this interface will be called inside a different thread from the one that
34 * originally
35 * allocated the object, hence it is necessary to take extra care in the use of
36 * locks, prefer condition variables.
37 */
38 class IngestionSource : SingleCopy {
39 public:
40 1615258 virtual ~IngestionSource() { }
41 virtual std::string GetPath() const = 0;
42 virtual bool IsRealFile() const = 0;
43 virtual bool Open() = 0;
44 virtual ssize_t Read(void *buffer, size_t nbyte) = 0;
45 virtual bool Close() = 0;
46 virtual bool GetSize(uint64_t *size) = 0;
47 };
48
49 class FileIngestionSource : public IngestionSource {
50 public:
51 58582 explicit FileIngestionSource(const std::string &path)
52
1/2
✓ Branch 2 taken 58582 times.
✗ Branch 3 not taken.
58582 : path_(path), fd_(-1), stat_obtained_(false) { }
53 227268 ~FileIngestionSource() { }
54
55 119041 std::string GetPath() const { return path_; }
56 virtual bool IsRealFile() const { return true; }
57
58 4570 bool Open() {
59 4570 fd_ = open(path_.c_str(), O_RDONLY);
60
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4570 times.
4570 if (fd_ < 0) {
61 LogCvmfs(kLogCvmfs, kLogStderr,
62 "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(),
63 errno, strerror(errno));
64 return false;
65 }
66 4570 return true;
67 }
68
69 965238 ssize_t Read(void *buffer, size_t nbyte) {
70
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 965238 times.
965238 assert(fd_ >= 0);
71 965238 const ssize_t read = SafeRead(fd_, buffer, nbyte);
72
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 966429 times.
966429 if (read < 0) {
73 LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s",
74 path_.c_str(), errno, strerror(errno));
75 }
76 966318 return read;
77 }
78
79 4570 bool Close() {
80
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4570 times.
4570 if (fd_ == -1)
81 return true;
82
83 // tell the OS that we are not going to access the file again in the
84 // foreseeable future.
85 4570 (void)platform_invalidate_kcache(fd_, 0, 0);
86
87 4570 const int ret = close(fd_);
88 4567 fd_ = -1;
89 4567 return (ret == 0);
90 }
91
92 2569 bool GetSize(uint64_t *size) {
93
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2569 times.
2569 if (stat_obtained_) {
94 *size = stat_.st_size;
95 return true;
96 }
97 2569 const int ret = platform_fstat(fd_, &stat_);
98
1/2
✓ Branch 0 taken 2569 times.
✗ Branch 1 not taken.
2569 if (ret == 0) {
99 2569 *size = stat_.st_size;
100 2569 stat_obtained_ = true;
101 2569 return true;
102 }
103 return false;
104 }
105
106 private:
107 const std::string path_;
108 int fd_;
109 platform_stat64 stat_;
110 bool stat_obtained_;
111 };
112
113
114 /**
115 * Wraps around existing memory without owning it.
116 */
117 class MemoryIngestionSource : public IngestionSource {
118 public:
119 750004 MemoryIngestionSource(const std::string &p, const unsigned char *d,
120 unsigned s)
121
1/2
✓ Branch 2 taken 750004 times.
✗ Branch 3 not taken.
750004 : path_(p), data_(d), size_(s), pos_(0) { }
122 2998332 virtual ~MemoryIngestionSource() { }
123 3724979 virtual std::string GetPath() const { return path_; }
124 virtual bool IsRealFile() const { return false; }
125 749704 virtual bool Open() { return true; }
126 1496356 virtual ssize_t Read(void *buffer, size_t nbyte) {
127 1496356 const size_t remaining = size_ - pos_;
128 1496356 const size_t size = std::min(remaining, nbyte);
129
2/2
✓ Branch 0 taken 749896 times.
✓ Branch 1 taken 746298 times.
1496194 if (size > 0)
130 749896 memcpy(buffer, data_ + pos_, size);
131 1496194 pos_ += size;
132 1496194 return static_cast<ssize_t>(size);
133 }
134 749164 virtual bool Close() { return true; }
135 749635 virtual bool GetSize(uint64_t *size) {
136 749635 *size = size_;
137 749635 return true;
138 }
139
140 private:
141 std::string path_;
142 const unsigned char *data_;
143 unsigned size_;
144 unsigned pos_;
145 };
146
147
148 /**
149 * Uses an std::string as data buffer
150 */
151 class StringIngestionSource : public IngestionSource {
152 public:
153 4 explicit StringIngestionSource(const std::string &data)
154 8 : data_(data)
155
2/4
✓ Branch 3 taken 4 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 4 times.
✗ Branch 7 not taken.
4 , source_("MEM", reinterpret_cast<const unsigned char *>(data_.data()),
156
1/2
✓ Branch 2 taken 4 times.
✗ Branch 3 not taken.
8 data_.length()) { }
157 StringIngestionSource(const std::string &data, const std::string &filename)
158 : data_(data)
159 , source_(filename, reinterpret_cast<const unsigned char *>(data_.data()),
160 data_.length()) { }
161 virtual ~StringIngestionSource() { }
162 5 virtual std::string GetPath() const { return source_.GetPath(); }
163 virtual bool IsRealFile() const { return false; }
164 4 virtual bool Open() { return source_.Open(); }
165 52 virtual ssize_t Read(void *buffer, size_t nbyte) {
166 52 return source_.Read(buffer, nbyte);
167 }
168 4 virtual bool Close() { return source_.Close(); }
169 1 virtual bool GetSize(uint64_t *size) { return source_.GetSize(size); }
170
171 private:
172 std::string data_;
173 MemoryIngestionSource source_;
174 };
175
176
177 class TarIngestionSource : public IngestionSource {
178 public:
179 9 TarIngestionSource(const std::string &path, struct archive *archive,
180 struct archive_entry *entry, Signal *read_archive_signal)
181 18 : path_(path)
182 9 , archive_(archive)
183
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 , read_archive_signal_(read_archive_signal) {
184
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 9 times.
9 assert(read_archive_signal_->IsSleeping());
185
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 const struct stat *stat_ = archive_entry_stat(entry);
186 9 size_ = stat_->st_size;
187 9 }
188
189 std::string GetPath() const { return path_; }
190 virtual bool IsRealFile() const { return false; }
191
192 bool Open() {
193 assert(size_ >= 0);
194 return true;
195 }
196
197 ssize_t Read(void *external_buffer, size_t nbytes) {
198 const ssize_t read = archive_read_data(archive_, external_buffer, nbytes);
199 if (read < 0) {
200 errno = archive_errno(archive_);
201 LogCvmfs(kLogCvmfs, kLogStderr,
202 "failed to read data from the tar entry: %s (%d)\n %s",
203 path_.c_str(), errno, archive_error_string(archive_));
204 }
205 return read;
206 }
207
208 9 bool Close() {
209 9 read_archive_signal_->Wakeup();
210 9 return true;
211 }
212
213 bool GetSize(uint64_t *size) {
214 *size = size_;
215 return true;
216 }
217
218 private:
219 std::string path_;
220 struct archive *archive_;
221 uint64_t size_;
222 Signal *read_archive_signal_;
223 };
224
225 #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_
226