GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/catalog_mgr_rw.cc
Date: 2026-06-28 02:36:10
Exec Total Coverage
Lines: 476 878 54.2%
Branches: 364 1287 28.3%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM file system.
3 */
4
5 #include "catalog_mgr_rw.h"
6
7 #include <inttypes.h>
8 #include <unistd.h>
9
10 #include <cassert>
11 #include <cstdio>
12 #include <cstdlib>
13 #include <string>
14
15 #include "catalog_balancer.h"
16 #include "catalog_rw.h"
17 #include "manifest.h"
18 #include "statistics.h"
19 #include "upload.h"
20 #include "util/exception.h"
21 #include "util/logging.h"
22 #include "util/posix.h"
23 #include "util/smalloc.h"
24
25 using namespace std; // NOLINT
26
27 namespace catalog {
28
29 1239 WritableCatalogManager::WritableCatalogManager(
30 const shash::Any &base_hash,
31 const std::string &stratum0,
32 const string &dir_temp,
33 upload::Spooler *spooler,
34 download::DownloadManager *download_manager,
35 bool enforce_limits,
36 const unsigned nested_kcatalog_limit,
37 const unsigned root_kcatalog_limit,
38 const unsigned file_mbyte_limit,
39 perf::Statistics *statistics,
40 bool is_balanceable,
41 unsigned max_weight,
42 unsigned min_weight,
43 1239 const std::string &dir_cache)
44 : SimpleCatalogManager(base_hash, stratum0, dir_temp, download_manager,
45 statistics, false, dir_cache,
46 true /* copy to tmpdir */)
47 1239 , spooler_(spooler)
48 1239 , enforce_limits_(enforce_limits)
49 1239 , nested_kcatalog_limit_(nested_kcatalog_limit)
50 1239 , root_kcatalog_limit_(root_kcatalog_limit)
51 1239 , file_mbyte_limit_(file_mbyte_limit)
52 1239 , is_balanceable_(is_balanceable)
53 1239 , max_weight_(max_weight)
54 1239 , min_weight_(min_weight)
55 1239 , balance_weight_(max_weight / 2) {
56 1239 sync_lock_ = reinterpret_cast<pthread_mutex_t *>(
57 1239 smalloc(sizeof(pthread_mutex_t)));
58 1239 int retval = pthread_mutex_init(sync_lock_, NULL);
59
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1239 times.
1239 assert(retval == 0);
60 1239 catalog_processing_lock_ = reinterpret_cast<pthread_mutex_t *>(
61 1239 smalloc(sizeof(pthread_mutex_t)));
62 1239 retval = pthread_mutex_init(catalog_processing_lock_, NULL);
63
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1239 times.
1239 assert(retval == 0);
64 1239 }
65
66
67 4952 WritableCatalogManager::~WritableCatalogManager() {
68 2476 pthread_mutex_destroy(sync_lock_);
69 2476 free(sync_lock_);
70 2476 pthread_mutex_destroy(catalog_processing_lock_);
71 2476 free(catalog_processing_lock_);
72 4952 }
73
74
75 /**
76 * This method is virtual in AbstractCatalogManager. It returns a new catalog
77 * structure in the form the different CatalogManagers need it.
78 * In this case it returns a stub for a WritableCatalog.
79 * @param mountpoint the mount point of the catalog stub to create
80 * @param catalog_hash the content hash of the catalog to create
81 * @param parent_catalog the parent of the catalog stub to create
82 * @return a pointer to the catalog stub structure created
83 */
84 3054 Catalog *WritableCatalogManager::CreateCatalog(const PathString &mountpoint,
85 const shash::Any &catalog_hash,
86 Catalog *parent_catalog) {
87 6108 return new WritableCatalog(mountpoint.ToString(), catalog_hash,
88
2/4
✓ Branch 1 taken 3054 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 3054 times.
✗ Branch 5 not taken.
6108 parent_catalog);
89 }
90
91
92 3054 void WritableCatalogManager::ActivateCatalog(Catalog *catalog) {
93 3054 catalog->TakeDatabaseFileOwnership();
94 3054 }
95
96
97 /**
98 * This method is invoked if we create a completely new repository.
99 * The new root catalog will already contain a root entry.
100 * It is uploaded by a Forklift to the upstream storage.
101 * @return true on success, false otherwise
102 */
103 751 manifest::Manifest *WritableCatalogManager::CreateRepository(
104 const string &dir_temp,
105 const bool volatile_content,
106 const std::string &voms_authz,
107 upload::Spooler *spooler) {
108 // Create a new root catalog at file_path
109
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 const string file_path = dir_temp + "/new_root_catalog";
110
111 751 const shash::Algorithms hash_algorithm = spooler->GetHashAlgorithm();
112
113 // A newly created catalog always needs a root entry
114 // we create and configure this here
115
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 DirectoryEntry root_entry;
116 751 root_entry.inode_ = DirectoryEntry::kInvalidInode;
117 751 root_entry.mode_ = 16877;
118 751 root_entry.size_ = 4096;
119 751 root_entry.mtime_ = time(NULL);
120 751 root_entry.uid_ = getuid();
121 751 root_entry.gid_ = getgid();
122
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 root_entry.checksum_ = shash::Any(hash_algorithm);
123 751 root_entry.linkcount_ = 2;
124
1/2
✓ Branch 2 taken 751 times.
✗ Branch 3 not taken.
751 const string root_path = "";
125
126 // Create the database schema and the initial root entry
127 {
128 const UniquePtr<CatalogDatabase> new_clg_db(
129
2/4
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 751 times.
✗ Branch 5 not taken.
751 CatalogDatabase::Create(file_path));
130 751 if (!new_clg_db.IsValid()
131
4/8
✓ Branch 0 taken 751 times.
✗ Branch 1 not taken.
✓ Branch 4 taken 751 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 751 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 751 times.
751 || !new_clg_db->InsertInitialValues(root_path, volatile_content,
132 voms_authz, root_entry)) {
133 LogCvmfs(kLogCatalog, kLogStderr, "creation of catalog '%s' failed",
134 file_path.c_str());
135 return NULL;
136 }
137
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 }
138
139 // Compress root catalog;
140
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 const int64_t catalog_size = GetFileSize(file_path);
141
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 751 times.
751 if (catalog_size < 0) {
142 unlink(file_path.c_str());
143 return NULL;
144 }
145
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 const string file_path_compressed = file_path + ".compressed";
146
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 shash::Any hash_catalog(hash_algorithm, shash::kSuffixCatalog);
147
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 const bool retval = zlib::CompressPath2Path(file_path, file_path_compressed,
148 &hash_catalog);
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 751 times.
751 if (!retval) {
150 LogCvmfs(kLogCatalog, kLogStderr, "compression of catalog '%s' failed",
151 file_path.c_str());
152 unlink(file_path.c_str());
153 return NULL;
154 }
155 751 unlink(file_path.c_str());
156
157 // Create manifest
158
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 const string manifest_path = dir_temp + "/manifest";
159 manifest::Manifest *manifest = new manifest::Manifest(hash_catalog,
160
3/6
✓ Branch 2 taken 751 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 751 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 751 times.
✗ Branch 9 not taken.
751 catalog_size, "");
161
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 751 times.
751 if (!voms_authz.empty()) {
162 manifest->set_has_alt_catalog_path(true);
163 }
164
165 // Upload catalog
166
3/6
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 751 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 751 times.
✗ Branch 8 not taken.
751 spooler->Upload(file_path_compressed, "data/" + hash_catalog.MakePath());
167
1/2
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
751 spooler->WaitForUpload();
168 751 unlink(file_path_compressed.c_str());
169
2/4
✓ Branch 1 taken 751 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 751 times.
751 if (spooler->GetNumberOfErrors() > 0) {
170 LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalog %s",
171 file_path_compressed.c_str());
172 delete manifest;
173 return NULL;
174 }
175
176 751 return manifest;
177 751 }
178
179
180 /**
181 * Retrieve the catalog containing the given path.
182 * Other than AbstractCatalogManager::FindCatalog() this mounts nested
183 * catalogs if necessary and returns WritableCatalog objects.
184 * Furthermore it optionally returns the looked-up DirectoryEntry.
185 *
186 * @param path the path to look for
187 * @param result the retrieved catalog (as a pointer)
188 * @param dirent is set to looked up DirectoryEntry for 'path' if non-NULL
189 * @return true if catalog was found
190 */
191 13691 bool WritableCatalogManager::FindCatalog(const string &path,
192 WritableCatalog **result,
193 DirectoryEntry *dirent) {
194
1/2
✓ Branch 1 taken 13691 times.
✗ Branch 2 not taken.
13691 const PathString ps_path(path);
195
196
1/2
✓ Branch 1 taken 13691 times.
✗ Branch 2 not taken.
13691 Catalog *best_fit = AbstractCatalogManager<Catalog>::FindCatalog(ps_path);
197
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13691 times.
13691 assert(best_fit != NULL);
198 13691 Catalog *catalog = NULL;
199
1/2
✓ Branch 1 taken 13691 times.
✗ Branch 2 not taken.
13691 const bool retval = MountSubtree(ps_path, best_fit, true /* is_listable */,
200 &catalog);
201
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 13691 times.
13691 if (!retval)
202 return false;
203
204 13691 *result = static_cast<WritableCatalog *>(catalog);
205
206
1/2
✓ Branch 1 taken 13691 times.
✗ Branch 2 not taken.
13691 catalog::DirectoryEntry dummy;
207
2/2
✓ Branch 0 taken 6873 times.
✓ Branch 1 taken 6818 times.
13691 if (NULL == dirent) {
208 6873 dirent = &dummy;
209 }
210
1/2
✓ Branch 1 taken 13691 times.
✗ Branch 2 not taken.
13691 const bool found = catalog->LookupPath(ps_path, dirent);
211
6/8
✓ Branch 0 taken 13648 times.
✓ Branch 1 taken 43 times.
✓ Branch 3 taken 13648 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 13648 times.
✓ Branch 7 taken 43 times.
✓ Branch 8 taken 13648 times.
13691 if (!found || !catalog->IsWritable())
212 43 return false;
213
214 13648 return true;
215 13691 }
216
217
218 WritableCatalog *WritableCatalogManager::GetHostingCatalog(
219 const std::string &path) {
220 WritableCatalog *result = NULL;
221 const bool retval = FindCatalog(MakeRelativePath(path), &result, NULL);
222 if (!retval)
223 return NULL;
224 return result;
225 }
226
227
228 /**
229 * Remove the given file from the catalogs.
230 * @param file_path the full path to the file to be removed
231 * @return true on success, false otherwise
232 */
233 172 void WritableCatalogManager::RemoveFile(const std::string &path) {
234
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 const string file_path = MakeRelativePath(path);
235
236 172 SyncLock();
237 172 WritableCatalog *catalog = NULL;
238
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 DirectoryEntry entry;
239
2/4
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 172 times.
✗ Branch 4 not taken.
172 if (FindCatalog(file_path, &catalog, &entry)) {
240
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 172 times.
172 if (entry.IsBundleTrigger()) {
241 catalog->RemoveEntry(
242 GetParentPath(file_path) + "/.cvmfsbundle-" + GetFileName(file_path));
243 }
244
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 catalog->RemoveEntry(file_path);
245 }
246 172 SyncUnlock();
247
248
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 172 times.
172 if (catalog == NULL) {
249 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
250 file_path.c_str());
251 }
252 172 }
253
254
255 /**
256 * Remove the given directory from the catalogs.
257 * @param directory_path the full path to the directory to be removed
258 * @return true on success, false otherwise
259 */
260 217 void WritableCatalogManager::RemoveDirectory(const std::string &path) {
261
1/2
✓ Branch 1 taken 217 times.
✗ Branch 2 not taken.
217 const string directory_path = MakeRelativePath(path);
262
1/2
✓ Branch 1 taken 217 times.
✗ Branch 2 not taken.
217 const string parent_path = GetParentPath(directory_path);
263
264 217 SyncLock();
265 WritableCatalog *catalog;
266
1/2
✓ Branch 1 taken 217 times.
✗ Branch 2 not taken.
217 DirectoryEntry parent_entry;
267
2/4
✓ Branch 1 taken 217 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 217 times.
217 if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
268 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
269 directory_path.c_str());
270 }
271
272 217 parent_entry.set_linkcount(parent_entry.linkcount() - 1);
273
274
1/2
✓ Branch 1 taken 217 times.
✗ Branch 2 not taken.
217 catalog->RemoveEntry(directory_path);
275
1/2
✓ Branch 1 taken 217 times.
✗ Branch 2 not taken.
217 catalog->UpdateEntry(parent_entry, parent_path);
276
2/2
✓ Branch 1 taken 87 times.
✓ Branch 2 taken 130 times.
217 if (parent_entry.IsNestedCatalogRoot()) {
277
1/2
✓ Branch 2 taken 87 times.
✗ Branch 3 not taken.
87 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
278 parent_path.c_str());
279 WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
280 87 catalog->parent());
281 87 parent_entry.set_is_nested_catalog_mountpoint(true);
282 87 parent_entry.set_is_nested_catalog_root(false);
283
1/2
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
87 parent_catalog->UpdateEntry(parent_entry, parent_path);
284 }
285 217 SyncUnlock();
286 217 }
287
288 /**
289 * Clone the file called `source` changing its name into `destination`, the
290 * source file is keep intact.
291 * @params destination, the name of the new file, complete path
292 * @params source, the name of the file to clone, which must be already in the
293 * repository
294 * @params fail_if_source_missing, when true (default) abort if the source is
295 * not in the catalog; when false, skip the clone and return false instead.
296 * This is used by the tarball ingestion engine to tolerate hardlinks whose
297 * target is not part of the same archive (e.g. cross-layer hardlinks in OCI
298 * image layers).
299 * @return true if the file was cloned, false if the source was missing and
300 * fail_if_source_missing was false
301 */
302 bool WritableCatalogManager::Clone(const std::string destination,
303 const std::string source,
304 const bool fail_if_source_missing) {
305 const std::string relative_source = MakeRelativePath(source);
306
307 DirectoryEntry source_dirent;
308 if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
309 if (fail_if_source_missing) {
310 PANIC(kLogStderr, "catalog for file '%s' cannot be found, aborting",
311 source.c_str());
312 }
313 // The caller opted out of aborting and is expected to handle the missing
314 // source (e.g. by materializing a replacement file), so only log at debug
315 // level here to avoid duplicate user-facing warnings.
316 LogCvmfs(kLogCatalog, kLogDebug,
317 "catalog for clone source '%s' cannot be found, not cloning "
318 "to '%s'",
319 source.c_str(), destination.c_str());
320 return false;
321 }
322 if (source_dirent.IsDirectory()) {
323 PANIC(kLogStderr, "Trying to clone a directory: '%s', aborting",
324 source.c_str());
325 }
326
327 // if the file is already there we remove it and we add it back
328 DirectoryEntry check_dirent;
329 const bool destination_already_present = LookupPath(
330 MakeRelativePath(destination), kLookupDefault, &check_dirent);
331 if (destination_already_present) {
332 this->RemoveFile(destination);
333 }
334
335 DirectoryEntry destination_dirent(source_dirent);
336 std::string destination_dirname;
337 std::string destination_filename;
338 SplitPath(destination, &destination_dirname, &destination_filename);
339
340 destination_dirent.name_.Assign(
341 NameString(destination_filename.c_str(), destination_filename.length()));
342
343 // TODO(jblomer): clone is used by tarball engine and should eventually
344 // support extended attributes
345 this->AddFile(destination_dirent, empty_xattrs, destination_dirname);
346 return true;
347 }
348
349
350 /**
351 * Copies an entire directory tree from the existing from_dir to the
352 * non-existing to_dir. The destination's parent directory must exist. On the
353 * catalog level, the new entries will be identical to the old ones except
354 * for their path hash fields.
355 */
356 431 void WritableCatalogManager::CloneTree(const std::string &from_dir,
357 const std::string &to_dir) {
358 // Sanitize input paths
359
6/6
✓ Branch 1 taken 345 times.
✓ Branch 2 taken 86 times.
✓ Branch 4 taken 43 times.
✓ Branch 5 taken 302 times.
✓ Branch 6 taken 129 times.
✓ Branch 7 taken 302 times.
431 if (from_dir.empty() || to_dir.empty())
360 129 PANIC(kLogStderr, "clone tree from or to root impossible");
361
362
1/2
✓ Branch 1 taken 302 times.
✗ Branch 2 not taken.
302 const std::string relative_source = MakeRelativePath(from_dir);
363
1/2
✓ Branch 1 taken 302 times.
✗ Branch 2 not taken.
302 const std::string relative_dest = MakeRelativePath(to_dir);
364
365
2/2
✓ Branch 1 taken 43 times.
✓ Branch 2 taken 259 times.
302 if (relative_source == relative_dest) {
366 43 PANIC(kLogStderr, "cannot clone tree into itself ('%s')", to_dir.c_str());
367 }
368
4/7
✓ Branch 1 taken 259 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 259 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 43 times.
✓ Branch 8 taken 216 times.
259 if (HasPrefix(relative_dest, relative_source + "/", false /*ignore_case*/)) {
369 43 PANIC(kLogStderr,
370 "cannot clone tree into sub directory of source '%s' --> '%s'",
371 from_dir.c_str(), to_dir.c_str());
372 }
373
374
1/2
✓ Branch 1 taken 216 times.
✗ Branch 2 not taken.
216 DirectoryEntry source_dirent;
375
3/4
✓ Branch 1 taken 216 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
✓ Branch 4 taken 173 times.
216 if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
376 43 PANIC(kLogStderr, "path '%s' cannot be found, aborting", from_dir.c_str());
377 }
378
2/2
✓ Branch 1 taken 43 times.
✓ Branch 2 taken 130 times.
173 if (!source_dirent.IsDirectory()) {
379 43 PANIC(kLogStderr, "CloneTree: source '%s' not a directory, aborting",
380 from_dir.c_str());
381 }
382
383
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 DirectoryEntry dest_dirent;
384
3/4
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
✓ Branch 4 taken 87 times.
130 if (LookupPath(relative_dest, kLookupDefault, &dest_dirent)) {
385 43 PANIC(kLogStderr, "destination '%s' exists, aborting", to_dir.c_str());
386 }
387
388
1/2
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
87 const std::string dest_parent = GetParentPath(relative_dest);
389
1/2
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
87 DirectoryEntry dest_parent_dirent;
390
3/4
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
✓ Branch 4 taken 44 times.
87 if (!LookupPath(dest_parent, kLookupDefault, &dest_parent_dirent)) {
391 43 PANIC(kLogStderr, "destination '%s' not on a known path, aborting",
392 to_dir.c_str());
393 }
394
395
2/4
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 44 times.
✗ Branch 5 not taken.
44 CloneTreeImpl(PathString(from_dir),
396
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
88 GetParentPath(to_dir),
397
2/4
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 44 times.
✗ Branch 5 not taken.
88 NameString(GetFileName(to_dir)));
398 904 }
399
400
401 /**
402 * Called from CloneTree(), assumes that from_dir and to_dir are sufficiently
403 * sanitized
404 */
405 352 void WritableCatalogManager::CloneTreeImpl(const PathString &source_dir,
406 const std::string &dest_parent_dir,
407 const NameString &dest_name) {
408
1/2
✓ Branch 4 taken 352 times.
✗ Branch 5 not taken.
352 LogCvmfs(kLogCatalog, kLogDebug, "cloning %s --> %s/%s", source_dir.c_str(),
409
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
704 dest_parent_dir.c_str(), dest_name.ToString().c_str());
410
3/6
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 352 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 352 times.
✗ Branch 8 not taken.
704 const PathString relative_source(MakeRelativePath(source_dir.ToString()));
411
412
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 DirectoryEntry source_dirent;
413
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 bool retval = LookupPath(relative_source, kLookupDefault, &source_dirent);
414
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 352 times.
352 assert(retval);
415
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 352 times.
352 assert(!source_dirent.IsBindMountpoint());
416
417
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 DirectoryEntry dest_dirent(source_dirent);
418
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 dest_dirent.name_.Assign(dest_name);
419 // Just in case, reset the nested catalog markers
420 352 dest_dirent.set_is_nested_catalog_mountpoint(false);
421 352 dest_dirent.set_is_nested_catalog_root(false);
422
423 352 XattrList xattrs;
424
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 352 times.
352 if (source_dirent.HasXattrs()) {
425 retval = LookupXattrs(relative_source, &xattrs);
426 assert(retval);
427 }
428
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 AddDirectory(dest_dirent, xattrs, dest_parent_dir);
429
430
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 std::string dest_dir = dest_parent_dir;
431
2/2
✓ Branch 1 taken 308 times.
✓ Branch 2 taken 44 times.
352 if (!dest_dir.empty())
432
1/2
✓ Branch 1 taken 308 times.
✗ Branch 2 not taken.
308 dest_dir.push_back('/');
433
2/4
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 352 times.
✗ Branch 5 not taken.
352 dest_dir += dest_name.ToString();
434 352 if (source_dirent.IsNestedCatalogRoot()
435
5/6
✓ Branch 0 taken 220 times.
✓ Branch 1 taken 132 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 220 times.
✓ Branch 5 taken 132 times.
✓ Branch 6 taken 220 times.
352 || source_dirent.IsNestedCatalogMountpoint()) {
436
1/2
✓ Branch 1 taken 132 times.
✗ Branch 2 not taken.
132 CreateNestedCatalog(dest_dir);
437 }
438
439 352 DirectoryEntryList ls;
440
1/2
✓ Branch 1 taken 352 times.
✗ Branch 2 not taken.
352 retval = Listing(relative_source, &ls, false /* expand_symlink */);
441
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 352 times.
352 assert(retval);
442
2/2
✓ Branch 1 taken 748 times.
✓ Branch 2 taken 352 times.
1100 for (unsigned i = 0; i < ls.size(); ++i) {
443
1/2
✓ Branch 1 taken 748 times.
✗ Branch 2 not taken.
748 PathString sub_path(source_dir);
444
2/4
✓ Branch 1 taken 748 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 748 times.
748 assert(!sub_path.IsEmpty());
445
1/2
✓ Branch 1 taken 748 times.
✗ Branch 2 not taken.
748 sub_path.Append("/", 1);
446
3/6
✓ Branch 2 taken 748 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 748 times.
✗ Branch 8 not taken.
✓ Branch 11 taken 748 times.
✗ Branch 12 not taken.
748 sub_path.Append(ls[i].name().GetChars(), ls[i].name().GetLength());
447
448
2/2
✓ Branch 2 taken 308 times.
✓ Branch 3 taken 440 times.
748 if (ls[i].IsDirectory()) {
449
2/4
✓ Branch 2 taken 308 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 308 times.
✗ Branch 6 not taken.
308 CloneTreeImpl(sub_path, dest_dir, ls[i].name());
450 308 continue;
451 }
452
453 // We break hard-links during cloning
454 440 ls[i].set_hardlink_group(0);
455 440 ls[i].set_linkcount(1);
456
457 440 xattrs.Clear();
458
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 440 times.
440 if (ls[i].HasXattrs()) {
459 retval = LookupXattrs(sub_path, &xattrs);
460 assert(retval);
461 }
462
463
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 440 times.
440 if (ls[i].IsChunkedFile()) {
464 FileChunkList chunks;
465 const std::string relative_sub_path = MakeRelativePath(
466 sub_path.ToString());
467 retval = ListFileChunks(PathString(relative_sub_path),
468 ls[i].hash_algorithm(), &chunks);
469 assert(retval);
470 AddChunkedFile(ls[i], xattrs, dest_dir, chunks);
471 } else {
472
1/2
✓ Branch 2 taken 440 times.
✗ Branch 3 not taken.
440 AddFile(ls[i], xattrs, dest_dir);
473 }
474
2/2
✓ Branch 1 taken 440 times.
✓ Branch 2 taken 308 times.
748 }
475 352 }
476
477
478 /**
479 * Add a new directory to the catalogs.
480 * @param entry a DirectoryEntry structure describing the new directory
481 * @param parent_directory the absolute path of the directory containing the
482 * directory to be created
483 * @return true on success, false otherwise
484 */
485 5004 void WritableCatalogManager::AddDirectory(const DirectoryEntryBase &entry,
486 const XattrList &xattrs,
487 const std::string &parent_directory) {
488
1/2
✓ Branch 1 taken 5004 times.
✗ Branch 2 not taken.
5004 const string parent_path = MakeRelativePath(parent_directory);
489
1/2
✓ Branch 1 taken 5004 times.
✗ Branch 2 not taken.
5004 string directory_path = parent_path + "/";
490
3/6
✓ Branch 1 taken 5004 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 5004 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 5004 times.
✗ Branch 10 not taken.
5004 directory_path.append(entry.name().GetChars(), entry.name().GetLength());
491
492 5004 SyncLock();
493 WritableCatalog *catalog;
494
1/2
✓ Branch 1 taken 5004 times.
✗ Branch 2 not taken.
5004 DirectoryEntry parent_entry;
495
2/4
✓ Branch 1 taken 5004 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 5004 times.
5004 if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
496 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
497 directory_path.c_str());
498 }
499
500
1/2
✓ Branch 1 taken 5004 times.
✗ Branch 2 not taken.
5004 DirectoryEntry fixed_hardlink_count(entry);
501 5004 fixed_hardlink_count.set_linkcount(2);
502
1/2
✓ Branch 1 taken 5004 times.
✗ Branch 2 not taken.
5004 catalog->AddEntry(fixed_hardlink_count, xattrs, directory_path, parent_path);
503
504 5004 parent_entry.set_linkcount(parent_entry.linkcount() + 1);
505
1/2
✓ Branch 1 taken 5004 times.
✗ Branch 2 not taken.
5004 catalog->UpdateEntry(parent_entry, parent_path);
506
2/2
✓ Branch 1 taken 132 times.
✓ Branch 2 taken 4872 times.
5004 if (parent_entry.IsNestedCatalogRoot()) {
507
1/2
✓ Branch 2 taken 132 times.
✗ Branch 3 not taken.
132 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
508 parent_path.c_str());
509 WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
510 132 catalog->parent());
511 132 parent_entry.set_is_nested_catalog_mountpoint(true);
512 132 parent_entry.set_is_nested_catalog_root(false);
513
1/2
✓ Branch 1 taken 132 times.
✗ Branch 2 not taken.
132 parent_catalog->UpdateEntry(parent_entry, parent_path);
514 }
515 5004 SyncUnlock();
516 5004 }
517
518 /**
519 * Add a new file to the catalogs.
520 * @param entry a DirectoryEntry structure describing the new file
521 * @param parent_directory the absolute path of the directory containing the
522 * file to be created
523 * @return true on success, false otherwise
524 */
525 6399 void WritableCatalogManager::AddFile(const DirectoryEntry &entry,
526 const XattrList &xattrs,
527 const std::string &parent_directory) {
528
1/2
✓ Branch 1 taken 6399 times.
✗ Branch 2 not taken.
6399 const string parent_path = MakeRelativePath(parent_directory);
529
1/2
✓ Branch 1 taken 6399 times.
✗ Branch 2 not taken.
6399 const string file_path = entry.GetFullPath(parent_path);
530
531 6399 SyncLock();
532 WritableCatalog *catalog;
533
2/4
✓ Branch 1 taken 6399 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 6399 times.
6399 if (!FindCatalog(parent_path, &catalog)) {
534 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
535 file_path.c_str());
536 }
537
538
4/6
✓ Branch 1 taken 6266 times.
✓ Branch 2 taken 133 times.
✓ Branch 4 taken 6266 times.
✗ Branch 5 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 6266 times.
6399 assert(!entry.IsRegular() || entry.IsChunkedFile()
539 || !entry.checksum().IsNull());
540
3/4
✓ Branch 1 taken 133 times.
✓ Branch 2 taken 6266 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 133 times.
6399 assert(entry.IsRegular() || !entry.IsExternalFile());
541
542 // check if file is too big
543
1/2
✓ Branch 1 taken 6399 times.
✗ Branch 2 not taken.
6399 const unsigned mbytes = entry.size() / (1024 * 1024);
544
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6399 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6399 if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
545 LogCvmfs(kLogCatalog, kLogStderr,
546 "%s: file at %s is larger than %u megabytes (%u). "
547 "CernVM-FS works best with small files. "
548 "Please remove the file or increase the limit.",
549 enforce_limits_ ? "FATAL" : "WARNING", file_path.c_str(),
550 file_mbyte_limit_, mbytes);
551 if (enforce_limits_)
552 PANIC(kLogStderr, "file at %s is larger than %u megabytes (%u).",
553 file_path.c_str(), file_mbyte_limit_, mbytes);
554 }
555
556
1/2
✓ Branch 1 taken 6399 times.
✗ Branch 2 not taken.
6399 catalog->AddEntry(entry, xattrs, file_path, parent_path);
557 6399 SyncUnlock();
558 6399 }
559
560
561 void WritableCatalogManager::AddChunkedFile(const DirectoryEntryBase &entry,
562 const XattrList &xattrs,
563 const std::string &parent_directory,
564 const FileChunkList &file_chunks) {
565 assert(file_chunks.size() > 0);
566
567 DirectoryEntry full_entry(entry);
568 full_entry.set_is_chunked_file(true);
569
570 AddFile(full_entry, xattrs, parent_directory);
571
572 const string parent_path = MakeRelativePath(parent_directory);
573 const string file_path = entry.GetFullPath(parent_path);
574
575 SyncLock();
576 WritableCatalog *catalog;
577 if (!FindCatalog(parent_path, &catalog)) {
578 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
579 file_path.c_str());
580 }
581
582 for (unsigned i = 0; i < file_chunks.size(); ++i) {
583 catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
584 }
585 SyncUnlock();
586 }
587
588 /**
589 * Reads the entry given by file_path and sets the bundle trigger flag
590 * accordingly. If new_value is true, the regular file at file_path must exist.
591 * If new_value is false, a missing file_path is ignored (otherwise it would
592 * be hard to get rid of dangling bundle trigger markers).
593 */
594 void WritableCatalogManager::UpdateBundleTrigger(const std::string &file_path,
595 bool new_value)
596 {
597 SyncLock();
598
599 WritableCatalog *catalog = NULL;
600 DirectoryEntry entry;
601 if (!FindCatalog(file_path, &catalog, &entry)) {
602 SyncUnlock();
603
604 if (new_value) {
605 PANIC(kLogStderr, "failed to find catalog of %s", file_path.c_str());
606 }
607
608 LogCvmfs(kLogCatalog, kLogDebug, "dangling bundle trigger %s",
609 file_path.c_str());
610 return;
611 }
612
613 if (new_value && !entry.IsRegular()) {
614 SyncUnlock();
615 PANIC(kLogStderr, "failed to set bundle trigger on non-regular file %s",
616 file_path.c_str());
617 }
618
619 entry.set_is_bundle_trigger(new_value);
620 catalog->UpdateEntry(entry, file_path);
621
622 SyncUnlock();
623 }
624
625
626 /**
627 * Add a hardlink group to the catalogs.
628 * @param entries a list of DirectoryEntries describing the new files
629 * @param parent_directory the absolute path of the directory containing the
630 * files to be created
631 * @return true on success, false otherwise
632 */
633 void WritableCatalogManager::AddHardlinkGroup(
634 const DirectoryEntryBaseList &entries,
635 const XattrList &xattrs,
636 const std::string &parent_directory,
637 const FileChunkList &file_chunks) {
638 assert(entries.size() >= 1);
639 assert(file_chunks.IsEmpty() || entries[0].IsRegular());
640 if (entries.size() == 1) {
641 DirectoryEntry fix_linkcount(entries[0]);
642 fix_linkcount.set_linkcount(1);
643 if (file_chunks.IsEmpty())
644 return AddFile(fix_linkcount, xattrs, parent_directory);
645 return AddChunkedFile(fix_linkcount, xattrs, parent_directory, file_chunks);
646 }
647
648 LogCvmfs(kLogCatalog, kLogVerboseMsg, "adding hardlink group %s/%s",
649 parent_directory.c_str(), entries[0].name().c_str());
650
651 // Hardlink groups have to reside in the same directory.
652 // Therefore we only have one parent directory here
653 const string parent_path = MakeRelativePath(parent_directory);
654
655 // check if hard link is too big
656 const unsigned mbytes = entries[0].size() / (1024 * 1024);
657 if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
658 LogCvmfs(kLogCatalog, kLogStderr,
659 "%s: hard link at %s is larger than %u megabytes (%u). "
660 "CernVM-FS works best with small files. "
661 "Please remove the file or increase the limit.",
662 enforce_limits_ ? "FATAL" : "WARNING",
663 (parent_path + entries[0].name().ToString()).c_str(),
664 file_mbyte_limit_, mbytes);
665 if (enforce_limits_)
666 PANIC(kLogStderr, "hard link at %s is larger than %u megabytes (%u)",
667 (parent_path + entries[0].name().ToString()).c_str(),
668 file_mbyte_limit_, mbytes);
669 }
670
671 SyncLock();
672 WritableCatalog *catalog;
673 if (!FindCatalog(parent_path, &catalog)) {
674 PANIC(kLogStderr,
675 "catalog for hardlink group containing '%s' cannot be found",
676 parent_path.c_str());
677 }
678
679 // Get a valid hardlink group id for the catalog the group will end up in
680 // TODO(unknown): Compaction
681 const uint32_t new_group_id = catalog->GetMaxLinkId() + 1;
682 LogCvmfs(kLogCatalog, kLogVerboseMsg, "hardlink group id %u issued",
683 new_group_id);
684 assert(new_group_id > 0);
685
686 // Add the file entries to the catalog
687 for (DirectoryEntryBaseList::const_iterator i = entries.begin(),
688 iEnd = entries.end();
689 i != iEnd;
690 ++i) {
691 string file_path = parent_path + "/";
692 file_path.append(i->name().GetChars(), i->name().GetLength());
693
694 // create a fully fledged DirectoryEntry to add the hardlink group to it
695 // which is CVMFS specific meta data.
696 DirectoryEntry hardlink(*i);
697 hardlink.set_hardlink_group(new_group_id);
698 hardlink.set_linkcount(entries.size());
699 hardlink.set_is_chunked_file(!file_chunks.IsEmpty());
700
701 catalog->AddEntry(hardlink, xattrs, file_path, parent_path);
702 if (hardlink.IsChunkedFile()) {
703 for (unsigned i = 0; i < file_chunks.size(); ++i) {
704 catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
705 }
706 }
707 }
708 SyncUnlock();
709 }
710
711
712 void WritableCatalogManager::ShrinkHardlinkGroup(const string &remove_path) {
713 const string relative_path = MakeRelativePath(remove_path);
714
715 SyncLock();
716 WritableCatalog *catalog;
717 if (!FindCatalog(relative_path, &catalog)) {
718 PANIC(kLogStderr,
719 "catalog for hardlink group containing '%s' cannot be found",
720 remove_path.c_str());
721 }
722
723 catalog->IncLinkcount(relative_path, -1);
724 SyncUnlock();
725 }
726
727
728 /**
729 * Update entry meta data (mode, owner, ...).
730 * CVMFS specific meta data (i.e. nested catalog transition points) are NOT
731 * changed by this method, although transition points intrinsics are taken into
732 * account, to keep nested catalogs consistent.
733 * @param entry the directory entry to be touched
734 * @param path the path of the directory entry to be touched
735 */
736 43 void WritableCatalogManager::TouchDirectory(const DirectoryEntryBase &entry,
737 const XattrList &xattrs,
738 const std::string &directory_path) {
739
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 43 times.
43 assert(entry.IsDirectory());
740
741
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 const string entry_path = MakeRelativePath(directory_path);
742
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 const string parent_path = GetParentPath(entry_path);
743
744 43 SyncLock();
745 // find the catalog to be updated
746 WritableCatalog *catalog;
747
2/4
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 43 times.
43 if (!FindCatalog(parent_path, &catalog)) {
748 PANIC(kLogStderr, "catalog for entry '%s' cannot be found",
749 entry_path.c_str());
750 }
751
752
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 catalog->TouchEntry(entry, xattrs, entry_path);
753
754 // since we deal with a directory here, we might just touch a
755 // nested catalog transition point. If this is the case we would need to
756 // update two catalog entries:
757 // * the nested catalog MOUNTPOINT in the parent catalog
758 // * the nested catalog ROOT in the nested catalog
759
760 // first check if we really have a nested catalog transition point
761
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 catalog::DirectoryEntry potential_transition_point;
762
1/2
✓ Branch 3 taken 43 times.
✗ Branch 4 not taken.
43 const PathString transition_path(entry_path.data(), entry_path.length());
763
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 bool retval = catalog->LookupPath(transition_path,
764 &potential_transition_point);
765
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 43 times.
43 assert(retval);
766
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 43 times.
43 if (potential_transition_point.IsNestedCatalogMountpoint()) {
767 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point at %s",
768 entry_path.c_str());
769
770 // find and mount nested catalog associated to this transition point
771 shash::Any nested_hash;
772 uint64_t nested_size;
773 retval = catalog->FindNested(transition_path, &nested_hash, &nested_size);
774 assert(retval);
775 Catalog *nested_catalog;
776 nested_catalog = MountCatalog(transition_path, nested_hash, catalog);
777 assert(nested_catalog != NULL);
778
779 // update nested catalog root in the child catalog
780 reinterpret_cast<WritableCatalog *>(nested_catalog)
781 ->TouchEntry(entry, xattrs, entry_path);
782 }
783
784 43 SyncUnlock();
785 43 }
786
787
788 /**
789 * Create a new nested catalog. Includes moving all entries belonging there
790 * from it's parent catalog.
791 * @param mountpoint the path of the directory to become a nested root
792 * @return true on success, false otherwise
793 */
794 1338 void WritableCatalogManager::CreateNestedCatalog(
795 const std::string &mountpoint) {
796
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 const string nested_root_path = MakeRelativePath(mountpoint);
797
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 const PathString ps_nested_root_path(nested_root_path);
798
799 1338 SyncLock();
800 // Find the catalog currently containing the directory structure, which
801 // will be represented as a new nested catalog and its root-entry/mountpoint
802 // along the way
803 1338 WritableCatalog *old_catalog = NULL;
804
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 DirectoryEntry new_root_entry;
805
2/4
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1338 times.
1338 if (!FindCatalog(nested_root_path, &old_catalog, &new_root_entry)) {
806 PANIC(kLogStderr,
807 "failed to create nested catalog '%s': "
808 "mountpoint was not found in current catalog structure",
809 nested_root_path.c_str());
810 }
811
812 // Create the database schema and the initial root entry
813 // for the new nested catalog
814
1/2
✓ Branch 2 taken 1338 times.
✗ Branch 3 not taken.
1338 const string database_file_path = CreateTempPath(dir_temp() + "/catalog",
815
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 0666);
816 1338 const bool volatile_content = false;
817
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 CatalogDatabase *new_catalog_db = CatalogDatabase::Create(database_file_path);
818
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1338 times.
1338 assert(NULL != new_catalog_db);
819 // Note we do not set the external_data bit for nested catalogs
820
2/4
✓ Branch 2 taken 1338 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1338 times.
✗ Branch 6 not taken.
1338 bool retval = new_catalog_db->InsertInitialValues(
821 nested_root_path,
822 volatile_content,
823 "", // At this point, only root
824 // catalog gets VOMS authz
825 new_root_entry);
826
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1338 times.
1338 assert(retval);
827 // TODO(rmeusel): we need a way to attach a catalog directly from an open
828 // database to remove this indirection
829
1/2
✓ Branch 0 taken 1338 times.
✗ Branch 1 not taken.
1338 delete new_catalog_db;
830 1338 new_catalog_db = NULL;
831
832 // Attach the just created nested catalog
833
2/4
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1338 times.
✗ Branch 5 not taken.
1338 Catalog *new_catalog = CreateCatalog(ps_nested_root_path, shash::Any(),
834 old_catalog);
835
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 retval = AttachCatalog(database_file_path, new_catalog);
836
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1338 times.
1338 assert(retval);
837
838
2/4
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1338 times.
1338 assert(new_catalog->IsWritable());
839 1338 WritableCatalog *wr_new_catalog = static_cast<WritableCatalog *>(new_catalog);
840
841
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1338 times.
1338 if (new_root_entry.HasXattrs()) {
842 XattrList xattrs;
843 retval = old_catalog->LookupXattrsPath(ps_nested_root_path, &xattrs);
844 assert(retval);
845 wr_new_catalog->TouchEntry(new_root_entry, xattrs, nested_root_path);
846 }
847
848 // From now on, there are two catalogs, spanning the same directory structure
849 // we have to split the overlapping directory entries from the old catalog
850 // to the new catalog to re-gain a valid catalog structure
851
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 old_catalog->Partition(wr_new_catalog);
852
853 // Add the newly created nested catalog to the references of the containing
854 // catalog
855
4/8
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1338 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1338 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1338 times.
✗ Branch 11 not taken.
1338 old_catalog->InsertNestedCatalog(new_catalog->mountpoint().ToString(), NULL,
856 1338 shash::Any(spooler_->GetHashAlgorithm()), 0);
857
858 // Fix subtree counters in new nested catalogs: subtree is the sum of all
859 // entries of all "grand-nested" catalogs
860 // Note: taking a copy of the nested catalog list here
861 const Catalog::NestedCatalogList
862
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 &grand_nested = wr_new_catalog->ListOwnNestedCatalogs();
863
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 DeltaCounters fix_subtree_counters;
864 2676 for (Catalog::NestedCatalogList::const_iterator i = grand_nested.begin(),
865 1338 iEnd = grand_nested.end();
866
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1338 times.
1338 i != iEnd;
867 ++i) {
868 WritableCatalog *grand_catalog;
869 retval = FindCatalog(i->mountpoint.ToString(), &grand_catalog);
870 assert(retval);
871 const Counters &grand_counters = grand_catalog->GetCounters();
872 grand_counters.AddAsSubtree(&fix_subtree_counters);
873 }
874 1338 const DeltaCounters save_counters = wr_new_catalog->delta_counters_;
875 1338 wr_new_catalog->delta_counters_ = fix_subtree_counters;
876
1/2
✓ Branch 1 taken 1338 times.
✗ Branch 2 not taken.
1338 wr_new_catalog->UpdateCounters();
877 1338 wr_new_catalog->delta_counters_ = save_counters;
878
879 1338 SyncUnlock();
880 1338 }
881
882
883 /**
884 * Remove a nested catalog
885 *
886 * If the merged parameter is true, when you remove a nested catalog
887 * all entries currently held by it will be merged into its parent
888 * catalog.
889 * @param mountpoint - the path of the nested catalog to be removed
890 * @param merge - merge the subtree associated with the nested catalog
891 * into its parent catalog
892 * @return - true on success, false otherwise
893 */
894 130 void WritableCatalogManager::RemoveNestedCatalog(const string &mountpoint,
895 const bool merge) {
896
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 const string nested_root_path = MakeRelativePath(mountpoint);
897
898 130 SyncLock();
899 // Find the catalog which should be removed
900 130 WritableCatalog *nested_catalog = NULL;
901
2/4
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 130 times.
130 if (!FindCatalog(nested_root_path, &nested_catalog)) {
902 PANIC(kLogStderr,
903 "failed to remove nested catalog '%s': "
904 "mountpoint was not found in current catalog structure",
905 nested_root_path.c_str());
906 }
907
908 // Check if the found catalog is really the nested catalog to be deleted
909
6/19
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 130 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 130 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 130 times.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
✓ Branch 13 taken 130 times.
✗ Branch 14 not taken.
✓ Branch 16 taken 130 times.
✗ Branch 17 not taken.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
260 assert(!nested_catalog->IsRoot()
910 && (nested_catalog->mountpoint().ToString() == nested_root_path));
911
912
1/2
✓ Branch 0 taken 130 times.
✗ Branch 1 not taken.
130 if (merge) {
913 // Merge all data from the nested catalog into it's parent
914
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 nested_catalog->MergeIntoParent();
915 } else {
916 nested_catalog->RemoveFromParent();
917 }
918
919 // Delete the catalog database file from the working copy
920
2/6
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 130 times.
130 if (unlink(nested_catalog->database_path().c_str()) != 0) {
921 PANIC(kLogStderr,
922 "unable to delete the removed nested catalog database file '%s'",
923 nested_catalog->database_path().c_str());
924 }
925
926 // Remove the catalog from internal data structures
927
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 DetachCatalog(nested_catalog);
928 130 SyncUnlock();
929 130 }
930
931
932 /**
933 * Swap in a new nested catalog
934 *
935 * The old nested catalog must not have been already attached to the
936 * catalog tree. This method will not attach the new nested catalog
937 * to the catalog tree.
938 *
939 * @param mountpoint - the path of the nested catalog to be removed
940 * @param new_hash - the hash of the new nested catalog
941 * @param new_size - the size of the new nested catalog
942 */
943 301 void WritableCatalogManager::SwapNestedCatalog(const string &mountpoint,
944 const shash::Any &new_hash,
945 const uint64_t new_size) {
946
1/2
✓ Branch 1 taken 301 times.
✗ Branch 2 not taken.
301 const string nested_root_path = MakeRelativePath(mountpoint);
947
1/2
✓ Branch 1 taken 301 times.
✗ Branch 2 not taken.
301 const string parent_path = GetParentPath(nested_root_path);
948
1/2
✓ Branch 1 taken 301 times.
✗ Branch 2 not taken.
301 const PathString nested_root_ps = PathString(nested_root_path);
949
950 301 SyncLock();
951
952 // Find the immediate parent catalog
953 301 WritableCatalog *parent = NULL;
954
3/4
✓ Branch 1 taken 301 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
✓ Branch 4 taken 258 times.
301 if (!FindCatalog(parent_path, &parent)) {
955 43 SyncUnlock(); // this is needed for the unittest. otherwise they get stuck
956 43 PANIC(kLogStderr,
957 "failed to swap nested catalog '%s': could not find parent '%s'",
958 nested_root_path.c_str(), parent_path.c_str());
959 }
960
961 // Get old nested catalog counters
962
1/2
✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
258 Catalog *old_attached_catalog = parent->FindChild(nested_root_ps);
963
1/2
✓ Branch 1 taken 258 times.
✗ Branch 2 not taken.
258 Counters old_counters;
964
2/2
✓ Branch 0 taken 43 times.
✓ Branch 1 taken 215 times.
258 if (old_attached_catalog) {
965 // Old catalog was already attached (e.g. as a child catalog
966 // attached by a prior call to CreateNestedCatalog()). Ensure
967 // that it has not been modified, get counters, and detach it.
968 43 WritableCatalogList list;
969
2/4
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 43 times.
✗ Branch 4 not taken.
43 if (GetModifiedCatalogLeafsRecursively(old_attached_catalog, &list)) {
970 43 SyncUnlock();
971 43 PANIC(kLogStderr,
972 "failed to swap nested catalog '%s': already modified",
973 nested_root_path.c_str());
974 }
975 old_counters = old_attached_catalog->GetCounters();
976 DetachSubtree(old_attached_catalog);
977
978 43 } else {
979 // Old catalog was not attached. Download a freely attached
980 // version and get counters.
981
1/2
✓ Branch 1 taken 215 times.
✗ Branch 2 not taken.
215 shash::Any old_hash;
982 uint64_t old_size;
983
1/2
✓ Branch 1 taken 215 times.
✗ Branch 2 not taken.
215 const bool old_found = parent->FindNested(nested_root_ps, &old_hash,
984 &old_size);
985
2/2
✓ Branch 0 taken 43 times.
✓ Branch 1 taken 172 times.
215 if (!old_found) {
986 43 SyncUnlock();
987 43 PANIC(kLogStderr,
988 "failed to swap nested catalog '%s': not found in parent",
989 nested_root_path.c_str());
990 }
991 const UniquePtr<Catalog> old_free_catalog(
992
2/4
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 172 times.
✗ Branch 5 not taken.
172 LoadFreeCatalog(nested_root_ps, old_hash));
993
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 172 times.
172 if (!old_free_catalog.IsValid()) {
994 SyncUnlock();
995 PANIC(kLogStderr,
996 "failed to swap nested catalog '%s': failed to load old catalog",
997 nested_root_path.c_str());
998 }
999 172 old_counters = old_free_catalog->GetCounters();
1000 172 }
1001
1002 // Load freely attached new catalog
1003 const UniquePtr<Catalog> new_catalog(
1004
3/4
✓ Branch 1 taken 129 times.
✓ Branch 2 taken 43 times.
✓ Branch 4 taken 129 times.
✗ Branch 5 not taken.
172 LoadFreeCatalog(nested_root_ps, new_hash));
1005
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 129 times.
129 if (!new_catalog.IsValid()) {
1006 SyncUnlock();
1007 PANIC(kLogStderr,
1008 "failed to swap nested catalog '%s': failed to load new catalog",
1009 nested_root_path.c_str());
1010 }
1011
1012 // Get new catalog root directory entry
1013
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 DirectoryEntry dirent;
1014 129 XattrList xattrs;
1015
1/2
✓ Branch 2 taken 129 times.
✗ Branch 3 not taken.
129 const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
1016
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 129 times.
129 if (!dirent_found) {
1017 SyncUnlock();
1018 PANIC(kLogStderr,
1019 "failed to swap nested catalog '%s': missing dirent in new catalog",
1020 nested_root_path.c_str());
1021 }
1022
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 129 times.
129 if (dirent.HasXattrs()) {
1023 const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
1024 &xattrs);
1025 if (!xattrs_found) {
1026 SyncUnlock();
1027 PANIC(kLogStderr,
1028 "failed to swap nested catalog '%s': missing xattrs in new catalog",
1029 nested_root_path.c_str());
1030 }
1031 }
1032
1033 // Swap catalogs
1034
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 parent->RemoveNestedCatalog(nested_root_path, NULL);
1035
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 parent->InsertNestedCatalog(nested_root_path, NULL, new_hash, new_size);
1036
1037 // Update parent directory entry
1038 129 dirent.set_is_nested_catalog_mountpoint(true);
1039 129 dirent.set_is_nested_catalog_root(false);
1040
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 parent->UpdateEntry(dirent, nested_root_path);
1041
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 parent->TouchEntry(dirent, xattrs, nested_root_path);
1042
1043 // Update counters
1044
1/2
✓ Branch 3 taken 129 times.
✗ Branch 4 not taken.
129 const DeltaCounters delta = Counters::Diff(old_counters,
1045 new_catalog->GetCounters());
1046
1/2
✓ Branch 1 taken 129 times.
✗ Branch 2 not taken.
129 delta.PopulateToParent(&parent->delta_counters_);
1047
1048 129 SyncUnlock();
1049 645 }
1050
1051 /**
1052 * Install a nested catalog (catalog hierarchy) at a new, empty directory
1053 *
1054 * The mountpoint directory must not yet exist. Its parent directory, however
1055 * must exist. This method combines functionality from AddDirectory(),
1056 * CreateNestedCatalog() and SwapNestedCatalog().
1057 * The new nested catalog won't get attached.
1058 *
1059 * @param mountpoint - the path where the nested catalog should be installed
1060 * @param new_hash - the hash of the new nested catalog
1061 * @param new_size - the size of the new nested catalog
1062 */
1063 131 void WritableCatalogManager::GraftNestedCatalog(const string &mountpoint,
1064 const shash::Any &new_hash,
1065 const uint64_t new_size) {
1066
2/2
✓ Branch 1 taken 88 times.
✓ Branch 2 taken 43 times.
131 if (!TryGraftNestedCatalog(mountpoint, new_hash, new_size)) {
1067 88 PANIC(kLogStderr, "failed to graft nested catalog '%s'",
1068 mountpoint.c_str());
1069 }
1070 43 }
1071
1072 131 bool WritableCatalogManager::TryGraftNestedCatalog(const string &mountpoint,
1073 const shash::Any &new_hash,
1074 const uint64_t new_size) {
1075
1/2
✓ Branch 1 taken 131 times.
✗ Branch 2 not taken.
131 const string nested_root_path = MakeRelativePath(mountpoint);
1076
1/2
✓ Branch 1 taken 131 times.
✗ Branch 2 not taken.
131 const string parent_path = GetParentPath(nested_root_path);
1077
1/2
✓ Branch 1 taken 131 times.
✗ Branch 2 not taken.
131 const PathString nested_root_ps = PathString(nested_root_path);
1078
1079
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 131 times.
131 if (nested_root_path.empty()) {
1080 LogCvmfs(kLogCatalog, kLogStderr,
1081 "failed to graft nested catalog: empty mountpoint");
1082 return false;
1083 }
1084
1085 // Load freely attached new catalog
1086 const UniquePtr<Catalog> new_catalog(
1087
2/4
✓ Branch 1 taken 131 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 131 times.
✗ Branch 5 not taken.
131 LoadFreeCatalog(nested_root_ps, new_hash));
1088
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 131 times.
131 if (!new_catalog.IsValid()) {
1089 LogCvmfs(kLogCatalog, kLogStderr,
1090 "failed to graft nested catalog '%s': failed to load new catalog",
1091 nested_root_path.c_str());
1092 return false;
1093 }
1094
4/7
✓ Branch 2 taken 131 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 131 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 44 times.
✓ Branch 9 taken 87 times.
131 if (new_catalog->root_prefix() != nested_root_ps) {
1095
1/7
✗ Branch 2 not taken.
✓ Branch 3 taken 44 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 10 not taken.
✗ Branch 11 not taken.
44 LogCvmfs(kLogCatalog, kLogStderr,
1096 "invalid nested catalog for grafting at '%s': catalog rooted at "
1097 "'%s'",
1098 nested_root_path.c_str(),
1099
2/4
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 44 times.
✗ Branch 6 not taken.
88 new_catalog->root_prefix().ToString().c_str());
1100 44 return false;
1101 }
1102
1103 // Get new catalog root directory entry
1104
1/2
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
87 DirectoryEntry dirent;
1105 87 XattrList xattrs;
1106
1/2
✓ Branch 2 taken 87 times.
✗ Branch 3 not taken.
87 const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
1107
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 87 times.
87 if (!dirent_found) {
1108 LogCvmfs(kLogCatalog, kLogStderr,
1109 "failed to graft nested catalog '%s': missing dirent in new "
1110 "catalog",
1111 nested_root_path.c_str());
1112 return false;
1113 }
1114
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 87 times.
87 if (!dirent.IsDirectory()) {
1115 LogCvmfs(kLogCatalog, kLogStderr,
1116 "failed to graft nested catalog '%s': root entry is not a "
1117 "directory",
1118 nested_root_path.c_str());
1119 return false;
1120 }
1121
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 87 times.
87 if (dirent.HasXattrs()) {
1122 const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
1123 &xattrs);
1124 if (!xattrs_found) {
1125 LogCvmfs(kLogCatalog, kLogStderr,
1126 "failed to graft nested catalog '%s': missing xattrs in new "
1127 "catalog",
1128 nested_root_path.c_str());
1129 return false;
1130 }
1131 }
1132 // Transform the nested catalog root into a transition point to be inserted
1133 // in the parent catalog
1134 87 dirent.set_is_nested_catalog_root(false);
1135 87 dirent.set_is_nested_catalog_mountpoint(true);
1136
1137 // Add directory and nested catalog
1138
1139 87 SyncLock();
1140 WritableCatalog *parent_catalog;
1141
1/2
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
87 DirectoryEntry parent_entry;
1142
2/4
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 87 times.
87 if (!FindCatalog(parent_path, &parent_catalog, &parent_entry)) {
1143 SyncUnlock();
1144 LogCvmfs(kLogCatalog, kLogStderr,
1145 "catalog for directory '%s' cannot be found", parent_path.c_str());
1146 return false;
1147 }
1148
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 87 times.
87 if (!parent_entry.IsDirectory()) {
1149 SyncUnlock();
1150 LogCvmfs(kLogCatalog, kLogStderr,
1151 "parent path '%s' is not a directory", parent_path.c_str());
1152 return false;
1153 }
1154
3/4
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 44 times.
✓ Branch 4 taken 43 times.
87 if (parent_catalog->LookupPath(nested_root_ps, NULL)) {
1155 44 SyncUnlock();
1156
1/2
✓ Branch 2 taken 44 times.
✗ Branch 3 not taken.
44 LogCvmfs(kLogCatalog, kLogStderr,
1157 "invalid attempt to graft nested catalog into existing directory "
1158 "'%s'",
1159 nested_root_path.c_str());
1160 44 return false;
1161 }
1162
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 parent_catalog->AddEntry(dirent, xattrs, nested_root_path, parent_path);
1163 43 parent_entry.set_linkcount(parent_entry.linkcount() + 1);
1164
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 parent_catalog->UpdateEntry(parent_entry, parent_path);
1165
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 if (parent_entry.IsNestedCatalogRoot()) {
1166 WritableCatalog *grand_parent_catalog = reinterpret_cast<WritableCatalog *>(
1167 43 parent_catalog->parent());
1168 43 parent_entry.set_is_nested_catalog_root(false);
1169 43 parent_entry.set_is_nested_catalog_mountpoint(true);
1170
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 grand_parent_catalog->UpdateEntry(parent_entry, parent_path);
1171 }
1172
1173
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 parent_catalog->InsertNestedCatalog(nested_root_path, NULL, new_hash,
1174 new_size);
1175
1176 // Fix-up counters
1177
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 const Counters counters;
1178
1/2
✓ Branch 3 taken 43 times.
✗ Branch 4 not taken.
43 const DeltaCounters delta = Counters::Diff(counters,
1179 new_catalog->GetCounters());
1180
1/2
✓ Branch 1 taken 43 times.
✗ Branch 2 not taken.
43 delta.PopulateToParent(&parent_catalog->delta_counters_);
1181
1182 43 SyncUnlock();
1183 43 return true;
1184 131 }
1185
1186 /**
1187 * Checks if a nested catalog starts at this path. The path must be valid.
1188 */
1189 bool WritableCatalogManager::IsTransitionPoint(const string &mountpoint) {
1190 const string path = MakeRelativePath(mountpoint);
1191
1192 SyncLock();
1193 WritableCatalog *catalog;
1194 DirectoryEntry entry;
1195 if (!FindCatalog(path, &catalog, &entry)) {
1196 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1197 path.c_str());
1198 }
1199 const bool result = entry.IsNestedCatalogRoot();
1200 SyncUnlock();
1201 return result;
1202 }
1203
1204
1205 void WritableCatalogManager::PrecalculateListings() {
1206 // TODO(jblomer): meant for micro catalogs
1207 }
1208
1209
1210 void WritableCatalogManager::SetTTL(const uint64_t new_ttl) {
1211 SyncLock();
1212 reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetTTL(new_ttl);
1213 SyncUnlock();
1214 }
1215
1216
1217 bool WritableCatalogManager::SetVOMSAuthz(const std::string &voms_authz) {
1218 bool result;
1219 SyncLock();
1220 result = reinterpret_cast<WritableCatalog *>(GetRootCatalog())
1221 ->SetVOMSAuthz(voms_authz);
1222 SyncUnlock();
1223 return result;
1224 }
1225
1226
1227 875 bool WritableCatalogManager::Commit(const bool stop_for_tweaks,
1228 const uint64_t manual_revision,
1229 manifest::Manifest *manifest) {
1230 WritableCatalog *root_catalog = reinterpret_cast<WritableCatalog *>(
1231 875 GetRootCatalog());
1232
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 root_catalog->SetDirty();
1233
1234 // set root catalog revision to manually provided number if available
1235
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 875 times.
875 if (manual_revision > 0) {
1236 const uint64_t revision = root_catalog->GetRevision();
1237 if (revision >= manual_revision) {
1238 LogCvmfs(kLogCatalog, kLogStderr,
1239 "Manual revision (%" PRIu64 ") must not be "
1240 "smaller than the current root catalog's (%" PRIu64
1241 "). Skipped!",
1242 manual_revision, revision);
1243 } else {
1244 // Gets incremented by FinalizeCatalog() afterwards!
1245 root_catalog->SetRevision(manual_revision - 1);
1246 }
1247 }
1248
1249 // do the actual catalog snapshotting and upload
1250
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 CatalogInfo root_catalog_info;
1251
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 if (getenv("_CVMFS_SERIALIZED_CATALOG_PROCESSING_") == NULL)
1252
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 root_catalog_info = SnapshotCatalogs(stop_for_tweaks);
1253 else
1254 root_catalog_info = SnapshotCatalogsSerialized(stop_for_tweaks);
1255
2/4
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 875 times.
875 if (spooler_->GetNumberOfErrors() > 0) {
1256 LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalogs");
1257 return false;
1258 }
1259
1260 // .cvmfspublished export
1261
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 LogCvmfs(kLogCatalog, kLogVerboseMsg, "Committing repository manifest");
1262 875 set_base_hash(root_catalog_info.content_hash);
1263
1264 875 manifest->set_catalog_hash(root_catalog_info.content_hash);
1265 875 manifest->set_catalog_size(root_catalog_info.size);
1266
2/4
✓ Branch 2 taken 875 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 875 times.
✗ Branch 6 not taken.
875 manifest->set_root_path("");
1267 875 manifest->set_ttl(root_catalog_info.ttl);
1268 875 manifest->set_revision(root_catalog_info.revision);
1269
1270 875 return true;
1271 }
1272
1273
1274 /**
1275 * Handles the snapshotting of dirty (i.e. modified) catalogs while trying to
1276 * parallelize the compression and upload as much as possible. We use a parallel
1277 * depth first post order tree traversal based on 'continuations'.
1278 *
1279 * The idea is as follows:
1280 * 1. find all leaf-catalogs (i.e. dirty catalogs with no dirty children)
1281 * --> these can be processed and uploaded immediately and independently
1282 * see WritableCatalogManager::GetModifiedCatalogLeafs()
1283 * 2. annotate non-leaf catalogs with their number of dirty children
1284 * --> a finished child will notify it's parent and decrement this number
1285 * see WritableCatalogManager::CatalogUploadCallback()
1286 * 3. if a non-leaf catalog's dirty children number reaches 0, it is scheduled
1287 * for processing as well (continuation)
1288 * --> the parallel processing walks bottom-up through the catalog tree
1289 * see WritableCatalogManager::CatalogUploadCallback()
1290 * 4. when the root catalog is reached, we notify the main thread and return
1291 * --> done through a Future<> in WritableCatalogManager::SnapshotCatalogs
1292 *
1293 * Note: The catalog finalisation (see WritableCatalogManager::FinalizeCatalog)
1294 * happens in a worker thread (i.e. the callback method) for non-leaf
1295 * catalogs.
1296 *
1297 * TODO(rmeusel): since all leaf catalogs are finalized in the main thread, we
1298 * sacrifice some potential concurrency for simplicity.
1299 */
1300 875 WritableCatalogManager::CatalogInfo WritableCatalogManager::SnapshotCatalogs(
1301 const bool stop_for_tweaks) {
1302 // prepare environment for parallel processing
1303
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 Future<CatalogInfo> root_catalog_info_future;
1304 CatalogUploadContext upload_context;
1305 875 upload_context.root_catalog_info = &root_catalog_info_future;
1306 875 upload_context.stop_for_tweaks = stop_for_tweaks;
1307
1308
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 spooler_->RegisterListener(&WritableCatalogManager::CatalogUploadCallback,
1309 this, upload_context);
1310
1311 // find dirty leaf catalogs and annotate non-leaf catalogs (dirty child count)
1312 // post-condition: the entire catalog tree is ready for concurrent processing
1313 875 WritableCatalogList leafs_to_snapshot;
1314
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 GetModifiedCatalogLeafs(&leafs_to_snapshot);
1315
1316 // finalize and schedule the catalog processing
1317 875 WritableCatalogList::const_iterator i = leafs_to_snapshot.begin();
1318 875 const WritableCatalogList::const_iterator iend = leafs_to_snapshot.end();
1319
2/2
✓ Branch 2 taken 1256 times.
✓ Branch 3 taken 875 times.
2131 for (; i != iend; ++i) {
1320
1/2
✓ Branch 2 taken 1256 times.
✗ Branch 3 not taken.
1256 FinalizeCatalog(*i, stop_for_tweaks);
1321
1/2
✓ Branch 2 taken 1256 times.
✗ Branch 3 not taken.
1256 ScheduleCatalogProcessing(*i);
1322 }
1323
1324
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 LogCvmfs(kLogCatalog, kLogVerboseMsg, "waiting for upload of catalogs");
1325
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 const CatalogInfo &root_catalog_info = root_catalog_info_future.Get();
1326
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 spooler_->WaitForUpload();
1327
1328
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 spooler_->UnregisterListeners();
1329 875 return root_catalog_info;
1330 875 }
1331
1332
1333 2081 void WritableCatalogManager::FinalizeCatalog(WritableCatalog *catalog,
1334 const bool stop_for_tweaks) {
1335 // update meta information of this catalog
1336
1/3
✓ Branch 2 taken 2081 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
2081 LogCvmfs(kLogCatalog, kLogVerboseMsg, "creating snapshot of catalog '%s'",
1337 4162 catalog->mountpoint().c_str());
1338
1339 2081 catalog->UpdateCounters();
1340 2081 catalog->UpdateLastModified();
1341 2081 catalog->IncrementRevision();
1342
1343 // update the previous catalog revision pointer
1344
2/2
✓ Branch 1 taken 875 times.
✓ Branch 2 taken 1206 times.
2081 if (catalog->IsRoot()) {
1345
1/4
✓ Branch 2 taken 875 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
875 LogCvmfs(kLogCatalog, kLogVerboseMsg,
1346 "setting '%s' as previous revision "
1347 "for root catalog",
1348 1750 base_hash().ToStringWithSuffix().c_str());
1349 875 catalog->SetPreviousRevision(base_hash());
1350 } else {
1351 // Multiple catalogs might query the parent concurrently
1352 1206 SyncLock();
1353
1/2
✓ Branch 1 taken 1206 times.
✗ Branch 2 not taken.
1206 shash::Any hash_previous;
1354 uint64_t size_previous;
1355
1/2
✓ Branch 2 taken 1206 times.
✗ Branch 3 not taken.
2412 const bool retval = catalog->parent()->FindNested(
1356
1/2
✓ Branch 1 taken 1206 times.
✗ Branch 2 not taken.
2412 catalog->mountpoint(), &hash_previous, &size_previous);
1357
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1206 times.
1206 assert(retval);
1358 1206 SyncUnlock();
1359
1360
1/8
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1206 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
2412 LogCvmfs(kLogCatalog, kLogVerboseMsg,
1361 "found '%s' as previous revision "
1362 "for nested catalog '%s'",
1363
1/2
✓ Branch 1 taken 1206 times.
✗ Branch 2 not taken.
2412 hash_previous.ToStringWithSuffix().c_str(),
1364
1/2
✓ Branch 1 taken 1206 times.
✗ Branch 2 not taken.
2412 catalog->mountpoint().c_str());
1365
1/2
✓ Branch 1 taken 1206 times.
✗ Branch 2 not taken.
1206 catalog->SetPreviousRevision(hash_previous);
1366 }
1367 2081 catalog->Commit();
1368
1369 // check if catalog has too many entries
1370 const uint64_t catalog_limit = uint64_t(1000)
1371
2/2
✓ Branch 1 taken 875 times.
✓ Branch 2 taken 1206 times.
2081 * uint64_t((catalog->IsRoot()
1372 875 ? root_kcatalog_limit_
1373 1206 : nested_kcatalog_limit_));
1374 2081 if ((catalog_limit > 0)
1375
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 2081 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 2081 times.
2081 && (catalog->GetCounters().GetSelfEntries() > catalog_limit)) {
1376 LogCvmfs(kLogCatalog, kLogStderr,
1377 "%s: catalog at %s has more than %lu entries (%lu). "
1378 "Large catalogs stress the CernVM-FS transport infrastructure. "
1379 "Please split it into nested catalogs or increase the limit.",
1380 enforce_limits_ ? "FATAL" : "WARNING",
1381 (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1382 catalog_limit, catalog->GetCounters().GetSelfEntries());
1383 if (enforce_limits_)
1384 PANIC(kLogStderr, "catalog at %s has more than %u entries (%u). ",
1385 (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1386 catalog_limit, catalog->GetCounters().GetSelfEntries());
1387 }
1388
1389 // allow for manual adjustments in the catalog
1390
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2081 times.
2081 if (stop_for_tweaks) {
1391 LogCvmfs(kLogCatalog, kLogStdout,
1392 "Allowing for tweaks in %s at %s "
1393 "(hit return to continue)",
1394 catalog->database_path().c_str(), catalog->mountpoint().c_str());
1395 const int read_char = getchar();
1396 assert(read_char != EOF);
1397 }
1398
1399 // compaction of bloated catalogs (usually after high database churn)
1400 2081 catalog->VacuumDatabaseIfNecessary();
1401 2081 }
1402
1403
1404 2081 void WritableCatalogManager::ScheduleCatalogProcessing(
1405 WritableCatalog *catalog) {
1406 {
1407 2081 const MutexLockGuard guard(catalog_processing_lock_);
1408 // register catalog object for WritableCatalogManager::CatalogUploadCallback
1409
2/4
✓ Branch 1 taken 2081 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2081 times.
✗ Branch 5 not taken.
2081 catalog_processing_map_[catalog->database_path()] = catalog;
1410 2081 }
1411
1/2
✓ Branch 2 taken 2081 times.
✗ Branch 3 not taken.
2081 spooler_->ProcessCatalog(catalog->database_path());
1412 2081 }
1413
1414 /**
1415 * Copy catalog to local cache.server
1416 * Must be an atomic write into the cache_dir
1417 * As such: create a temporary copy in cache_dir/txn and then do a
1418 * `rename` (which is atomic) to the actual cache path
1419 *
1420 * @returns true on success, otherwise false
1421 */
1422 bool WritableCatalogManager::CopyCatalogToLocalCache(
1423 const upload::SpoolerResult &result) {
1424 std::string tmp_catalog_path;
1425 const std::string cache_catalog_path = dir_cache_ + "/"
1426 + result.content_hash
1427 .MakePathWithoutSuffix();
1428 FILE *fcatalog = CreateTempFile(dir_cache_ + "/txn/catalog", 0666, "w",
1429 &tmp_catalog_path);
1430 if (!fcatalog) {
1431 PANIC(kLogDebug | kLogStderr,
1432 "Creating file for temporary catalog failed: %s",
1433 tmp_catalog_path.c_str());
1434 }
1435 CopyPath2File(result.local_path.c_str(), fcatalog);
1436 (void)fclose(fcatalog);
1437
1438 if (rename(tmp_catalog_path.c_str(), cache_catalog_path.c_str()) != 0) {
1439 PANIC(kLogDebug | kLogStderr, "Failed to copy catalog from %s to cache %s",
1440 result.local_path.c_str(), cache_catalog_path.c_str());
1441 }
1442 return true;
1443 }
1444
1445 2081 void WritableCatalogManager::CatalogUploadCallback(
1446 const upload::SpoolerResult &result,
1447 const CatalogUploadContext catalog_upload_context) {
1448
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2081 times.
2081 if (result.return_code != 0) {
1449 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1450 result.local_path.c_str(), result.return_code);
1451 }
1452
1453 // retrieve the catalog object based on the callback information
1454 // see WritableCatalogManager::ScheduleCatalogProcessing()
1455 2081 WritableCatalog *catalog = NULL;
1456 {
1457 2081 const MutexLockGuard guard(catalog_processing_lock_);
1458 const std::map<std::string, WritableCatalog *>::iterator
1459
1/2
✓ Branch 1 taken 2081 times.
✗ Branch 2 not taken.
2081 c = catalog_processing_map_.find(result.local_path);
1460
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 2081 times.
2081 assert(c != catalog_processing_map_.end());
1461 2081 catalog = c->second;
1462 2081 }
1463
1464 2081 const uint64_t catalog_size = GetFileSize(result.local_path);
1465
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2081 times.
2081 assert(catalog_size > 0);
1466
1467
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2081 times.
2081 if (UseLocalCache()) {
1468 CopyCatalogToLocalCache(result);
1469 }
1470
1471 2081 SyncLock();
1472
2/2
✓ Branch 1 taken 1206 times.
✓ Branch 2 taken 875 times.
2081 if (catalog->HasParent()) {
1473 // finalized nested catalogs will update their parent's pointer and schedule
1474 // them for processing (continuation) if the 'dirty children count' == 0
1475 1206 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1476 1206 WritableCatalog *parent = catalog->GetWritableParent();
1477
1478
2/4
✓ Branch 1 taken 1206 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1206 times.
✗ Branch 5 not taken.
1206 parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1479 1206 result.content_hash,
1480 catalog_size,
1481 1206 catalog->delta_counters_);
1482 1206 catalog->delta_counters_.SetZero();
1483
1484 const int remaining_dirty_children = catalog->GetWritableParent()
1485 1206 ->DecrementDirtyChildren();
1486
1487 1206 SyncUnlock();
1488
1489 // continuation of the dirty catalog tree traversal
1490 // see WritableCatalogManager::SnapshotCatalogs()
1491
2/2
✓ Branch 0 taken 825 times.
✓ Branch 1 taken 381 times.
1206 if (remaining_dirty_children == 0) {
1492 825 FinalizeCatalog(parent, catalog_upload_context.stop_for_tweaks);
1493 825 ScheduleCatalogProcessing(parent);
1494 }
1495
1496
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 } else if (catalog->IsRoot()) {
1497 // once the root catalog is reached, we are done with processing and report
1498 // back to the main via a Future<> and provide the necessary information
1499
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 CatalogInfo root_catalog_info;
1500 875 root_catalog_info.size = catalog_size;
1501
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 root_catalog_info.ttl = catalog->GetTTL();
1502 875 root_catalog_info.content_hash = result.content_hash;
1503
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 root_catalog_info.revision = catalog->GetRevision();
1504
1/2
✓ Branch 1 taken 875 times.
✗ Branch 2 not taken.
875 catalog_upload_context.root_catalog_info->Set(root_catalog_info);
1505 875 SyncUnlock();
1506 } else {
1507 PANIC(kLogStderr, "inconsistent state detected");
1508 }
1509 2081 }
1510
1511
1512 /**
1513 * Finds dirty catalogs that can be snapshot right away and annotates all the
1514 * other catalogs with their number of dirty descendants.
1515 * Note that there is a convenience wrapper to start the recursion:
1516 * WritableCatalogManager::GetModifiedCatalogLeafs()
1517 *
1518 * @param catalog the catalog for this recursion step
1519 * @param result the result list to be appended to
1520 * @return true if 'catalog' is dirty
1521 */
1522 2124 bool WritableCatalogManager::GetModifiedCatalogLeafsRecursively(
1523 Catalog *catalog, WritableCatalogList *result) const {
1524 2124 WritableCatalog *wr_catalog = static_cast<WritableCatalog *>(catalog);
1525
1526 // Look for dirty catalogs in the descendants of *catalog
1527 2124 int dirty_children = 0;
1528
1/2
✓ Branch 1 taken 2124 times.
✗ Branch 2 not taken.
2124 CatalogList children = wr_catalog->GetChildren();
1529 2124 CatalogList::const_iterator i = children.begin();
1530 2124 const CatalogList::const_iterator iend = children.end();
1531
2/2
✓ Branch 2 taken 1206 times.
✓ Branch 3 taken 2124 times.
3330 for (; i != iend; ++i) {
1532
2/4
✓ Branch 2 taken 1206 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1206 times.
✗ Branch 5 not taken.
1206 if (GetModifiedCatalogLeafsRecursively(*i, result)) {
1533 1206 ++dirty_children;
1534 }
1535 }
1536
1537 // a catalog is dirty if itself or one of its children has changed
1538 // a leaf catalog doesn't have any dirty children
1539 2124 wr_catalog->set_dirty_children(dirty_children);
1540
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 2124 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
2124 const bool is_dirty = wr_catalog->IsDirty() || dirty_children > 0;
1541 2124 const bool is_leaf = dirty_children == 0;
1542
3/4
✓ Branch 0 taken 2124 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1299 times.
✓ Branch 3 taken 825 times.
2124 if (is_dirty && is_leaf) {
1543
1/2
✓ Branch 1 taken 1299 times.
✗ Branch 2 not taken.
1299 result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1544 }
1545
1546 2124 return is_dirty;
1547 2124 }
1548
1549
1550 void WritableCatalogManager::DoBalance() {
1551 CatalogList catalog_list = GetCatalogs();
1552 reverse(catalog_list.begin(), catalog_list.end());
1553 for (unsigned i = 0; i < catalog_list.size(); ++i) {
1554 FixWeight(static_cast<WritableCatalog *>(catalog_list[i]));
1555 }
1556 }
1557
1558 void WritableCatalogManager::FixWeight(WritableCatalog *catalog) {
1559 // firstly check underflow because they can provoke overflows
1560 if (catalog->GetNumEntries() < min_weight_ && !catalog->IsRoot()
1561 && catalog->IsAutogenerated()) {
1562 LogCvmfs(kLogCatalog, kLogStdout,
1563 "Deleting an autogenerated catalog in '%s'",
1564 catalog->mountpoint().c_str());
1565 // Remove the .cvmfscatalog and .cvmfsautocatalog files first
1566 const string path = catalog->mountpoint().ToString();
1567 catalog->RemoveEntry(path + "/.cvmfscatalog");
1568 catalog->RemoveEntry(path + "/.cvmfsautocatalog");
1569 // Remove the actual catalog
1570 const string catalog_path = catalog->mountpoint().ToString().substr(1);
1571 RemoveNestedCatalog(catalog_path);
1572 } else if (catalog->GetNumEntries() > max_weight_) {
1573 CatalogBalancer<WritableCatalogManager> catalog_balancer(this);
1574 catalog_balancer.Balance(catalog);
1575 }
1576 }
1577
1578
1579 //****************************************************************************
1580 // Workaround -- Serialized Catalog Committing
1581
1582 int WritableCatalogManager::GetModifiedCatalogsRecursively(
1583 const Catalog *catalog, WritableCatalogList *result) const {
1584 // A catalog must be snapshot, if itself or one of it's descendants is dirty.
1585 // So we traverse the catalog tree recursively and look for dirty catalogs
1586 // on the way.
1587 const WritableCatalog *wr_catalog = static_cast<const WritableCatalog *>(
1588 catalog);
1589 // This variable will contain the number of dirty catalogs in the sub tree
1590 // with *catalog as it's root.
1591 int dirty_catalogs = (wr_catalog->IsDirty()) ? 1 : 0;
1592
1593 // Look for dirty catalogs in the descendants of *catalog
1594 CatalogList children = wr_catalog->GetChildren();
1595 for (CatalogList::const_iterator i = children.begin(), iEnd = children.end();
1596 i != iEnd; ++i) {
1597 dirty_catalogs += GetModifiedCatalogsRecursively(*i, result);
1598 }
1599
1600 // If we found a dirty catalog in the checked sub tree, the root (*catalog)
1601 // must be snapshot and ends up in the result list
1602 if (dirty_catalogs > 0)
1603 result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1604
1605 // tell the upper layer about number of catalogs
1606 return dirty_catalogs;
1607 }
1608
1609
1610 void WritableCatalogManager::CatalogUploadSerializedCallback(
1611 const upload::SpoolerResult &result, const CatalogUploadContext unused) {
1612 if (result.return_code != 0) {
1613 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1614 result.local_path.c_str(), result.return_code);
1615 }
1616
1617 if (UseLocalCache()) {
1618 CopyCatalogToLocalCache(result);
1619 }
1620
1621 unlink(result.local_path.c_str());
1622 }
1623
1624
1625 WritableCatalogManager::CatalogInfo
1626 WritableCatalogManager::SnapshotCatalogsSerialized(const bool stop_for_tweaks) {
1627 LogCvmfs(kLogCvmfs, kLogStdout, "Serialized committing of file catalogs...");
1628 reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetDirty();
1629 WritableCatalogList catalogs_to_snapshot;
1630 GetModifiedCatalogs(&catalogs_to_snapshot);
1631 CatalogUploadContext unused;
1632 unused.root_catalog_info = NULL;
1633 unused.stop_for_tweaks = false;
1634 spooler_->RegisterListener(
1635 &WritableCatalogManager::CatalogUploadSerializedCallback, this, unused);
1636
1637 CatalogInfo root_catalog_info;
1638 WritableCatalogList::const_iterator i = catalogs_to_snapshot.begin();
1639 const WritableCatalogList::const_iterator iend = catalogs_to_snapshot.end();
1640 for (; i != iend; ++i) {
1641 FinalizeCatalog(*i, stop_for_tweaks);
1642
1643 // Compress and upload catalog
1644 shash::Any hash_catalog(spooler_->GetHashAlgorithm(),
1645 shash::kSuffixCatalog);
1646 if (!zlib::CompressPath2Null((*i)->database_path(), &hash_catalog)) {
1647 PANIC(kLogStderr, "could not compress catalog %s",
1648 (*i)->mountpoint().ToString().c_str());
1649 }
1650
1651 const int64_t catalog_size = GetFileSize((*i)->database_path());
1652 assert(catalog_size > 0);
1653
1654 if ((*i)->HasParent()) {
1655 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1656 WritableCatalog *parent = (*i)->GetWritableParent();
1657 parent->UpdateNestedCatalog((*i)->mountpoint().ToString(), hash_catalog,
1658 catalog_size, (*i)->delta_counters_);
1659 (*i)->delta_counters_.SetZero();
1660 } else if ((*i)->IsRoot()) {
1661 root_catalog_info.size = catalog_size;
1662 root_catalog_info.ttl = (*i)->GetTTL();
1663 root_catalog_info.content_hash = hash_catalog;
1664 root_catalog_info.revision = (*i)->GetRevision();
1665 } else {
1666 PANIC(kLogStderr, "inconsistent state detected");
1667 }
1668
1669 spooler_->ProcessCatalog((*i)->database_path());
1670 }
1671 spooler_->WaitForUpload();
1672
1673 spooler_->UnregisterListeners();
1674 return root_catalog_info;
1675 }
1676
1677 void WritableCatalogManager::SetupSingleCatalogUploadCallback() {
1678 spooler_->RegisterListener(
1679 &WritableCatalogManager::SingleCatalogUploadCallback, this);
1680 }
1681
1682 void WritableCatalogManager::RemoveSingleCatalogUploadCallback() {
1683 spooler_->WaitForUpload(); // wait for all outstanding jobs to finish before
1684 // tearing it down
1685 spooler_->UnregisterListeners();
1686 pending_catalogs_ =
1687 {}; // whatever we couldn't process, leave it to the Commit
1688 }
1689
1690 void WritableCatalogManager::AddCatalogToQueue(const std::string &path) {
1691 SyncLock();
1692 WritableCatalog *catalog = NULL;
1693 bool const retval = FindCatalog(MakeRelativePath(path), &catalog, NULL);
1694 assert(retval);
1695 assert(catalog);
1696 catalog->SetDirty(); // ensure it's dirty so its parent will wait for it
1697 SyncUnlock();
1698 pending_catalogs_.push_back(catalog);
1699 }
1700
1701 void WritableCatalogManager::ScheduleReadyCatalogs() {
1702 // best effort to schedule as many catalogs for upload as possible
1703 for (auto it = pending_catalogs_.begin(); it != pending_catalogs_.end();) {
1704 if ((*it)->dirty_children() == 0) {
1705 FinalizeCatalog(*it, false /* stop_for_tweaks */);
1706 ScheduleCatalogProcessing(*it);
1707 LogCvmfs(kLogCatalog, kLogVerboseMsg, "scheduled %s for processing",
1708 (*it)->mountpoint().c_str());
1709 it = pending_catalogs_.erase(it);
1710 } else {
1711 ++it;
1712 }
1713 }
1714 }
1715
1716 // Callback for uploading a single catalog, similar to CatalogUploadCallback.
1717 // The main difference is that this callback would not trigger processing of the
1718 // parent
1719 void WritableCatalogManager::SingleCatalogUploadCallback(
1720 const upload::SpoolerResult &result) {
1721 if (result.return_code != 0) {
1722 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1723 result.local_path.c_str(), result.return_code);
1724 }
1725
1726 // retrieve the catalog object based on the callback information
1727 // see WritableCatalogManager::ScheduleCatalogProcessing()
1728 WritableCatalog *catalog = NULL;
1729 {
1730 MutexLockGuard const guard(catalog_processing_lock_);
1731 std::map<std::string, WritableCatalog *>::iterator const
1732 c = catalog_processing_map_.find(result.local_path);
1733 assert(c != catalog_processing_map_.end());
1734 catalog = c->second;
1735 }
1736
1737 uint64_t const catalog_size = GetFileSize(result.local_path);
1738 assert(catalog_size > 0);
1739
1740 SyncLock();
1741 if (catalog->HasParent()) {
1742 // finalized nested catalogs will update their parent's pointer
1743 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1744 WritableCatalog *parent = catalog->GetWritableParent();
1745
1746 parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1747 result.content_hash,
1748 catalog_size,
1749 catalog->delta_counters_);
1750 parent->DecrementDirtyChildren();
1751 catalog->delta_counters_.SetZero();
1752 }
1753 // JUMP: detach the catalog after uploading to free sqlite related resources
1754 DetachCatalog(catalog);
1755 SyncUnlock();
1756 }
1757 // using the given list of dirs, fetch all relevant catalogs
1758 void WritableCatalogManager::LoadCatalogs(
1759 const std::string &base_path, const std::unordered_set<std::string> &dirs) {
1760 // mount everything up to "base_path" first (this would be our lease_path
1761 // typically)
1762 Catalog *base_catalog;
1763 MountSubtree(PathString(base_path), NULL /* entry_point */,
1764 true /* is_listable */, &base_catalog);
1765
1766 // start up the downloader
1767 CatalogDownloadContext context;
1768 context.dirs = &dirs;
1769 catalog_download_pipeline_ = new CatalogDownloadPipeline(
1770 static_cast<SimpleCatalogManager *>(this));
1771 catalog_download_pipeline_->RegisterListener(
1772 &WritableCatalogManager::CatalogDownloadCallback, this, context);
1773 catalog_download_pipeline_->Spawn();
1774
1775 Catalog::NestedCatalogList nested_catalogs = base_catalog
1776 ->ListNestedCatalogs();
1777 for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1778 // schedule relevant child nested catalogs for download
1779 std::string const mountpoint = it->mountpoint.ToString();
1780 if (dirs.find(mountpoint) != dirs.end()) {
1781 Catalog *catalog = CreateCatalog(it->mountpoint, it->hash,
1782 NULL /* parent */);
1783 {
1784 MutexLockGuard const guard(catalog_download_lock_);
1785 catalog_download_map_.insert(
1786 std::make_pair(it->hash.ToString(), catalog));
1787 }
1788 catalog_download_pipeline_->Process(it->hash);
1789 }
1790 }
1791
1792 catalog_download_pipeline_->WaitFor();
1793 delete catalog_download_pipeline_; // terminate all the threads
1794 }
1795
1796 bool WritableCatalogManager::LookupDirEntry(const string &path,
1797 const LookupOptions options,
1798 DirectoryEntry *dirent) {
1799 SyncLock();
1800 bool const exists = LookupPath(path, options, dirent);
1801 SyncUnlock();
1802 return exists;
1803 }
1804
1805 void WritableCatalogManager::CatalogHashSerializedCallback(
1806 const CompressHashResult &result) {
1807 MutexLockGuard const guard(catalog_hash_lock_);
1808 catalog_hash_map_[result.path] = result.hash;
1809 }
1810
1811 void WritableCatalogManager::CatalogDownloadCallback(
1812 const CatalogDownloadResult &result, CatalogDownloadContext context) {
1813 Catalog *downloaded_catalog;
1814 {
1815 MutexLockGuard const guard(catalog_download_lock_);
1816 auto it = catalog_download_map_.find(result.hash);
1817 assert(it != catalog_download_map_.end());
1818 downloaded_catalog = it->second;
1819 }
1820
1821 if (!downloaded_catalog->OpenDatabase(result.db_path)) {
1822 LogCvmfs(kLogCvmfs, kLogDebug, "failed to initialize catalog");
1823 delete downloaded_catalog;
1824 return;
1825 }
1826
1827 Catalog::NestedCatalogList nested_catalogs = downloaded_catalog
1828 ->ListNestedCatalogs();
1829 for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1830 // schedule relevant child nested catalogs for download
1831 if (context.dirs->find(it->mountpoint.ToString()) != context.dirs->end()) {
1832 Catalog *child_catalog = CreateCatalog(it->mountpoint, it->hash,
1833 NULL /* parent */);
1834 {
1835 MutexLockGuard const guard(catalog_download_lock_);
1836 catalog_download_map_.insert(
1837 std::make_pair(it->hash.ToString(), child_catalog));
1838 }
1839 catalog_download_pipeline_->Process(it->hash);
1840 }
1841 }
1842 delete downloaded_catalog;
1843 }
1844
1845
1846 } // namespace catalog
1847