/**
 * This file is part of the CernVM File System
 */

#include "swissknife_ingest.h"

#include <fcntl.h>
#include <unistd.h>

#include "catalog_virtual.h"
#include "manifest.h"
#include "statistics.h"
#include "statistics_database.h"
#include "swissknife_capabilities.h"
#include "sync_mediator.h"
#include "sync_union.h"
#include "sync_union_tarball.h"
#include "util/logging.h"
#include "util/pointer.h"
#include "util/posix.h"

/*
 * Many of the options that can be set in the ArgumentList are not actually
 * used by the ingest command, since they are not part of its interface and
 * hence cannot be set by the shell script.  If the need arises, those
 * parameters can be added and managed here.
 * So far this approach has worked fine and did not add much complexity.
 * However, if yet another command ends up needing a similar approach, it
 * would be worth creating a separate options handler for each command.
 */
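/**
 * Entry point of the "ingest" swissknife command.  Single-letter options
 * consumed below:
 *   -c  read-only repository view        -t  temporary scratch directory
 *   -b  base catalog hash                -w  stratum 0 URL
 *   -o  manifest output path             -r  spooler definition
 *   -K  public key(s)                    -N  repository name
 *   -T  tar file to ingest               -B  base directory inside the repo
 *   -D  path to delete                   -C  create a catalog
 *   -O  generate legacy bulk chunks      -j  enable nanosecond mtimes
 *   -e  hash algorithm                   -Z  compression algorithm
 *   -U  uid to assign                    -G  gid to assign
 *   -P  session token file               -H  key file (gateway uploads)
 *   -I  upload the statistics database   -L  follow redirects
 *   -@  download proxy
 */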
int swissknife::Ingest::Main(const swissknife::ArgumentList &args) {
  std::string start_time = GetGMTimestamp();

  SyncParameters params;
  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
                                         shash::kSuffixCatalog);
  params.stratum0 = *args.find('w')->second;
  params.manifest_path = *args.find('o')->second;
  params.spooler_definition = *args.find('r')->second;

  params.public_keys = *args.find('K')->second;
  params.repo_name = *args.find('N')->second;

  if (args.find('T') != args.end()) {
    params.tar_file = *args.find('T')->second;
  }
  if (args.find('B') != args.end()) {
    params.base_directory = *args.find('B')->second;
  }
  if (args.find('D') != args.end()) {
    params.to_delete = *args.find('D')->second;
  }

  if (args.find('O') != args.end()) {
    params.generate_legacy_bulk_chunks = true;
  }
  if (args.find('j') != args.end()) {
    params.enable_mtime_ns = true;
  }
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('e') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Ingest: unknown hash algorithm");
      return 1;
    }
  }
  if (args.find('Z') != args.end()) {
    params.compression_alg =
        zlib::ParseCompressionAlgorithm(*args.find('Z')->second);
  }
  if (args.find('U') != args.end()) {
    params.uid = static_cast<uid_t>(String2Int64(*args.find('U')->second));
  }
  if (args.find('G') != args.end()) {
    params.gid = static_cast<gid_t>(String2Int64(*args.find('G')->second));
  }

  bool create_catalog = args.find('C') != args.end();

  params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
  params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
  params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;

  params.branched_catalog = false;  // could be true?

  if (args.find('P') != args.end()) {
    params.session_token_file = *args.find('P')->second;
  }

  if (args.find('H') != args.end()) {
    params.key_file = *args.find('H')->second;
  }

  const bool upload_statsdb = (args.count('I') > 0);

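  // Publish statistics are collected into a local StatisticsDatabase and,
  // if -I was given, uploaded through the spooler at the end of the run.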
  perf::StatisticsTemplate publish_statistics("publish", this->statistics());
  StatisticsDatabase *stats_db =
      StatisticsDatabase::OpenStandardDB(params.repo_name);

  upload::SpoolerDefinition spooler_definition(
      params.spooler_definition, hash_algorithm, params.compression_alg,
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
      params.min_file_chunk_size, params.avg_file_chunk_size,
      params.max_file_chunk_size, params.session_token_file, params.key_file);
  if (params.max_concurrent_write_jobs > 0) {
    spooler_definition.number_of_concurrent_uploads =
        params.max_concurrent_write_jobs;
  }

  // Sanitize base_directory, removing any leading or trailing slashes
  // from non-root (!= "/") paths
  params.base_directory = TrimString(params.base_directory, "/", kTrimAll);

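  // Catalogs are uploaded through a dedicated spooler that always uses the
  // default compression algorithm, independent of the -Z setting for file data.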
  upload::SpoolerDefinition spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());

  params.spooler = upload::Spooler::Construct(spooler_definition,
                                              &publish_statistics);
  if (NULL == params.spooler) return 3;
  UniquePtr<upload::Spooler> spooler_catalogs(
      upload::Spooler::Construct(spooler_definition_catalogs,
                                 &publish_statistics));
  if (!spooler_catalogs.IsValid()) return 3;

  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 3;
  }

  if (!InitSignatureManager(params.public_keys)) {
    return 3;
  }

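  // Gateway ("gw") uploads are detected from the spooler definition; in that
  // case the base manifest is fetched below without pinning it to base_hash.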
  bool with_gateway =
      spooler_definition.driver_type == upload::SpoolerDefinition::Gateway;

  // This may fail, in which case a warning is printed and the process continues
  ObtainDacReadSearchCapability();

  UniquePtr<manifest::Manifest> manifest;
  if (params.branched_catalog) {
    // Throw-away manifest
    manifest = new manifest::Manifest(shash::Any(), 0, "");
  } else {
    if (with_gateway) {
      manifest =
          FetchRemoteManifest(params.stratum0, params.repo_name, shash::Any());
    } else {
      manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
                                     params.base_hash);
    }
  }
  if (!manifest.IsValid()) {
    return 3;
  }

  const std::string old_root_hash = manifest->catalog_hash().ToString(true);

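  // The writable catalog manager stages catalog changes in params.dir_temp
  // and ships the resulting catalogs through the catalog spooler.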
  catalog::WritableCatalogManager catalog_manager(
      params.base_hash, params.stratum0, params.dir_temp,
      spooler_catalogs.weak_ref(),
      download_manager(), params.enforce_limits, params.nested_kcatalog_limit,
      params.root_kcatalog_limit, params.file_mbyte_limit, statistics(),
      params.is_balanced, params.max_weight, params.min_weight);
  catalog_manager.Init();

  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
  LogCvmfs(kLogPublish, kLogStdout, "Swissknife Ingest: Processing changes...");

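  // The tarball sync union presents the contents of params.tar_file as an
  // overlay over the read-only view, rooted at params.base_directory; the
  // mediator translates the observed changes into catalog updates.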
  publish::SyncUnion *sync = new publish::SyncUnionTarball(
      &mediator, params.dir_rdonly, params.tar_file, params.base_directory,
      params.uid, params.gid, params.to_delete, create_catalog);

  if (!sync->Initialize()) {
    LogCvmfs(kLogCvmfs, kLogStderr,
             "Swissknife Ingest: Initialization of the synchronisation "
             "engine failed");
    return 4;
  }

  sync->Traverse();

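  // If the publication process handed over an authz file, embed its contents
  // as the VOMS authorization information of the root catalog.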
  if (!params.authz_file.empty()) {
    LogCvmfs(kLogCvmfs, kLogDebug,
             "Swissknife Ingest: Adding contents of authz file %s to"
             " root catalog.",
             params.authz_file.c_str());
    int fd = open(params.authz_file.c_str(), O_RDONLY);
    if (fd == -1) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Ingest: Unable to open authz file (%s)"
               " from the publication process: %s",
               params.authz_file.c_str(), strerror(errno));
      return 7;
    }

    std::string new_authz;
    const bool read_successful = SafeReadToString(fd, &new_authz);
    close(fd);

    if (!read_successful) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Ingest: Failed to read authz file (%s): %s",
               params.authz_file.c_str(), strerror(errno));
      return 8;
    }

    catalog_manager.SetVOMSAuthz(new_authz);
  }

  if (!mediator.Commit(manifest.weak_ref())) {
    PrintError("Swissknife Ingest: something went wrong during sync");
    stats_db->StorePublishStatistics(this->statistics(), start_time, false);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
    return 5;
  }

  perf::Counter *revision_counter = statistics()->Register("publish.revision",
      "Published revision number");
  revision_counter->Set(catalog_manager.GetRootCatalog()->revision());

  // finalize the spooler
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Ingest: Wait for all uploads to finish");
  params.spooler->WaitForUpload();
  spooler_catalogs->WaitForUpload();
  params.spooler->FinalizeSession(false);

  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Ingest: Exporting repository manifest");

  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the hash of the new root catalog
  const std::string new_root_hash = manifest->catalog_hash().ToString(true);

  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
                                         params.repo_tag)) {
    PrintError("Swissknife Ingest: Failed to commit the transaction.");
    stats_db->StorePublishStatistics(this->statistics(), start_time, false);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
    return 9;
  }

  stats_db->StorePublishStatistics(this->statistics(), start_time, true);
  if (upload_statsdb) {
    stats_db->UploadStatistics(params.spooler);
  }

  delete params.spooler;

  if (!manifest->Export(params.manifest_path)) {
    PrintError("Swissknife Ingest: Failed to create new repository");
    return 6;
  }

  return 0;
}