GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/ingestion_source.h
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 64 95 67.4%
Branches: 17 38 44.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_
6 #define CVMFS_INGESTION_INGESTION_SOURCE_H_
7
8 #include <fcntl.h>
9 #include <pthread.h>
10 #include <unistd.h>
11
12 #include <algorithm>
13 #include <cassert>
14 #include <cerrno>
15 #include <cstdio>
16 #include <string>
17
18 #include "duplex_libarchive.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/posix.h"
23 #include "util/single_copy.h"
24
25 /*
26 * The purpose of this class is to add a common interface for object that are
27 * ingested by the pipeline. Hence the pipeline is able to ingest everything
28 * that implements this interface.
29 * The ownership of new IngestionSource objects is transferred from their
30 * creator directly to the pipeline itself that will take care of deallocating
31 * everything.
32 * The pipeline is multithreaded so it is very likely that the code implement in
33 * this interface will be called inside a different thread from the one that
34 * originally
35 * allocated the object, hence is necessary to take extra care in the use of
36 * locks, prefer conditional variables.
37 */
38 class IngestionSource : SingleCopy {
39 public:
40 2620422 virtual ~IngestionSource() { }
41 virtual std::string GetPath() const = 0;
42 virtual bool IsRealFile() const = 0;
43 virtual bool Open() = 0;
44 virtual ssize_t Read(void *buffer, size_t nbyte) = 0;
45 virtual bool Close() = 0;
46 virtual bool GetSize(uint64_t *size) = 0;
47 };
48
49 class FileIngestionSource : public IngestionSource {
50 public:
51 61063 explicit FileIngestionSource(const std::string &path)
52
1/2
✓ Branch 2 taken 61063 times.
✗ Branch 3 not taken.
61063 : path_(path), fd_(-1), stat_obtained_(false) { }
53 228510 ~FileIngestionSource() { }
54
55 122643 std::string GetPath() const { return path_; }
56 virtual bool IsRealFile() const { return true; }
57
58 8691 bool Open() {
59 8691 fd_ = open(path_.c_str(), O_RDONLY);
60
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8691 times.
8691 if (fd_ < 0) {
61 LogCvmfs(kLogCvmfs, kLogStderr,
62 "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(),
63 errno, strerror(errno));
64 return false;
65 }
66 8691 return true;
67 }
68
69 2669627 ssize_t Read(void *buffer, size_t nbyte) {
70
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2669627 times.
2669627 assert(fd_ >= 0);
71 2669627 const ssize_t read = SafeRead(fd_, buffer, nbyte);
72
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2671672 times.
2671672 if (read < 0) {
73 LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s",
74 path_.c_str(), errno, strerror(errno));
75 }
76 2671682 return read;
77 }
78
79 8691 bool Close() {
80
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 8691 times.
8691 if (fd_ == -1)
81 return true;
82
83 // tell to the OS that we are not going to access the file again in the
84 // foreaseable future.
85 8691 (void)platform_invalidate_kcache(fd_, 0, 0);
86
87 8691 const int ret = close(fd_);
88 8691 fd_ = -1;
89 8691 return (ret == 0);
90 }
91
92 4541 bool GetSize(uint64_t *size) {
93
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4541 times.
4541 if (stat_obtained_) {
94 *size = stat_.st_size;
95 return true;
96 }
97 4541 const int ret = platform_fstat(fd_, &stat_);
98
1/2
✓ Branch 0 taken 4541 times.
✗ Branch 1 not taken.
4541 if (ret == 0) {
99 4541 *size = stat_.st_size;
100 4541 stat_obtained_ = true;
101 4541 return true;
102 }
103 return false;
104 }
105
106 private:
107 const std::string path_;
108 int fd_;
109 platform_stat64 stat_;
110 bool stat_obtained_;
111 };
112
113
114 /**
115 * Wraps around existing memory without owning it.
116 */
117 class MemoryIngestionSource : public IngestionSource {
118 public:
119 1250012 MemoryIngestionSource(const std::string &p, const unsigned char *d,
120 unsigned s)
121
1/2
✓ Branch 2 taken 1250012 times.
✗ Branch 3 not taken.
1250012 : path_(p), data_(d), size_(s), pos_(0) { }
122 4998050 virtual ~MemoryIngestionSource() { }
123 6211697 virtual std::string GetPath() const { return path_; }
124 virtual bool IsRealFile() const { return false; }
125 1249222 virtual bool Open() { return true; }
126 2494331 virtual ssize_t Read(void *buffer, size_t nbyte) {
127 2494331 const size_t remaining = size_ - pos_;
128 2494331 const size_t size = std::min(remaining, nbyte);
129
2/2
✓ Branch 0 taken 1249781 times.
✓ Branch 1 taken 1243470 times.
2493251 if (size > 0)
130 1249781 memcpy(buffer, data_ + pos_, size);
131 2493251 pos_ += size;
132 2493251 return static_cast<ssize_t>(size);
133 }
134 1248122 virtual bool Close() { return true; }
135 1249190 virtual bool GetSize(uint64_t *size) {
136 1249190 *size = size_;
137 1249190 return true;
138 }
139
140 private:
141 std::string path_;
142 const unsigned char *data_;
143 unsigned size_;
144 unsigned pos_;
145 };
146
147
148 /**
149 * Uses an std::string as data buffer
150 */
151 class StringIngestionSource : public IngestionSource {
152 public:
153 12 explicit StringIngestionSource(const std::string &data)
154 24 : data_(data)
155
2/4
✓ Branch 3 taken 12 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 12 times.
✗ Branch 7 not taken.
12 , source_("MEM", reinterpret_cast<const unsigned char *>(data_.data()),
156
1/2
✓ Branch 2 taken 12 times.
✗ Branch 3 not taken.
24 data_.length()) { }
157 StringIngestionSource(const std::string &data, const std::string &filename)
158 : data_(data)
159 , source_(filename, reinterpret_cast<const unsigned char *>(data_.data()),
160 data_.length()) { }
161 virtual ~StringIngestionSource() { }
162 17 virtual std::string GetPath() const { return source_.GetPath(); }
163 virtual bool IsRealFile() const { return false; }
164 12 virtual bool Open() { return source_.Open(); }
165 156 virtual ssize_t Read(void *buffer, size_t nbyte) {
166 156 return source_.Read(buffer, nbyte);
167 }
168 12 virtual bool Close() { return source_.Close(); }
169 5 virtual bool GetSize(uint64_t *size) { return source_.GetSize(size); }
170
171 private:
172 std::string data_;
173 MemoryIngestionSource source_;
174 };
175
176
177 class TarIngestionSource : public IngestionSource {
178 public:
179 387 TarIngestionSource(const std::string &path, struct archive *archive,
180 struct archive_entry *entry, Signal *read_archive_signal)
181 774 : path_(path)
182 387 , archive_(archive)
183
1/2
✓ Branch 2 taken 387 times.
✗ Branch 3 not taken.
387 , read_archive_signal_(read_archive_signal) {
184
2/4
✓ Branch 1 taken 387 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 387 times.
387 assert(read_archive_signal_->IsSleeping());
185
1/2
✓ Branch 1 taken 387 times.
✗ Branch 2 not taken.
387 const struct stat *stat_ = archive_entry_stat(entry);
186 387 size_ = stat_->st_size;
187 387 }
188
189 std::string GetPath() const { return path_; }
190 virtual bool IsRealFile() const { return false; }
191
192 bool Open() {
193 assert(size_ >= 0);
194 return true;
195 }
196
197 ssize_t Read(void *external_buffer, size_t nbytes) {
198 const ssize_t read = archive_read_data(archive_, external_buffer, nbytes);
199 if (read < 0) {
200 errno = archive_errno(archive_);
201 LogCvmfs(kLogCvmfs, kLogStderr,
202 "failed to read data from the tar entry: %s (%d)\n %s",
203 path_.c_str(), errno, archive_error_string(archive_));
204 }
205 return read;
206 }
207
208 387 bool Close() {
209 387 read_archive_signal_->Wakeup();
210 387 return true;
211 }
212
213 bool GetSize(uint64_t *size) {
214 *size = size_;
215 return true;
216 }
217
218 private:
219 std::string path_;
220 struct archive *archive_;
221 uint64_t size_;
222 Signal *read_archive_signal_;
223 };
224
225 #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_
226