GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/ingestion/ingestion_source.h
Date: 2024-04-28 02:33:07
Exec Total Coverage
Lines: 61 91 67.0%
Branches: 17 38 44.7%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System.
3 */
4
5 #ifndef CVMFS_INGESTION_INGESTION_SOURCE_H_
6 #define CVMFS_INGESTION_INGESTION_SOURCE_H_
7
8 #include <fcntl.h>
9 #include <pthread.h>
10 #include <unistd.h>
11
12 #include <algorithm>
13 #include <cassert>
14 #include <cerrno>
15 #include <cstdio>
16 #include <string>
17
18 #include "duplex_libarchive.h"
19 #include "util/concurrency.h"
20 #include "util/logging.h"
21 #include "util/platform.h"
22 #include "util/posix.h"
23 #include "util/single_copy.h"
24
25 /*
26 * The purpose of this class is to add a common interface for objects that are
27 * ingested by the pipeline. Hence the pipeline is able to ingest everything
28 * that implements this interface.
29 * The ownership of new IngestionSource objects is transferred from their creator
30 * directly to the pipeline itself that will take care of deallocating
31 * everything.
32 * The pipeline is multithreaded, so it is very likely that the code implemented
33 * in this interface will be called from a different thread than the one that
34 * originally
35 * allocated the object; hence it is necessary to take extra care in the use of
36 * locks; prefer condition variables.
37 */
38 class IngestionSource : SingleCopy {
39 public:
40 508464 virtual ~IngestionSource() {}
41 virtual std::string GetPath() const = 0;
42 virtual bool IsRealFile() const = 0;
43 virtual bool Open() = 0;
44 virtual ssize_t Read(void* buffer, size_t nbyte) = 0;
45 virtual bool Close() = 0;
46 virtual bool GetSize(uint64_t* size) = 0;
47 };
48
49 class FileIngestionSource : public IngestionSource {
50 public:
51 4293 explicit FileIngestionSource(const std::string& path)
52
1/2
✓ Branch 2 taken 4293 times.
✗ Branch 3 not taken.
4293 : path_(path), fd_(-1), stat_obtained_(false) {}
53 14968 ~FileIngestionSource() {}
54
55 8225 std::string GetPath() const { return path_; }
56 virtual bool IsRealFile() const { return true; }
57
58 1121 bool Open() {
59 1121 fd_ = open(path_.c_str(), O_RDONLY);
60
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1121 times.
1121 if (fd_ < 0) {
61 LogCvmfs(kLogCvmfs, kLogStderr,
62 "Err: Impossible to open the file: %s (%d)\n %s", path_.c_str(),
63 errno, strerror(errno));
64 return false;
65 }
66 1121 return true;
67 }
68
69 452771 ssize_t Read(void* buffer, size_t nbyte) {
70
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 452771 times.
452771 assert(fd_ >= 0);
71 452771 ssize_t read = SafeRead(fd_, buffer, nbyte);
72
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 453523 times.
453523 if (read < 0) {
73 LogCvmfs(kLogCvmfs, kLogStderr, "failed to read the file: %s (%d)\n %s",
74 path_.c_str(), errno, strerror(errno));
75 }
76 453523 return read;
77 }
78
79 1121 bool Close() {
80
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1121 times.
1121 if (fd_ == -1) return true;
81
82 // Tell the OS that we are not going to access the file again in the
83 // foreseeable future.
84 1121 (void)platform_invalidate_kcache(fd_, 0, 0);
85
86 1121 int ret = close(fd_);
87 1121 fd_ = -1;
88 1121 return (ret == 0);
89 }
90
91 592 bool GetSize(uint64_t* size) {
92
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 592 times.
592 if (stat_obtained_) {
93 *size = stat_.st_size;
94 return true;
95 }
96 592 int ret = platform_fstat(fd_, &stat_);
97
1/2
✓ Branch 0 taken 593 times.
✗ Branch 1 not taken.
593 if (ret == 0) {
98 593 *size = stat_.st_size;
99 593 stat_obtained_ = true;
100 593 return true;
101 }
102 return false;
103 }
104
105 private:
106 const std::string path_;
107 int fd_;
108 platform_stat64 stat_;
109 bool stat_obtained_;
110 };
111
112
113 /**
114 * Wraps around existing memory without owning it.
115 */
116 class MemoryIngestionSource : public IngestionSource {
117 public:
118 250002 MemoryIngestionSource(
119 const std::string &p, const unsigned char *d, unsigned s)
120
1/2
✓ Branch 2 taken 250002 times.
✗ Branch 3 not taken.
250002 : path_(p), data_(d), size_(s), pos_(0) {}
121 999850 virtual ~MemoryIngestionSource() {}
122 1242489 virtual std::string GetPath() const { return path_; }
123 virtual bool IsRealFile() const { return false; }
124 249921 virtual bool Open() { return true; }
125 498923 virtual ssize_t Read(void* buffer, size_t nbyte) {
126 498923 size_t remaining = size_ - pos_;
127 498923 size_t size = std::min(remaining, nbyte);
128
2/2
✓ Branch 0 taken 250004 times.
✓ Branch 1 taken 248897 times.
498901 if (size > 0) memcpy(buffer, data_ + pos_, size);
129 498901 pos_ += size;
130 498901 return static_cast<ssize_t>(size);
131 }
132 249808 virtual bool Close() { return true; }
133 249905 virtual bool GetSize(uint64_t* size) { *size = size_; return true; }
134
135 private:
136 std::string path_;
137 const unsigned char *data_;
138 unsigned size_;
139 unsigned pos_;
140 };
141
142
143 /**
144 * Uses an std::string as data buffer
145 */
146 class StringIngestionSource : public IngestionSource {
147 public:
148 2 explicit StringIngestionSource(const std::string& data)
149 4 : data_(data),
150
2/4
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✓ Branch 6 taken 2 times.
✗ Branch 7 not taken.
2 source_("MEM", reinterpret_cast<const unsigned char*>(data_.data()),
151
1/2
✓ Branch 2 taken 2 times.
✗ Branch 3 not taken.
4 data_.length()) {}
152 StringIngestionSource(const std::string& data, const std::string& filename)
153 : data_(data),
154 source_(filename, reinterpret_cast<const unsigned char*>(data_.data()),
155 data_.length()) {}
156 virtual ~StringIngestionSource() {}
157 3 virtual std::string GetPath() const { return source_.GetPath(); }
158 virtual bool IsRealFile() const { return false; }
159 2 virtual bool Open() { return source_.Open(); }
160 26 virtual ssize_t Read(void* buffer, size_t nbyte) {
161 26 return source_.Read(buffer, nbyte);
162 }
163 2 virtual bool Close() { return source_.Close(); }
164 1 virtual bool GetSize(uint64_t* size) { return source_.GetSize(size); }
165
166 private:
167 std::string data_;
168 MemoryIngestionSource source_;
169 };
170
171
172 class TarIngestionSource : public IngestionSource {
173 public:
174 9 TarIngestionSource(const std::string &path, struct archive* archive,
175 struct archive_entry* entry, Signal* read_archive_signal)
176 18 : path_(path),
177 9 archive_(archive),
178
1/2
✓ Branch 2 taken 9 times.
✗ Branch 3 not taken.
9 read_archive_signal_(read_archive_signal) {
179
2/4
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 9 times.
9 assert(read_archive_signal_->IsSleeping());
180
1/2
✓ Branch 1 taken 9 times.
✗ Branch 2 not taken.
9 const struct stat* stat_ = archive_entry_stat(entry);
181 9 size_ = stat_->st_size;
182 9 }
183
184 std::string GetPath() const { return path_; }
185 virtual bool IsRealFile() const { return false; }
186
187 bool Open() {
188 assert(size_ >= 0);
189 return true;
190 }
191
192 ssize_t Read(void* external_buffer, size_t nbytes) {
193 ssize_t read = archive_read_data(archive_, external_buffer, nbytes);
194 if (read < 0) {
195 errno = archive_errno(archive_);
196 LogCvmfs(kLogCvmfs, kLogStderr,
197 "failed to read data from the tar entry: %s (%d)\n %s",
198 path_.c_str(), errno, archive_error_string(archive_));
199 }
200 return read;
201 }
202
203 9 bool Close() {
204 9 read_archive_signal_->Wakeup();
205 9 return true;
206 }
207
208 bool GetSize(uint64_t* size) {
209 *size = size_;
210 return true;
211 }
212
213 private:
214 std::string path_;
215 struct archive* archive_;
216 uint64_t size_;
217 Signal* read_archive_signal_;
218 };
219
220 #endif // CVMFS_INGESTION_INGESTION_SOURCE_H_
221