GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/ingestion_source.h
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 64 95 67.4%
Branches: 17 38 44.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_
6 #define CVMFS_INGESTION_INGESTION_SOURCE_H_
7
8 #include <fcntl.h>
9 #include <pthread.h>
10 #include <unistd.h>
11
12 #include <algorithm>
13 #include <cassert>
14 #include <cerrno>
15 #include <cstdio>
16 #include <string>
17
18 #include "duplex_libarchive.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/posix.h"
23 #include "util/single_copy.h"
24
25 /*
26 * The purpose of this class is to add a common interface for object that are
27 * ingested by the pipeline. Hence the pipeline is able to ingest everything
28 * that implements this interface.
29 * The ownership of new IngestionSource objects is transferred from their
30 * creator directly to the pipeline itself that will take care of deallocating
31 * everything.
32 * The pipeline is multithreaded so it is very likely that the code implement in
33 * this interface will be called inside a different thread from the one that
34 * originally
35 * allocated the object, hence is necessary to take extra care in the use of
36 * locks, prefer conditional variables.
37 */
38 class IngestionSource : SingleCopy {
39 public:
40 727758 virtual ~IngestionSource() { }
41 virtual std::string GetPath() const = 0;
42 virtual bool IsRealFile() const = 0;
43 virtual bool Open() = 0;
44 virtual ssize_t Read(void *buffer, size_t nbyte) = 0;
45 virtual bool Close() = 0;
46 virtual bool GetSize(uint64_t *size) = 0;
47 };
48
49 class FileIngestionSource : public IngestionSource {
50 public:
51 114760 explicit FileIngestionSource(const std::string &path)
52
1/2
✓ Branch 2 taken 114760 times.
✗ Branch 3 not taken.
114760 : path_(path), fd_(-1), stat_obtained_(false) { }
53 391366 ~FileIngestionSource() { }
54
55 208650 std::string GetPath() const { return path_; }
56 virtual bool IsRealFile() const { return true; }
57
58 34280 bool Open() {
59 34280 fd_ = open(path_.c_str(), O_RDONLY);
60
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 34280 times.
34280 if (fd_ < 0) {
61 LogCvmfs(kLogCvmfs, kLogStderr,
62 "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(),
63 errno, strerror(errno));
64 return false;
65 }
66 34280 return true;
67 }
68
69 12454642 ssize_t Read(void *buffer, size_t nbyte) {
70
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12454642 times.
12454642 assert(fd_ >= 0);
71 12454642 const ssize_t read = SafeRead(fd_, buffer, nbyte);
72
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12455083 times.
12455083 if (read < 0) {
73 LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s",
74 path_.c_str(), errno, strerror(errno));
75 }
76 12455082 return read;
77 }
78
79 34280 bool Close() {
80
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 34280 times.
34280 if (fd_ == -1)
81 return true;
82
83 // tell to the OS that we are not going to access the file again in the
84 // foreaseable future.
85 34280 (void)platform_invalidate_kcache(fd_, 0, 0);
86
87 34280 const int ret = close(fd_);
88 34279 fd_ = -1;
89 34279 return (ret == 0);
90 }
91
92 9031 bool GetSize(uint64_t *size) {
93
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9031 times.
9031 if (stat_obtained_) {
94 *size = stat_.st_size;
95 return true;
96 }
97 9031 const int ret = platform_fstat(fd_, &stat_);
98
1/2
✓ Branch 0 taken 9031 times.
✗ Branch 1 not taken.
9031 if (ret == 0) {
99 9031 *size = stat_.st_size;
100 9031 stat_obtained_ = true;
101 9031 return true;
102 }
103 return false;
104 }
105
106 private:
107 const std::string path_;
108 int fd_;
109 platform_stat64 stat_;
110 bool stat_obtained_;
111 };
112
113
114 /**
115 * Wraps around existing memory without owning it.
116 */
117 class MemoryIngestionSource : public IngestionSource {
118 public:
119 250062 MemoryIngestionSource(const std::string &p, const unsigned char *d,
120 unsigned s)
121
1/2
✓ Branch 2 taken 250062 times.
✗ Branch 3 not taken.
250062 : path_(p), data_(d), size_(s), pos_(0) { }
122 999232 virtual ~MemoryIngestionSource() { }
123 1240789 virtual std::string GetPath() const { return path_; }
124 virtual bool IsRealFile() const { return false; }
125 249901 virtual bool Open() { return true; }
126 499559 virtual ssize_t Read(void *buffer, size_t nbyte) {
127 499559 const size_t remaining = size_ - pos_;
128 499559 const size_t size = std::min(remaining, nbyte);
129
2/2
✓ Branch 0 taken 250705 times.
✓ Branch 1 taken 248648 times.
499353 if (size > 0)
130 250705 memcpy(buffer, data_ + pos_, size);
131 499353 pos_ += size;
132 499353 return static_cast<ssize_t>(size);
133 }
134 249671 virtual bool Close() { return true; }
135 249828 virtual bool GetSize(uint64_t *size) {
136 249828 *size = size_;
137 249828 return true;
138 }
139
140 private:
141 std::string path_;
142 const unsigned char *data_;
143 unsigned size_;
144 unsigned pos_;
145 };
146
147
148 /**
149 * Uses an std::string as data buffer
150 */
151 class StringIngestionSource : public IngestionSource {
152 public:
153 62 explicit StringIngestionSource(const std::string &data)
154 124 : data_(data)
155
2/4
✓ Branch 3 taken 62 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 62 times.
✗ Branch 7 not taken.
62 , source_("MEM", reinterpret_cast<const unsigned char *>(data_.data()),
156
1/2
✓ Branch 2 taken 62 times.
✗ Branch 3 not taken.
124 data_.length()) { }
157 StringIngestionSource(const std::string &data, const std::string &filename)
158 : data_(data)
159 , source_(filename, reinterpret_cast<const unsigned char *>(data_.data()),
160 data_.length()) { }
161 virtual ~StringIngestionSource() { }
162 76 virtual std::string GetPath() const { return source_.GetPath(); }
163 virtual bool IsRealFile() const { return false; }
164 62 virtual bool Open() { return source_.Open(); }
165 806 virtual ssize_t Read(void *buffer, size_t nbyte) {
166 806 return source_.Read(buffer, nbyte);
167 }
168 62 virtual bool Close() { return source_.Close(); }
169 14 virtual bool GetSize(uint64_t *size) { return source_.GetSize(size); }
170
171 private:
172 std::string data_;
173 MemoryIngestionSource source_;
174 };
175
176
177 class TarIngestionSource : public IngestionSource {
178 public:
179 126 TarIngestionSource(const std::string &path, struct archive *archive,
180 struct archive_entry *entry, Signal *read_archive_signal)
181 252 : path_(path)
182 126 , archive_(archive)
183
1/2
✓ Branch 2 taken 126 times.
✗ Branch 3 not taken.
126 , read_archive_signal_(read_archive_signal) {
184
2/4
✓ Branch 1 taken 126 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 126 times.
126 assert(read_archive_signal_->IsSleeping());
185
1/2
✓ Branch 1 taken 126 times.
✗ Branch 2 not taken.
126 const struct stat *stat_ = archive_entry_stat(entry);
186 126 size_ = stat_->st_size;
187 126 }
188
189 std::string GetPath() const { return path_; }
190 virtual bool IsRealFile() const { return false; }
191
192 bool Open() {
193 assert(size_ >= 0);
194 return true;
195 }
196
197 ssize_t Read(void *external_buffer, size_t nbytes) {
198 const ssize_t read = archive_read_data(archive_, external_buffer, nbytes);
199 if (read < 0) {
200 errno = archive_errno(archive_);
201 LogCvmfs(kLogCvmfs, kLogStderr,
202 "failed to read data from the tar entry: %s (%d)\n %s",
203 path_.c_str(), errno, archive_error_string(archive_));
204 }
205 return read;
206 }
207
208 126 bool Close() {
209 126 read_archive_signal_->Wakeup();
210 126 return true;
211 }
212
213 bool GetSize(uint64_t *size) {
214 *size = size_;
215 return true;
216 }
217
218 private:
219 std::string path_;
220 struct archive *archive_;
221 uint64_t size_;
222 Signal *read_archive_signal_;
223 };
224
225 #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_
226