GCC Code Coverage Report

Directory: cvmfs/
File:      cvmfs/swissknife_ingest.cc
Date:      2025-06-22 02:36:02

            Exec   Total   Coverage
Lines:         0     127       0.0%
Branches:      0      68       0.0%

Source (per-line counters omitted; no lines were executed):
/**
 * This file is part of the CernVM File System
 */

#include "swissknife_ingest.h"

#include <fcntl.h>
#include <unistd.h>

#include "catalog_virtual.h"
#include "manifest.h"
#include "statistics.h"
#include "statistics_database.h"
#include "swissknife_capabilities.h"
#include "sync_mediator.h"
#include "sync_union.h"
#include "sync_union_tarball.h"
#include "util/logging.h"
#include "util/pointer.h"
#include "util/posix.h"

/*
 * Many of the options that can be set in the ArgumentList are not actually
 * used by the ingest command, since they are not part of its interface, and
 * hence those unused options cannot be set by the shell script. Should the
 * need arise, those parameters can be added and managed here.
 * So far this approach has worked fine without adding much complexity.
 * However, if yet another command needs a similar approach, it would be
 * worth considering a separate options handler for each command.
 */
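/*
 * For orientation, a sketch of how the single-letter options parsed below
 * map onto a command line. The paths, repository name, and tarball are
 * hypothetical, not taken from this file:
 *
 *   cvmfs_swissknife ingest \
 *     -c /cvmfs/demo.example.org \                    (read-only volume)
 *     -t /var/spool/cvmfs/demo.example.org/tmp \      (scratch directory)
 *     -b <base catalog hash> \
 *     -w http://stratum0.example.org/cvmfs/demo.example.org \
 *     -o /var/spool/cvmfs/demo.example.org/manifest \
 *     -r <spooler definition> \
 *     -K /etc/cvmfs/keys/demo.example.org.pub \
 *     -N demo.example.org \
 *     -T /tmp/payload.tar -B nested/path -C
 */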
int swissknife::Ingest::Main(const swissknife::ArgumentList &args) {
  const std::string start_time = GetGMTimestamp();

  SyncParameters params;
  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
                                         shash::kSuffixCatalog);
  params.stratum0 = *args.find('w')->second;
  params.manifest_path = *args.find('o')->second;
  params.spooler_definition = *args.find('r')->second;

  params.public_keys = *args.find('K')->second;
  params.repo_name = *args.find('N')->second;

  if (args.find('T') != args.end()) {
    params.tar_file = *args.find('T')->second;
  }
  if (args.find('B') != args.end()) {
    params.base_directory = *args.find('B')->second;
  }
  if (args.find('D') != args.end()) {
    params.to_delete = *args.find('D')->second;
  }

  if (args.find('O') != args.end()) {
    params.generate_legacy_bulk_chunks = true;
  }
  if (args.find('j') != args.end()) {
    params.enable_mtime_ns = true;
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('e') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Ingest: unknown hash algorithm");
      return 1;
    }
  }
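  // Note: ParseHashAlgorithm() yields shash::kAny for any name it does not
  // recognize, which is why kAny doubles as the error value above. The set
  // of accepted names (e.g. "sha1", "rmd160", "shake128") is an assumption
  // based on the hash algorithms CernVM-FS supports, not spelled out here.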
  if (args.find('Z') != args.end()) {
    params.compression_alg = zlib::ParseCompressionAlgorithm(
        *args.find('Z')->second);
  }
  if (args.find('U') != args.end()) {
    params.uid = static_cast<uid_t>(String2Int64(*args.find('U')->second));
  }
  if (args.find('G') != args.end()) {
    params.gid = static_cast<gid_t>(String2Int64(*args.find('G')->second));
  }

  const bool create_catalog = args.find('C') != args.end();

  params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
  params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
  params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;

  params.branched_catalog = false;  // could be true?

  if (args.find('P') != args.end()) {
    params.session_token_file = *args.find('P')->second;
  }

  if (args.find('H') != args.end()) {
    params.key_file = *args.find('H')->second;
  }

  const bool upload_statsdb = (args.count('I') > 0);

  perf::StatisticsTemplate publish_statistics("publish", this->statistics());
  StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(
      params.repo_name);

  upload::SpoolerDefinition spooler_definition(
      params.spooler_definition, hash_algorithm, params.compression_alg,
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
      params.min_file_chunk_size, params.avg_file_chunk_size,
      params.max_file_chunk_size, params.session_token_file, params.key_file);
  if (params.max_concurrent_write_jobs > 0) {
    spooler_definition.number_of_concurrent_uploads =
        params.max_concurrent_write_jobs;
  }
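  // The spooler definition string (-r) encodes the upstream storage, e.g.
  // "local,/srv/tmp,/srv/cvmfs/repo" for local storage or a "gw,..." string
  // for a gateway upstream. The exact syntax is an assumption drawn from
  // CernVM-FS upstream conventions; it is parsed elsewhere, not in this file.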

  // Sanitize base_directory, removing any leading or trailing slashes
  // from non-root (!= "/") paths
  params.base_directory = TrimString(params.base_directory, "/", kTrimAll);
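  // For example, a -B value of "/nested/path/" becomes "nested/path"
  // (kTrimAll strips the separator from both ends).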

  const upload::SpoolerDefinition spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());
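  // Catalogs go through a second spooler definition forced to the default
  // compression. Presumably this keeps catalogs readable by every client
  // regardless of the -Z algorithm chosen for file data; that rationale is
  // an inference, not stated in this file.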

  params.spooler = upload::Spooler::Construct(spooler_definition,
                                              &publish_statistics);
  if (NULL == params.spooler)
    return 3;
  const UniquePtr<upload::Spooler> spooler_catalogs(upload::Spooler::Construct(
      spooler_definition_catalogs, &publish_statistics));
  if (!spooler_catalogs.IsValid())
    return 3;

  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 3;
  }

  if (!InitSignatureManager(params.public_keys)) {
    return 3;
  }

  const bool with_gateway =
      spooler_definition.driver_type == upload::SpoolerDefinition::Gateway;

  // This may fail, in which case a warning is printed and the process
  // continues
  ObtainDacReadSearchCapability();

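  // The manifest is the entry point into the repository state. In gateway
  // mode the fetch below passes an empty hash, i.e. it accepts whatever the
  // gateway currently serves; only the direct (non-gateway) path pins the
  // fetch to the local base hash. This reading is an inference from the
  // branch below, not documented in this file.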
  UniquePtr<manifest::Manifest> manifest;
  if (params.branched_catalog) {
    // Throw-away manifest
    manifest = new manifest::Manifest(shash::Any(), 0, "");
  } else {
    if (with_gateway) {
      manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
                                     shash::Any());
    } else {
      manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
                                     params.base_hash);
    }
  }
  if (!manifest.IsValid()) {
    return 3;
  }

  const std::string old_root_hash = manifest->catalog_hash().ToString(true);

  catalog::WritableCatalogManager catalog_manager(
      params.base_hash, params.stratum0, params.dir_temp,
      spooler_catalogs.weak_ref(), download_manager(), params.enforce_limits,
      params.nested_kcatalog_limit, params.root_kcatalog_limit,
      params.file_mbyte_limit, statistics(), params.is_balanced,
      params.max_weight, params.min_weight);
  catalog_manager.Init();

  publish::SyncMediator mediator(&catalog_manager, &params,
                                 publish_statistics);
  LogCvmfs(kLogPublish, kLogStdout, "Swissknife Ingest: Processing changes...");

  publish::SyncUnion *sync = new publish::SyncUnionTarball(
      &mediator, params.dir_rdonly, params.tar_file, params.base_directory,
      params.uid, params.gid, params.to_delete, create_catalog, "///");

  if (!sync->Initialize()) {
    LogCvmfs(kLogCvmfs, kLogStderr,
             "Swissknife Ingest: Initialization of the synchronisation "
             "engine failed");
    return 4;
  }

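  // Traverse() is where the actual ingestion happens: it walks the tarball
  // contents and reports each addition, modification, and deletion to the
  // SyncMediator, which stages file uploads and catalog updates. This
  // summary is a reading of the SyncUnion interface, not of code in this
  // file.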
  sync->Traverse();

  if (!params.authz_file.empty()) {
    LogCvmfs(kLogCvmfs, kLogDebug,
             "Swissknife Ingest: Adding contents of authz file %s to"
             " root catalog.",
             params.authz_file.c_str());
    const int fd = open(params.authz_file.c_str(), O_RDONLY);
    if (fd == -1) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Ingest: Unable to open authz file (%s) "
               "from the publication process: %s",
               params.authz_file.c_str(), strerror(errno));
      return 7;
    }

    std::string new_authz;
    const bool read_successful = SafeReadToString(fd, &new_authz);
    close(fd);

    if (!read_successful) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Ingest: Failed to read authz file (%s): %s",
               params.authz_file.c_str(), strerror(errno));
      return 8;
    }

    catalog_manager.SetVOMSAuthz(new_authz);
  }

  if (!mediator.Commit(manifest.weak_ref())) {
    PrintError("Swissknife Ingest: something went wrong during sync");
    stats_db->StorePublishStatistics(this->statistics(), start_time, false);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
    return 5;
  }

  perf::Counter *revision_counter = statistics()->Register(
      "publish.revision", "Published revision number");
  revision_counter->Set(catalog_manager.GetRootCatalog()->revision());

  // finalize the spooler
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Ingest: Wait for all uploads to finish");
  params.spooler->WaitForUpload();
  spooler_catalogs->WaitForUpload();
  params.spooler->FinalizeSession(false);

  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Ingest: Exporting repository manifest");

  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the hash of the new root catalog
  const std::string new_root_hash = manifest->catalog_hash().ToString(true);

  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
                                         params.repo_tag)) {
    PrintError("Swissknife Ingest: Failed to commit the transaction.");
    stats_db->StorePublishStatistics(this->statistics(), start_time, false);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
    return 9;
  }

  stats_db->StorePublishStatistics(this->statistics(), start_time, true);
  if (upload_statsdb) {
    stats_db->UploadStatistics(params.spooler);
  }

  delete params.spooler;

  if (!manifest->Export(params.manifest_path)) {
    PrintError("Swissknife Ingest: Failed to create new repository");
    return 6;
  }

  return 0;
}