CernVM-FS  2.13.0
swissknife_ingest.cc
/**
 * This file is part of the CernVM File System.
 */

#include "swissknife_ingest.h"

#include <fcntl.h>
#include <unistd.h>

#include "catalog_virtual.h"
#include "manifest.h"
#include "statistics.h"
#include "statistics_database.h"
#include "swissknife_capabilities.h"
#include "sync_mediator.h"
#include "sync_union.h"
#include "sync_union_tarball.h"
#include "util/logging.h"
#include "util/pointer.h"
#include "util/posix.h"

/*
 * Many of the options that can be set in the ArgumentList are not actually
 * used by the ingest command since they are not part of its interface, hence
 * those unused options cannot be set by the shell script. Should the need
 * arise, those parameters can be added and managed here.
 * So far this approach has worked fine and has not added much complexity;
 * however, if yet another command needs a similar approach, it would be good
 * to consider creating a separate options handler for each command.
 */
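// Required options read unconditionally below:
//   'c' -> dir_rdonly, 't' -> dir_temp, 'b' -> base_hash, 'w' -> stratum0,
//   'o' -> manifest_path, 'r' -> spooler_definition, 'K' -> public_keys,
//   'N' -> repo_name.
// All remaining single-letter options are optional and handled further down.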
int swissknife::Ingest::Main(const swissknife::ArgumentList &args) {
  std::string start_time = GetGMTimestamp();

  SyncParameters params;
  params.dir_rdonly = MakeCanonicalPath(*args.find('c')->second);
  params.dir_temp = MakeCanonicalPath(*args.find('t')->second);
  params.base_hash = shash::MkFromHexPtr(shash::HexPtr(*args.find('b')->second),
                                         shash::kSuffixCatalog);
  params.stratum0 = *args.find('w')->second;
  params.manifest_path = *args.find('o')->second;
  params.spooler_definition = *args.find('r')->second;

  params.public_keys = *args.find('K')->second;
  params.repo_name = *args.find('N')->second;

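  // Optional inputs: 'T' names the tarball to ingest, 'B' the base directory
  // inside the repository under which it is added, and 'D' a path to delete
  // from the repository before the ingestion.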
  if (args.find('T') != args.end()) {
    params.tar_file = *args.find('T')->second;
  }
  if (args.find('B') != args.end()) {
    params.base_directory = *args.find('B')->second;
  }
  if (args.find('D') != args.end()) {
    params.to_delete = *args.find('D')->second;
  }

  if (args.find('O') != args.end()) {
    params.generate_legacy_bulk_chunks = true;
  }
  if (args.find('j') != args.end()) {
    params.enable_mtime_ns = true;
  }
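  // The content hash algorithm defaults to SHA-1 and can be overridden with
  // 'e'; 'Z' selects the compression algorithm for the uploaded data.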
  shash::Algorithms hash_algorithm = shash::kSha1;
  if (args.find('e') != args.end()) {
    hash_algorithm = shash::ParseHashAlgorithm(*args.find('e')->second);
    if (hash_algorithm == shash::kAny) {
      PrintError("Swissknife Ingest: unknown hash algorithm");
      return 1;
    }
  }
  if (args.find('Z') != args.end()) {
    params.compression_alg = zlib::ParseCompressionAlgorithm(
        *args.find('Z')->second);
  }
  if (args.find('U') != args.end()) {
    params.uid = static_cast<uid_t>(String2Int64(*args.find('U')->second));
  }
  if (args.find('G') != args.end()) {
    params.gid = static_cast<gid_t>(String2Int64(*args.find('G')->second));
  }

  bool create_catalog = args.find('C') != args.end();

  params.nested_kcatalog_limit = SyncParameters::kDefaultNestedKcatalogLimit;
  params.root_kcatalog_limit = SyncParameters::kDefaultRootKcatalogLimit;
  params.file_mbyte_limit = SyncParameters::kDefaultFileMbyteLimit;

  params.branched_catalog = false;  // could be true?

  if (args.find('P') != args.end()) {
    params.session_token_file = *args.find('P')->second;
  }

  if (args.find('H') != args.end()) {
    params.key_file = *args.find('H')->second;
  }

  const bool upload_statsdb = (args.count('I') > 0);

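  // Statistics of this publish run are collected under the "publish" template
  // and stored in the repository's statistics database; 'I' additionally
  // uploads the database to the backend storage.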
  perf::StatisticsTemplate publish_statistics("publish", this->statistics());
  StatisticsDatabase *stats_db = StatisticsDatabase::OpenStandardDB(
      params.repo_name);

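  // The main spooler uploads the file data described by the spooler
  // definition string; catalogs go through a second spooler that always
  // uses the default compression.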
  upload::SpoolerDefinition spooler_definition(
      params.spooler_definition, hash_algorithm, params.compression_alg,
      params.generate_legacy_bulk_chunks, params.use_file_chunking,
      params.min_file_chunk_size, params.avg_file_chunk_size,
      params.max_file_chunk_size, params.session_token_file, params.key_file);
  if (params.max_concurrent_write_jobs > 0) {
    spooler_definition
        .number_of_concurrent_uploads = params.max_concurrent_write_jobs;
  }

  // Sanitize base_directory, removing any leading or trailing slashes
  // from non-root (!= "/") paths
  params.base_directory = TrimString(params.base_directory, "/", kTrimAll);

  upload::SpoolerDefinition spooler_definition_catalogs(
      spooler_definition.Dup2DefaultCompression());

  params.spooler = upload::Spooler::Construct(spooler_definition,
                                              &publish_statistics);
  if (NULL == params.spooler)
    return 3;
  UniquePtr<upload::Spooler> spooler_catalogs(upload::Spooler::Construct(
      spooler_definition_catalogs, &publish_statistics));
  if (!spooler_catalogs.IsValid())
    return 3;

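  // Network setup: the download manager fetches the published manifest and
  // existing catalogs, the signature manager verifies them against the
  // configured public keys.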
  const bool follow_redirects = (args.count('L') > 0);
  const string proxy = (args.count('@') > 0) ? *args.find('@')->second : "";
  if (!InitDownloadManager(follow_redirects, proxy)) {
    return 3;
  }

  if (!InitSignatureManager(params.public_keys)) {
    return 3;
  }

  bool with_gateway = spooler_definition.driver_type
                      == upload::SpoolerDefinition::Gateway;

  // This may fail, in which case a warning is printed and the process continues
  ObtainDacReadSearchCapability();

  UniquePtr<manifest::Manifest> manifest;
  if (params.branched_catalog) {
    // Throw-away manifest
    manifest = new manifest::Manifest(shash::Any(), 0, "");
  } else {
    if (with_gateway) {
      manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
                                     shash::Any());
    } else {
      manifest = FetchRemoteManifest(params.stratum0, params.repo_name,
                                     params.base_hash);
    }
  }
  if (!manifest.IsValid()) {
    return 3;
  }

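  // Remember the current root catalog hash; it is passed to the gateway
  // together with the new root hash when the transaction is committed.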
  const std::string old_root_hash = manifest->catalog_hash().ToString(true);

  catalog::WritableCatalogManager catalog_manager(
      params.base_hash, params.stratum0, params.dir_temp,
      spooler_catalogs.weak_ref(), download_manager(), params.enforce_limits,
      params.nested_kcatalog_limit, params.root_kcatalog_limit,
      params.file_mbyte_limit, statistics(), params.is_balanced,
      params.max_weight, params.min_weight);
  catalog_manager.Init();

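  // The sync mediator applies file system changes to the catalogs; the
  // tarball sync union feeds it the entries found in the input tarball.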
  publish::SyncMediator mediator(&catalog_manager, &params, publish_statistics);
  LogCvmfs(kLogPublish, kLogStdout, "Swissknife Ingest: Processing changes...");

  publish::SyncUnion *sync = new publish::SyncUnionTarball(
      &mediator, params.dir_rdonly, params.tar_file, params.base_directory,
      params.uid, params.gid, params.to_delete, create_catalog, "///");

  if (!sync->Initialize()) {
    LogCvmfs(kLogCvmfs, kLogStderr,
             "Swissknife Ingest: Initialization of the synchronisation "
             "engine failed");
    return 4;
  }

  sync->Traverse();

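  // Optionally embed the contents of a VOMS authz file into the root catalog.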
  if (!params.authz_file.empty()) {
    LogCvmfs(kLogCvmfs, kLogDebug,
             "Swissknife Ingest: Adding contents of authz file %s to"
             " root catalog.",
             params.authz_file.c_str());
    int fd = open(params.authz_file.c_str(), O_RDONLY);
    if (fd == -1) {
196  "Swissknife Ingest: Unable to open authz file (%s)"
197  "from the publication process: %s",
198  params.authz_file.c_str(), strerror(errno));
199  return 7;
200  }

    std::string new_authz;
    const bool read_successful = SafeReadToString(fd, &new_authz);
    close(fd);

    if (!read_successful) {
      LogCvmfs(kLogCvmfs, kLogStderr,
               "Swissknife Ingest: Failed to read authz file (%s): %s",
               params.authz_file.c_str(), strerror(errno));
      return 8;
    }

    catalog_manager.SetVOMSAuthz(new_authz);
  }

  if (!mediator.Commit(manifest.weak_ref())) {
    PrintError("Swissknife Ingest: something went wrong during sync");
    stats_db->StorePublishStatistics(this->statistics(), start_time, false);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
    return 5;
  }

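  // Record the revision number of the newly published root catalog in the
  // statistics counters.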
  perf::Counter *revision_counter = statistics()->Register(
      "publish.revision", "Published revision number");
  revision_counter->Set(catalog_manager.GetRootCatalog()->revision());

  // finalize the spooler
  LogCvmfs(kLogCvmfs, kLogStdout,
           "Swissknife Ingest: Wait for all uploads to finish");
  params.spooler->WaitForUpload();
  spooler_catalogs->WaitForUpload();
  params.spooler->FinalizeSession(false);

237  "Swissknife Ingest: Exporting repository manifest");
238 
  // We call FinalizeSession(true) this time, to also trigger the commit
  // operation on the gateway machine (if the upstream is of type "gw").

  // Get the hash of the new root catalog
  const std::string new_root_hash = manifest->catalog_hash().ToString(true);

  if (!spooler_catalogs->FinalizeSession(true, old_root_hash, new_root_hash,
                                         params.repo_tag)) {
    PrintError("Swissknife Ingest: Failed to commit the transaction.");
    stats_db->StorePublishStatistics(this->statistics(), start_time, false);
    if (upload_statsdb) {
      stats_db->UploadStatistics(params.spooler);
    }
    return 9;
  }

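  // On success, persist the publish statistics (and upload them if requested)
  // before exporting the new manifest.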
  stats_db->StorePublishStatistics(this->statistics(), start_time, true);
  if (upload_statsdb) {
    stats_db->UploadStatistics(params.spooler);
  }

  delete params.spooler;

  if (!manifest->Export(params.manifest_path)) {
    PrintError("Swissknife Ingest: Failed to create new repository");
    return 6;
  }

  return 0;
}