GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/swissknife_ingest.cc
Date: 2026-06-28 02:36:10
Exec Total Coverage
Lines: 0 153 0.0%
Branches: 0 92 0.0%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM File System
3 */
4
#include "swissknife_ingest.h"

#include <fcntl.h>
#include <unistd.h>

#include <climits>
#include <string>
#include <vector>

#include "catalog_virtual.h"
#include "manifest.h"
#include "statistics.h"
#include "statistics_database.h"
#include "swissknife_ingest_gc.h"
#include "sync_mediator.h"
#include "sync_union.h"
#include "sync_union_tarball.h"
#include "util/capabilities.h"
#include "util/logging.h"
#include "util/pointer.h"
#include "util/posix.h"
25
26 /*
27 * Many of the options possible to set in the ArgumentList are not actually used
28 * by the ingest command since they are not part of its interface, hence those
29 * unused options cannot be set by the shell script. Of course if there is the
30 * necessitty those parameters can be added and managed.
31 * At the moment this approach worked fine and didn't add much complexity,
32 * however if yet another command will need to use a similar approach it would
33 * be good to consider creating different options handler for each command.
34 */
35 int swissknife::Ingest::Main(const swissknife::ArgumentList &args) {
36 const std::string start_time = GetGMTimestamp();
37
38 SyncParameters params;
39 params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
40 params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
41 params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
42 shash::kSuffixCatalog);
43 params.stratum0 = *args.find('w')->second;
44 params.manifest_path = *args.find('o')->second;
45 params.spooler_definition = *args.find('r')->second;
46
47 params.public_keys = *args.find('K')->second;
48 params.repo_name = *args.find('N')->second;
49
50 if (args.find('T') != args.end()) {
51 params.tar_file = *args.find('T')->second;
52 }
53 if (args.find('B') != args.end()) {
54 params.base_directory = *args.find('B')->second;
55 }
56 if (args.find('D') != args.end()) {
57 params.to_delete = *args.find('D')->second;
58 }
59
60 std::string gc_db_path;
61 std::vector<int64_t> gc_batch_ids;
62 if (args.find('Q') != args.end()) {
63 gc_db_path = *args.find('Q')->second;
64 // Batch size comes from the -X flag (populated by the shell from
65 // CVMFS_GC_DB_BATCH_SIZE in the repository server.conf). 0 / missing /
66 // negative means "no limit": read all pending rows. Default is 1000.
67 int gc_db_batch_size = 1000;
68 if (args.find('X') != args.end()) {
69 gc_db_batch_size = static_cast<int>(String2Int64(*args.find('X')->second));
70 if (gc_db_batch_size < 0) gc_db_batch_size = 0;
71 }
72 std::vector<std::string> gc_paths;
73 if (!ReadGCDatabase(gc_db_path, &gc_paths, &gc_batch_ids,
74 gc_db_batch_size)) {
75 PrintError("Swissknife Ingest: failed to read GC database");
76 return 1;
77 }
78 LogCvmfs(kLogCvmfs, kLogStdout,
79 "Swissknife Ingest: Read %lu paths from GC database %s "
80 "(batch size %d)",
81 gc_paths.size(), gc_db_path.c_str(), gc_db_batch_size);
82 // Append GC paths to the existing to_delete string using /// delimiter
83 for (size_t i = 0; i < gc_paths.size(); ++i) {
84 if (!params.to_delete.empty()) {
85 params.to_delete += "///";
86 }
87 params.to_delete += gc_paths[i];
88 }
89 // GC database implies fast delete
90 if (!gc_paths.empty()) {
91 params.fast_delete = true;
92 }
93 }
94
95 if (args.find('O') != args.end()) {
96 params.generate_legacy_bulk_chunks = true;
97 }
98 if (args.find('j') != args.end()) {
99 params.enable_mtime_ns = true;
100 }
101 if (args.find('f') != args.end()) {
102 params.fast_delete = true;
103 }
104 if (args.find('m') != args.end()) {
105 params.tolerate_missing_hardlinks = true;
106 }
107 shash::Algorithms hash_algorithm = shash::kSha1;
108 if (args.find('e') != args.end()) {
109 hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
110 if (hash_algorithm == shash::kAny) {
111 PrintError("Swissknife Ingest: unknown hash algorithm");
112 return 1;
113 }
114 }
115 if (args.find('Z') != args.end()) {
116 params.compression_alg = zlib::ParseCompressionAlgorithm(
117 *args.find('Z')->second);
118 }
119 if (args.find('U') != args.end()) {
120 params.uid = static_cast<uid_t>(String2Int64(*args.find('U')->second));
121 }
122 if (args.find('G') != args.end()) {
123 params.gid = static_cast<gid_t>(String2Int64(*args.find('G')->second));
124 }
125
126 const bool create_catalog = args.find('C') != args.end();
127
128 params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
129 params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
130 params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;
131
132 params.branched_catalog = false; // could be true?
133
134 if (args.find('P') != args.end()) {
135 params.session_token_file = *args.find('P')->second;
136 }
137
138 if (args.find('H') != args.end()) {
139 params.key_file = *args.find('H')->second;
140 }
141
142 const bool upload_statsdb = (args.count('I') > 0);
143
144 perf::StatisticsTemplate publish_statistics("publish", this->statistics());
145 StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(
146 params.repo_name);
147
148 upload::SpoolerDefinition spooler_definition(
149 params.spooler_definition, hash_algorithm, params.compression_alg,
150 params.generate_legacy_bulk_chunks, params.use_file_chunking,
151 params.min_file_chunk_size, params.avg_file_chunk_size,
152 params.max_file_chunk_size, params.session_token_file, params.key_file);
153 if (params.max_concurrent_write_jobs > 0) {
154 spooler_definition
155 .number_of_concurrent_uploads = params.max_concurrent_write_jobs;
156 }
157
158 // Sanitize base_directory, removing any leading or trailing slashes
159 // from non-root (!= "/") paths
160 params.base_directory = TrimString(params.base_directory, "/", kTrimAll);
161
162 const upload::SpoolerDefinition spooler_definition_catalogs(
163 spooler_definition.Dup2DefaultCompression());
164
165 params.spooler = upload::Spooler::Construct(spooler_definition,
166 &publish_statistics);
167 if (NULL == params.spooler)
168 return 3;
169 const UniquePtr<upload::Spooler> spooler_catalogs(upload::Spooler::Construct(
170 spooler_definition_catalogs, &publish_statistics));
171 if (!spooler_catalogs.IsValid())
172 return 3;
173
174 const bool follow_redirects = (args.count('L') > 0);
175 const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
176 if (!InitDownloadManager(follow_redirects, proxy)) {
177 return 3;
178 }
179
180 if (!InitSignatureManager(params.public_keys)) {
181 return 3;
182 }
183
184 const bool with_gateway = spooler_definition.driver_type
185 == upload::SpoolerDefinition::Gateway;
186
187 // This may fail, in which case a warning is printed and the process continues
188 ObtainDacReadSearchCapability();
189
190 UniquePtr<manifest::Manifest> manifest;
191 if (params.branched_catalog) {
192 // Throw-away manifest
193 manifest = new manifest::Manifest(shash::Any(), 0, "");
194 } else {
195 if (with_gateway) {
196 manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
197 shash::Any());
198 } else {
199 manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
200 params.base_hash);
201 }
202 }
203 if (!manifest.IsValid()) {
204 return 3;
205 }
206
207 const std::string old_root_hash = manifest->catalog_hash().ToString(true);
208
209 catalog::WritableCatalogManager catalog_manager(
210 params.base_hash, params.stratum0, params.dir_temp,
211 spooler_catalogs.weak_ref(), download_manager(), params.enforce_limits,
212 params.nested_kcatalog_limit, params.root_kcatalog_limit,
213 params.file_mbyte_limit, statistics(), params.is_balanced,
214 params.max_weight, params.min_weight);
215 catalog_manager.Init();
216
217 publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
218 LogCvmfs(kLogPublish, kLogStdout, "Swissknife Ingest: Processing changes...");
219
220 publish::SyncUnion *sync = new publish::SyncUnionTarball(
221 &mediator, params.dir_rdonly, params.tar_file, params.base_directory,
222 params.uid, params.gid, params.to_delete, create_catalog,
223 params.fast_delete, "///", params.tolerate_missing_hardlinks);
224
225 if (!sync->Initialize()) {
226 LogCvmfs(kLogCvmfs, kLogStderr,
227 "Swissknife Ingest: Initialization of the synchronisation "
228 "engine failed");
229 return 4;
230 }
231
232 sync->Traverse();
233
234 if (!params.authz_file.empty()) {
235 LogCvmfs(kLogCvmfs, kLogDebug,
236 "Swissknife Ingest: Adding contents of authz file %s to"
237 " root catalog.",
238 params.authz_file.c_str());
239 const int fd = open(params.authz_file.c_str(), O_RDONLY);
240 if (fd == -1) {
241 LogCvmfs(kLogCvmfs, kLogStderr,
242 "Swissknife Ingest: Unable to open authz file (%s)"
243 "from the publication process: %s",
244 params.authz_file.c_str(), strerror(errno));
245 return 7;
246 }
247
248 std::string new_authz;
249 const bool read_successful = SafeReadToString(fd, &new_authz);
250 close(fd);
251
252 if (!read_successful) {
253 LogCvmfs(kLogCvmfs, kLogStderr,
254 "Swissknife Ingest: Failed to read authz file (%s): %s",
255 params.authz_file.c_str(), strerror(errno));
256 return 8;
257 }
258
259 catalog_manager.SetVOMSAuthz(new_authz);
260 }
261
262 if (!mediator.Commit(manifest.weak_ref())) {
263 PrintError("Swissknife Ingest: something went wrong during sync");
264 stats_db->StorePublishStatistics(this->statistics(), start_time, false);
265 if (upload_statsdb) {
266 stats_db->UploadStatistics(params.spooler);
267 }
268 return 5;
269 }
270
271 perf::Counter *revision_counter = statistics()->Register(
272 "publish.revision", "Published revision number");
273 revision_counter->Set(catalog_manager.GetRootCatalog()->revision());
274
275 // finalize the spooler
276 LogCvmfs(kLogCvmfs, kLogStdout,
277 "Swissknife Ingest: Wait for all uploads to finish");
278 params.spooler->WaitForUpload();
279 spooler_catalogs->WaitForUpload();
280 params.spooler->FinalizeSession(false);
281
282 LogCvmfs(kLogCvmfs, kLogStdout,
283 "Swissknife Ingest: Exporting repository manifest");
284
285 // We call FinalizeSession(true) this time, to also trigger the commit
286 // operation on the gateway machine (if the upstream is of type "gw").
287
288 // Get the path of the new root catalog
289 const std::string new_root_hash = manifest->catalog_hash().ToString(true);
290
291 if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
292 params.repo_tag)) {
293 PrintError("Swissknife Ingest: Failed to commit the transaction.");
294 stats_db->StorePublishStatistics(this->statistics(), start_time, false);
295 if (upload_statsdb) {
296 stats_db->UploadStatistics(params.spooler);
297 }
298 return 9;
299 }
300
301 stats_db->StorePublishStatistics(this->statistics(), start_time, true);
302 if (upload_statsdb) {
303 stats_db->UploadStatistics(params.spooler);
304 }
305
306 delete params.spooler;
307
308 if (!manifest->Export(params.manifest_path)) {
309 PrintError("Swissknife Ingest: Failed to create new repository");
310 return 6;
311 }
312
313 // Mark GC paths as deleted in the database after successful publication.
314 // Only the rows that were just read (gc_batch_ids) are marked, so rows
315 // added after the read by a concurrent scanner are preserved for the next
316 // invocation.
317 if (!gc_db_path.empty()) {
318 if (MarkGCPathsDeleted(gc_db_path, gc_batch_ids)) {
319 LogCvmfs(kLogCvmfs, kLogStdout,
320 "Swissknife Ingest: Marked %lu GC paths as deleted in %s",
321 gc_batch_ids.size(), gc_db_path.c_str());
322 } else {
323 LogCvmfs(kLogCvmfs, kLogStderr,
324 "Swissknife Ingest: WARNING - Failed to mark GC paths as "
325 "deleted in %s. Paths were removed from the repository but "
326 "the database was not updated.",
327 gc_db_path.c_str());
328 }
329 }
330
331 return 0;
332 }
333