GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_ingest.cc
Date: 2026-05-24 02:35:55
Exec Total Coverage
Lines: 0 151 0.0%
Branches: 0 90 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System
3 */
4
5 #include "swissknife_ingest.h"
6
7 #include <fcntl.h>
8 #include <unistd.h>
9
10 #include <string>
11 #include <vector>
12
13 #include "catalog_virtual.h"
14 #include "manifest.h"
15 #include "statistics.h"
16 #include "statistics_database.h"
17 #include "swissknife_ingest_gc.h"
18 #include "sync_mediator.h"
19 #include "sync_union.h"
20 #include "sync_union_tarball.h"
21 #include "util/capabilities.h"
22 #include "util/logging.h"
23 #include "util/pointer.h"
24 #include "util/posix.h"
25
26 /*
27 * Many of the options possible to set in the ArgumentList are not actually used
28 * by the ingest command since they are not part of its interface, hence those
29 * unused options cannot be set by the shell script. Of course if there is the
30 * necessitty those parameters can be added and managed.
31 * At the moment this approach worked fine and didn't add much complexity,
32 * however if yet another command will need to use a similar approach it would
33 * be good to consider creating different options handler for each command.
34 */
35 int swissknife::Ingest::Main(const swissknife::ArgumentList &args) {
36 const std::string start_time = GetGMTimestamp();
37
38 SyncParameters params;
39 params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
40 params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
41 params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
42 shash::kSuffixCatalog);
43 params.stratum0 = *args.find('w')->second;
44 params.manifest_path = *args.find('o')->second;
45 params.spooler_definition = *args.find('r')->second;
46
47 params.public_keys = *args.find('K')->second;
48 params.repo_name = *args.find('N')->second;
49
50 if (args.find('T') != args.end()) {
51 params.tar_file = *args.find('T')->second;
52 }
53 if (args.find('B') != args.end()) {
54 params.base_directory = *args.find('B')->second;
55 }
56 if (args.find('D') != args.end()) {
57 params.to_delete = *args.find('D')->second;
58 }
59
60 std::string gc_db_path;
61 std::vector<int64_t> gc_batch_ids;
62 if (args.find('Q') != args.end()) {
63 gc_db_path = *args.find('Q')->second;
64 // Batch size comes from the -X flag (populated by the shell from
65 // CVMFS_GC_DB_BATCH_SIZE in the repository server.conf). 0 / missing /
66 // negative means "no limit": read all pending rows. Default is 1000.
67 int gc_db_batch_size = 1000;
68 if (args.find('X') != args.end()) {
69 gc_db_batch_size = static_cast<int>(String2Int64(*args.find('X')->second));
70 if (gc_db_batch_size < 0) gc_db_batch_size = 0;
71 }
72 std::vector<std::string> gc_paths;
73 if (!ReadGCDatabase(gc_db_path, &gc_paths, &gc_batch_ids,
74 gc_db_batch_size)) {
75 PrintError("Swissknife Ingest: failed to read GC database");
76 return 1;
77 }
78 LogCvmfs(kLogCvmfs, kLogStdout,
79 "Swissknife Ingest: Read %lu paths from GC database %s "
80 "(batch size %d)",
81 gc_paths.size(), gc_db_path.c_str(), gc_db_batch_size);
82 // Append GC paths to the existing to_delete string using /// delimiter
83 for (size_t i = 0; i < gc_paths.size(); ++i) {
84 if (!params.to_delete.empty()) {
85 params.to_delete += "///";
86 }
87 params.to_delete += gc_paths[i];
88 }
89 // GC database implies fast delete
90 if (!gc_paths.empty()) {
91 params.fast_delete = true;
92 }
93 }
94
95 if (args.find('O') != args.end()) {
96 params.generate_legacy_bulk_chunks = true;
97 }
98 if (args.find('j') != args.end()) {
99 params.enable_mtime_ns = true;
100 }
101 if (args.find('f') != args.end()) {
102 params.fast_delete = true;
103 }
104 shash::Algorithms hash_algorithm = shash::kSha1;
105 if (args.find('e') != args.end()) {
106 hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
107 if (hash_algorithm == shash::kAny) {
108 PrintError("Swissknife Ingest: unknown hash algorithm");
109 return 1;
110 }
111 }
112 if (args.find('Z') != args.end()) {
113 params.compression_alg = zlib::ParseCompressionAlgorithm(
114 *args.find('Z')->second);
115 }
116 if (args.find('U') != args.end()) {
117 params.uid = static_cast<uid_t>(String2Int64(*args.find('U')->second));
118 }
119 if (args.find('G') != args.end()) {
120 params.gid = static_cast<gid_t>(String2Int64(*args.find('G')->second));
121 }
122
123 const bool create_catalog = args.find('C') != args.end();
124
125 params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
126 params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
127 params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;
128
129 params.branched_catalog = false; // could be true?
130
131 if (args.find('P') != args.end()) {
132 params.session_token_file = *args.find('P')->second;
133 }
134
135 if (args.find('H') != args.end()) {
136 params.key_file = *args.find('H')->second;
137 }
138
139 const bool upload_statsdb = (args.count('I') > 0);
140
141 perf::StatisticsTemplate publish_statistics("publish", this->statistics());
142 StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(
143 params.repo_name);
144
145 upload::SpoolerDefinition spooler_definition(
146 params.spooler_definition, hash_algorithm, params.compression_alg,
147 params.generate_legacy_bulk_chunks, params.use_file_chunking,
148 params.min_file_chunk_size, params.avg_file_chunk_size,
149 params.max_file_chunk_size, params.session_token_file, params.key_file);
150 if (params.max_concurrent_write_jobs > 0) {
151 spooler_definition
152 .number_of_concurrent_uploads = params.max_concurrent_write_jobs;
153 }
154
155 // Sanitize base_directory, removing any leading or trailing slashes
156 // from non-root (!= "/") paths
157 params.base_directory = TrimString(params.base_directory, "/", kTrimAll);
158
159 const upload::SpoolerDefinition spooler_definition_catalogs(
160 spooler_definition.Dup2DefaultCompression());
161
162 params.spooler = upload::Spooler::Construct(spooler_definition,
163 &publish_statistics);
164 if (NULL == params.spooler)
165 return 3;
166 const UniquePtr<upload::Spooler> spooler_catalogs(upload::Spooler::Construct(
167 spooler_definition_catalogs, &publish_statistics));
168 if (!spooler_catalogs.IsValid())
169 return 3;
170
171 const bool follow_redirects = (args.count('L') > 0);
172 const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
173 if (!InitDownloadManager(follow_redirects, proxy)) {
174 return 3;
175 }
176
177 if (!InitSignatureManager(params.public_keys)) {
178 return 3;
179 }
180
181 const bool with_gateway = spooler_definition.driver_type
182 == upload::SpoolerDefinition::Gateway;
183
184 // This may fail, in which case a warning is printed and the process continues
185 ObtainDacReadSearchCapability();
186
187 UniquePtr<manifest::Manifest> manifest;
188 if (params.branched_catalog) {
189 // Throw-away manifest
190 manifest = new manifest::Manifest(shash::Any(), 0, "");
191 } else {
192 if (with_gateway) {
193 manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
194 shash::Any());
195 } else {
196 manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
197 params.base_hash);
198 }
199 }
200 if (!manifest.IsValid()) {
201 return 3;
202 }
203
204 const std::string old_root_hash = manifest->catalog_hash().ToString(true);
205
206 catalog::WritableCatalogManager catalog_manager(
207 params.base_hash, params.stratum0, params.dir_temp,
208 spooler_catalogs.weak_ref(), download_manager(), params.enforce_limits,
209 params.nested_kcatalog_limit, params.root_kcatalog_limit,
210 params.file_mbyte_limit, statistics(), params.is_balanced,
211 params.max_weight, params.min_weight);
212 catalog_manager.Init();
213
214 publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
215 LogCvmfs(kLogPublish, kLogStdout, "Swissknife Ingest: Processing changes...");
216
217 publish::SyncUnion *sync = new publish::SyncUnionTarball(
218 &mediator, params.dir_rdonly, params.tar_file, params.base_directory,
219 params.uid, params.gid, params.to_delete, create_catalog,
220 params.fast_delete, "///");
221
222 if (!sync->Initialize()) {
223 LogCvmfs(kLogCvmfs, kLogStderr,
224 "Swissknife Ingest: Initialization of the synchronisation "
225 "engine failed");
226 return 4;
227 }
228
229 sync->Traverse();
230
231 if (!params.authz_file.empty()) {
232 LogCvmfs(kLogCvmfs, kLogDebug,
233 "Swissknife Ingest: Adding contents of authz file %s to"
234 " root catalog.",
235 params.authz_file.c_str());
236 const int fd = open(params.authz_file.c_str(), O_RDONLY);
237 if (fd == -1) {
238 LogCvmfs(kLogCvmfs, kLogStderr,
239 "Swissknife Ingest: Unable to open authz file (%s)"
240 "from the publication process: %s",
241 params.authz_file.c_str(), strerror(errno));
242 return 7;
243 }
244
245 std::string new_authz;
246 const bool read_successful = SafeReadToString(fd, &new_authz);
247 close(fd);
248
249 if (!read_successful) {
250 LogCvmfs(kLogCvmfs, kLogStderr,
251 "Swissknife Ingest: Failed to read authz file (%s): %s",
252 params.authz_file.c_str(), strerror(errno));
253 return 8;
254 }
255
256 catalog_manager.SetVOMSAuthz(new_authz);
257 }
258
259 if (!mediator.Commit(manifest.weak_ref())) {
260 PrintError("Swissknife Ingest: something went wrong during sync");
261 stats_db->StorePublishStatistics(this->statistics(), start_time, false);
262 if (upload_statsdb) {
263 stats_db->UploadStatistics(params.spooler);
264 }
265 return 5;
266 }
267
268 perf::Counter *revision_counter = statistics()->Register(
269 "publish.revision", "Published revision number");
270 revision_counter->Set(catalog_manager.GetRootCatalog()->revision());
271
272 // finalize the spooler
273 LogCvmfs(kLogCvmfs, kLogStdout,
274 "Swissknife Ingest: Wait for all uploads to finish");
275 params.spooler->WaitForUpload();
276 spooler_catalogs->WaitForUpload();
277 params.spooler->FinalizeSession(false);
278
279 LogCvmfs(kLogCvmfs, kLogStdout,
280 "Swissknife Ingest: Exporting repository manifest");
281
282 // We call FinalizeSession(true) this time, to also trigger the commit
283 // operation on the gateway machine (if the upstream is of type "gw").
284
285 // Get the path of the new root catalog
286 const std::string new_root_hash = manifest->catalog_hash().ToString(true);
287
288 if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
289 params.repo_tag)) {
290 PrintError("Swissknife Ingest: Failed to commit the transaction.");
291 stats_db->StorePublishStatistics(this->statistics(), start_time, false);
292 if (upload_statsdb) {
293 stats_db->UploadStatistics(params.spooler);
294 }
295 return 9;
296 }
297
298 stats_db->StorePublishStatistics(this->statistics(), start_time, true);
299 if (upload_statsdb) {
300 stats_db->UploadStatistics(params.spooler);
301 }
302
303 delete params.spooler;
304
305 if (!manifest->Export(params.manifest_path)) {
306 PrintError("Swissknife Ingest: Failed to create new repository");
307 return 6;
308 }
309
310 // Mark GC paths as deleted in the database after successful publication.
311 // Only the rows that were just read (gc_batch_ids) are marked, so rows
312 // added after the read by a concurrent scanner are preserved for the next
313 // invocation.
314 if (!gc_db_path.empty()) {
315 if (MarkGCPathsDeleted(gc_db_path, gc_batch_ids)) {
316 LogCvmfs(kLogCvmfs, kLogStdout,
317 "Swissknife Ingest: Marked %lu GC paths as deleted in %s",
318 gc_batch_ids.size(), gc_db_path.c_str());
319 } else {
320 LogCvmfs(kLogCvmfs, kLogStderr,
321 "Swissknife Ingest: WARNING - Failed to mark GC paths as "
322 "deleted in %s. Paths were removed from the repository but "
323 "the database was not updated.",
324 gc_db_path.c_str());
325 }
326 }
327
328 return 0;
329 }
330