GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/catalog_mgr_rw.cc
Date: 2025-06-22 02:36:02
Exec Total Coverage
Lines: 461 822 56.1%
Branches: 356 1218 29.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM file system.
3 */
4
5 #define __STDC_FORMAT_MACROS
6
7 #include "catalog_mgr_rw.h"
8
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <cassert>
13 #include <cstdio>
14 #include <cstdlib>
15 #include <string>
16
17 #include "catalog_balancer.h"
18 #include "catalog_rw.h"
19 #include "manifest.h"
20 #include "statistics.h"
21 #include "upload.h"
22 #include "util/exception.h"
23 #include "util/logging.h"
24 #include "util/posix.h"
25 #include "util/smalloc.h"
26
27 using namespace std; // NOLINT
28
29 namespace catalog {
30
31 1252 WritableCatalogManager::WritableCatalogManager(
32 const shash::Any &base_hash,
33 const std::string &stratum0,
34 const string &dir_temp,
35 upload::Spooler *spooler,
36 download::DownloadManager *download_manager,
37 bool enforce_limits,
38 const unsigned nested_kcatalog_limit,
39 const unsigned root_kcatalog_limit,
40 const unsigned file_mbyte_limit,
41 perf::Statistics *statistics,
42 bool is_balanceable,
43 unsigned max_weight,
44 unsigned min_weight,
45 1252 const std::string &dir_cache)
46 : SimpleCatalogManager(base_hash, stratum0, dir_temp, download_manager,
47 statistics, false, dir_cache,
48 true /* copy to tmpdir */)
49 1252 , spooler_(spooler)
50 1252 , enforce_limits_(enforce_limits)
51 1252 , nested_kcatalog_limit_(nested_kcatalog_limit)
52 1252 , root_kcatalog_limit_(root_kcatalog_limit)
53 1252 , file_mbyte_limit_(file_mbyte_limit)
54 1252 , is_balanceable_(is_balanceable)
55 1252 , max_weight_(max_weight)
56 1252 , min_weight_(min_weight)
57 1252 , balance_weight_(max_weight / 2) {
58 1252 sync_lock_ = reinterpret_cast<pthread_mutex_t *>(
59 1252 smalloc(sizeof(pthread_mutex_t)));
60 1252 int retval = pthread_mutex_init(sync_lock_, NULL);
61
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1252 times.
1252 assert(retval == 0);
62 1252 catalog_processing_lock_ = reinterpret_cast<pthread_mutex_t *>(
63 1252 smalloc(sizeof(pthread_mutex_t)));
64 1252 retval = pthread_mutex_init(catalog_processing_lock_, NULL);
65
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1252 times.
1252 assert(retval == 0);
66 1252 }
67
68
69 5004 WritableCatalogManager::~WritableCatalogManager() {
70 2502 pthread_mutex_destroy(sync_lock_);
71 2502 free(sync_lock_);
72 2502 pthread_mutex_destroy(catalog_processing_lock_);
73 2502 free(catalog_processing_lock_);
74 5004 }
75
76
77 /**
78 * This method is virtual in AbstractCatalogManager. It returns a new catalog
79 * structure in the form the different CatalogManagers need it.
80 * In this case it returns a stub for a WritableCatalog.
81 * @param mountpoint the mount point of the catalog stub to create
82 * @param catalog_hash the content hash of the catalog to create
83 * @param parent_catalog the parent of the catalog stub to create
84 * @return a pointer to the catalog stub structure created
85 */
86 2461 Catalog *WritableCatalogManager::CreateCatalog(const PathString &mountpoint,
87 const shash::Any &catalog_hash,
88 Catalog *parent_catalog) {
89 return new WritableCatalog(
90
2/4
✓ Branch 2 taken 2461 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 2461 times.
✗ Branch 6 not taken.
2461 mountpoint.ToString(), catalog_hash, parent_catalog);
91 }
92
93
94 2461 void WritableCatalogManager::ActivateCatalog(Catalog *catalog) {
95 2461 catalog->TakeDatabaseFileOwnership();
96 2461 }
97
98
99 /**
100 * This method is invoked if we create a completely new repository.
101 * The new root catalog will already contain a root entry.
102 * It is uploaded by a Forklift to the upstream storage.
103 * @return the manifest for the new root catalog, NULL on failure
104 */
105 961 manifest::Manifest *WritableCatalogManager::CreateRepository(
106 const string &dir_temp,
107 const bool volatile_content,
108 const std::string &voms_authz,
109 upload::Spooler *spooler) {
110 // Create a new root catalog at file_path
111
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 const string file_path = dir_temp + "/new_root_catalog";
112
113 961 const shash::Algorithms hash_algorithm = spooler->GetHashAlgorithm();
114
115 // A newly created catalog always needs a root entry
116 // we create and configure this here
117
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 DirectoryEntry root_entry;
118 961 root_entry.inode_ = DirectoryEntry::kInvalidInode;
119 961 root_entry.mode_ = 16877;
120 961 root_entry.size_ = 4096;
121 961 root_entry.mtime_ = time(NULL);
122 961 root_entry.uid_ = getuid();
123 961 root_entry.gid_ = getgid();
124
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 root_entry.checksum_ = shash::Any(hash_algorithm);
125 961 root_entry.linkcount_ = 2;
126
1/2
✓ Branch 2 taken 961 times.
✗ Branch 3 not taken.
961 const string root_path = "";
127
128 // Create the database schema and the initial root entry
129 {
130 const UniquePtr<CatalogDatabase> new_clg_db(
131
2/4
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 961 times.
✗ Branch 5 not taken.
961 CatalogDatabase::Create(file_path));
132 961 if (!new_clg_db.IsValid()
133
4/8
✓ Branch 0 taken 961 times.
✗ Branch 1 not taken.
✓ Branch 4 taken 961 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 961 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 961 times.
961 || !new_clg_db->InsertInitialValues(
134 root_path, volatile_content, voms_authz, root_entry)) {
135 LogCvmfs(kLogCatalog, kLogStderr, "creation of catalog '%s' failed",
136 file_path.c_str());
137 return NULL;
138 }
139
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 }
140
141 // Compress root catalog;
142
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 const int64_t catalog_size = GetFileSize(file_path);
143
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 961 times.
961 if (catalog_size < 0) {
144 unlink(file_path.c_str());
145 return NULL;
146 }
147
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 const string file_path_compressed = file_path + ".compressed";
148
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 shash::Any hash_catalog(hash_algorithm, shash::kSuffixCatalog);
149 const bool retval =
150
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 zlib::CompressPath2Path(file_path, file_path_compressed, &hash_catalog);
151
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 961 times.
961 if (!retval) {
152 LogCvmfs(kLogCatalog, kLogStderr, "compression of catalog '%s' failed",
153 file_path.c_str());
154 unlink(file_path.c_str());
155 return NULL;
156 }
157 961 unlink(file_path.c_str());
158
159 // Create manifest
160
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 const string manifest_path = dir_temp + "/manifest";
161 manifest::Manifest *manifest = new manifest::Manifest(hash_catalog,
162
3/6
✓ Branch 2 taken 961 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 961 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 961 times.
✗ Branch 9 not taken.
961 catalog_size, "");
163
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 961 times.
961 if (!voms_authz.empty()) {
164 manifest->set_has_alt_catalog_path(true);
165 }
166
167 // Upload catalog
168
3/6
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 961 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 961 times.
✗ Branch 8 not taken.
961 spooler->Upload(file_path_compressed, "data/" + hash_catalog.MakePath());
169
1/2
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
961 spooler->WaitForUpload();
170 961 unlink(file_path_compressed.c_str());
171
2/4
✓ Branch 1 taken 961 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 961 times.
961 if (spooler->GetNumberOfErrors() > 0) {
172 LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalog %s",
173 file_path_compressed.c_str());
174 delete manifest;
175 return NULL;
176 }
177
178 961 return manifest;
179 961 }
180
181
182 /**
183 * Retrieve the catalog containing the given path.
184 * Unlike AbstractCatalogManager::FindCatalog() this mounts nested
185 * catalogs if necessary and returns WritableCatalog objects.
186 * Furthermore it optionally returns the looked-up DirectoryEntry.
187 *
188 * @param path the path to look for
189 * @param result the retrieved catalog (as a pointer)
190 * @param dirent is set to looked up DirectoryEntry for 'path' if non-NULL
191 * @return true if the path was found in a writable catalog
192 */
193 11233 bool WritableCatalogManager::FindCatalog(const string &path,
194 WritableCatalog **result,
195 DirectoryEntry *dirent) {
196
1/2
✓ Branch 1 taken 11233 times.
✗ Branch 2 not taken.
11233 const PathString ps_path(path);
197
198
1/2
✓ Branch 1 taken 11233 times.
✗ Branch 2 not taken.
11233 Catalog *best_fit = AbstractCatalogManager<Catalog>::FindCatalog(ps_path);
199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 11233 times.
11233 assert(best_fit != NULL);
200 11233 Catalog *catalog = NULL;
201 const bool retval =
202
1/2
✓ Branch 1 taken 11233 times.
✗ Branch 2 not taken.
11233 MountSubtree(ps_path, best_fit, true /* is_listable */, &catalog);
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 11233 times.
11233 if (!retval)
204 return false;
205
206
1/2
✓ Branch 1 taken 11233 times.
✗ Branch 2 not taken.
11233 catalog::DirectoryEntry dummy;
207
2/2
✓ Branch 0 taken 5887 times.
✓ Branch 1 taken 5346 times.
11233 if (NULL == dirent) {
208 5887 dirent = &dummy;
209 }
210
1/2
✓ Branch 1 taken 11233 times.
✗ Branch 2 not taken.
11233 const bool found = catalog->LookupPath(ps_path, dirent);
211
6/8
✓ Branch 0 taken 11218 times.
✓ Branch 1 taken 15 times.
✓ Branch 3 taken 11218 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 11218 times.
✓ Branch 7 taken 15 times.
✓ Branch 8 taken 11218 times.
11233 if (!found || !catalog->IsWritable())
212 15 return false;
213
214 11218 *result = static_cast<WritableCatalog *>(catalog);
215 11218 return true;
216 11233 }
217
218
219 WritableCatalog *WritableCatalogManager::GetHostingCatalog(
220 const std::string &path) {
221 WritableCatalog *result = NULL;
222 const bool retval = FindCatalog(MakeRelativePath(path), &result, NULL);
223 if (!retval)
224 return NULL;
225 return result;
226 }
227
228
229 /**
230 * Remove the given file from the catalogs.
231 * @param path the full path to the file to be removed
232 * @note returns nothing; PANICs if the hosting catalog cannot be found
233 */
234 162 void WritableCatalogManager::RemoveFile(const std::string &path) {
235
1/2
✓ Branch 1 taken 162 times.
✗ Branch 2 not taken.
162 const string file_path = MakeRelativePath(path);
236
1/2
✓ Branch 1 taken 162 times.
✗ Branch 2 not taken.
162 const string parent_path = GetParentPath(file_path);
237
238 162 SyncLock();
239 WritableCatalog *catalog;
240
2/4
✓ Branch 1 taken 162 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 162 times.
162 if (!FindCatalog(parent_path, &catalog)) {
241 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
242 file_path.c_str());
243 }
244
245
1/2
✓ Branch 1 taken 162 times.
✗ Branch 2 not taken.
162 catalog->RemoveEntry(file_path);
246 162 SyncUnlock();
247 162 }
248
249
250 /**
251 * Remove the given directory from the catalogs.
252 * @param path the full path to the directory to be removed
253 * @note returns nothing; PANICs if the hosting catalog cannot be found
254 */
255 143 void WritableCatalogManager::RemoveDirectory(const std::string &path) {
256
1/2
✓ Branch 1 taken 143 times.
✗ Branch 2 not taken.
143 const string directory_path = MakeRelativePath(path);
257
1/2
✓ Branch 1 taken 143 times.
✗ Branch 2 not taken.
143 const string parent_path = GetParentPath(directory_path);
258
259 143 SyncLock();
260 WritableCatalog *catalog;
261
1/2
✓ Branch 1 taken 143 times.
✗ Branch 2 not taken.
143 DirectoryEntry parent_entry;
262
2/4
✓ Branch 1 taken 143 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 143 times.
143 if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
263 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
264 directory_path.c_str());
265 }
266
267 143 parent_entry.set_linkcount(parent_entry.linkcount() - 1);
268
269
1/2
✓ Branch 1 taken 143 times.
✗ Branch 2 not taken.
143 catalog->RemoveEntry(directory_path);
270
1/2
✓ Branch 1 taken 143 times.
✗ Branch 2 not taken.
143 catalog->UpdateEntry(parent_entry, parent_path);
271
2/2
✓ Branch 1 taken 30 times.
✓ Branch 2 taken 113 times.
143 if (parent_entry.IsNestedCatalogRoot()) {
272
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
273 parent_path.c_str());
274 WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
275 30 catalog->parent());
276 30 parent_entry.set_is_nested_catalog_mountpoint(true);
277 30 parent_entry.set_is_nested_catalog_root(false);
278
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 parent_catalog->UpdateEntry(parent_entry, parent_path);
279 }
280 143 SyncUnlock();
281 143 }
282
283 /**
284 * Clone the file called `source` under the new name `destination`; the
285 * source file is kept intact.
286 * @param destination the name of the new file, complete path
287 * @param source the name of the file to clone, which must already be in the
288 * repository
289 * @return void
290 */
291 void WritableCatalogManager::Clone(const std::string destination,
292 const std::string source) {
293 const std::string relative_source = MakeRelativePath(source);
294
295 DirectoryEntry source_dirent;
296 if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
297 PANIC(kLogStderr, "catalog for file '%s' cannot be found, aborting",
298 source.c_str());
299 }
300 if (source_dirent.IsDirectory()) {
301 PANIC(kLogStderr, "Trying to clone a directory: '%s', aborting",
302 source.c_str());
303 }
304
305 // if the file is already there we remove it and we add it back
306 DirectoryEntry check_dirent;
307 const bool destination_already_present =
308 LookupPath(MakeRelativePath(destination), kLookupDefault, &check_dirent);
309 if (destination_already_present) {
310 this->RemoveFile(destination);
311 }
312
313 DirectoryEntry destination_dirent(source_dirent);
314 std::string destination_dirname;
315 std::string destination_filename;
316 SplitPath(destination, &destination_dirname, &destination_filename);
317
318 destination_dirent.name_.Assign(
319 NameString(destination_filename.c_str(), destination_filename.length()));
320
321 // TODO(jblomer): clone is used by tarball engine and should eventually
322 // support extended attributes
323 this->AddFile(destination_dirent, empty_xattrs, destination_dirname);
324 }
325
326
327 /**
328 * Copies an entire directory tree from the existing from_dir to the
329 * non-existing to_dir. The destination's parent directory must exist. On the
330 * catalog level, the new entries will be identical to the old ones except
331 * for their path hash fields.
332 */
333 149 void WritableCatalogManager::CloneTree(const std::string &from_dir,
334 const std::string &to_dir) {
335 // Sanitize input paths
336
6/6
✓ Branch 1 taken 119 times.
✓ Branch 2 taken 30 times.
✓ Branch 4 taken 15 times.
✓ Branch 5 taken 104 times.
✓ Branch 6 taken 45 times.
✓ Branch 7 taken 104 times.
149 if (from_dir.empty() || to_dir.empty())
337 45 PANIC(kLogStderr, "clone tree from or to root impossible");
338
339
1/2
✓ Branch 1 taken 104 times.
✗ Branch 2 not taken.
104 const std::string relative_source = MakeRelativePath(from_dir);
340
1/2
✓ Branch 1 taken 104 times.
✗ Branch 2 not taken.
104 const std::string relative_dest = MakeRelativePath(to_dir);
341
342
2/2
✓ Branch 1 taken 15 times.
✓ Branch 2 taken 89 times.
104 if (relative_source == relative_dest) {
343 15 PANIC(kLogStderr, "cannot clone tree into itself ('%s')", to_dir.c_str());
344 }
345
4/7
✓ Branch 1 taken 89 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 89 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 15 times.
✓ Branch 8 taken 74 times.
89 if (HasPrefix(relative_dest, relative_source + "/", false /*ignore_case*/)) {
346 15 PANIC(kLogStderr,
347 "cannot clone tree into sub directory of source '%s' --> '%s'",
348 from_dir.c_str(), to_dir.c_str());
349 }
350
351
1/2
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
74 DirectoryEntry source_dirent;
352
3/4
✓ Branch 1 taken 74 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
✓ Branch 4 taken 59 times.
74 if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
353 15 PANIC(kLogStderr, "path '%s' cannot be found, aborting", from_dir.c_str());
354 }
355
2/2
✓ Branch 1 taken 15 times.
✓ Branch 2 taken 44 times.
59 if (!source_dirent.IsDirectory()) {
356 15 PANIC(kLogStderr, "CloneTree: source '%s' not a directory, aborting",
357 from_dir.c_str());
358 }
359
360
1/2
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
44 DirectoryEntry dest_dirent;
361
3/4
✓ Branch 1 taken 44 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
✓ Branch 4 taken 29 times.
44 if (LookupPath(relative_dest, kLookupDefault, &dest_dirent)) {
362 15 PANIC(kLogStderr, "destination '%s' exists, aborting", to_dir.c_str());
363 }
364
365
1/2
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
29 const std::string dest_parent = GetParentPath(relative_dest);
366
1/2
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
29 DirectoryEntry dest_parent_dirent;
367
3/4
✓ Branch 1 taken 29 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
✓ Branch 4 taken 14 times.
29 if (!LookupPath(dest_parent, kLookupDefault, &dest_parent_dirent)) {
368 15 PANIC(kLogStderr, "destination '%s' not on a known path, aborting",
369 to_dir.c_str());
370 }
371
372
2/4
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
14 CloneTreeImpl(PathString(from_dir),
373
1/2
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
28 GetParentPath(to_dir),
374
2/4
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 14 times.
✗ Branch 5 not taken.
28 NameString(GetFileName(to_dir)));
375 314 }
376
377
378 /**
379 * Called from CloneTree(), assumes that from_dir and to_dir are sufficiently
380 * sanitized
381 */
382 112 void WritableCatalogManager::CloneTreeImpl(const PathString &source_dir,
383 const std::string &dest_parent_dir,
384 const NameString &dest_name) {
385
1/2
✓ Branch 4 taken 112 times.
✗ Branch 5 not taken.
112 LogCvmfs(kLogCatalog, kLogDebug, "cloning %s --> %s/%s", source_dir.c_str(),
386
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
224 dest_parent_dir.c_str(), dest_name.ToString().c_str());
387
3/6
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 112 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 112 times.
✗ Branch 8 not taken.
224 const PathString relative_source(MakeRelativePath(source_dir.ToString()));
388
389
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 DirectoryEntry source_dirent;
390
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 bool retval = LookupPath(relative_source, kLookupDefault, &source_dirent);
391
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 112 times.
112 assert(retval);
392
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 112 times.
112 assert(!source_dirent.IsBindMountpoint());
393
394
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 DirectoryEntry dest_dirent(source_dirent);
395
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 dest_dirent.name_.Assign(dest_name);
396 // Just in case, reset the nested catalog markers
397 112 dest_dirent.set_is_nested_catalog_mountpoint(false);
398 112 dest_dirent.set_is_nested_catalog_root(false);
399
400 112 XattrList xattrs;
401
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 112 times.
112 if (source_dirent.HasXattrs()) {
402 retval = LookupXattrs(relative_source, &xattrs);
403 assert(retval);
404 }
405
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 AddDirectory(dest_dirent, xattrs, dest_parent_dir);
406
407
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 std::string dest_dir = dest_parent_dir;
408
2/2
✓ Branch 1 taken 98 times.
✓ Branch 2 taken 14 times.
112 if (!dest_dir.empty())
409
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 dest_dir.push_back('/');
410
2/4
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 112 times.
✗ Branch 5 not taken.
112 dest_dir += dest_name.ToString();
411 112 if (source_dirent.IsNestedCatalogRoot()
412
5/6
✓ Branch 0 taken 70 times.
✓ Branch 1 taken 42 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 70 times.
✓ Branch 5 taken 42 times.
✓ Branch 6 taken 70 times.
112 || source_dirent.IsNestedCatalogMountpoint()) {
413
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 CreateNestedCatalog(dest_dir);
414 }
415
416 112 DirectoryEntryList ls;
417
1/2
✓ Branch 1 taken 112 times.
✗ Branch 2 not taken.
112 retval = Listing(relative_source, &ls, false /* expand_symlink */);
418
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 112 times.
112 assert(retval);
419
2/2
✓ Branch 1 taken 238 times.
✓ Branch 2 taken 112 times.
350 for (unsigned i = 0; i < ls.size(); ++i) {
420
1/2
✓ Branch 1 taken 238 times.
✗ Branch 2 not taken.
238 PathString sub_path(source_dir);
421
2/4
✓ Branch 1 taken 238 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 238 times.
238 assert(!sub_path.IsEmpty());
422
1/2
✓ Branch 1 taken 238 times.
✗ Branch 2 not taken.
238 sub_path.Append("/", 1);
423
3/6
✓ Branch 2 taken 238 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 238 times.
✗ Branch 8 not taken.
✓ Branch 11 taken 238 times.
✗ Branch 12 not taken.
238 sub_path.Append(ls[i].name().GetChars(), ls[i].name().GetLength());
424
425
2/2
✓ Branch 2 taken 98 times.
✓ Branch 3 taken 140 times.
238 if (ls[i].IsDirectory()) {
426
2/4
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 98 times.
✗ Branch 6 not taken.
98 CloneTreeImpl(sub_path, dest_dir, ls[i].name());
427 98 continue;
428 }
429
430 // We break hard-links during cloning
431 140 ls[i].set_hardlink_group(0);
432 140 ls[i].set_linkcount(1);
433
434 140 xattrs.Clear();
435
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 140 times.
140 if (ls[i].HasXattrs()) {
436 retval = LookupXattrs(sub_path, &xattrs);
437 assert(retval);
438 }
439
440
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 140 times.
140 if (ls[i].IsChunkedFile()) {
441 FileChunkList chunks;
442 const std::string relative_sub_path =
443 MakeRelativePath(sub_path.ToString());
444 retval = ListFileChunks(PathString(relative_sub_path),
445 ls[i].hash_algorithm(), &chunks);
446 assert(retval);
447 AddChunkedFile(ls[i], xattrs, dest_dir, chunks);
448 } else {
449
1/2
✓ Branch 2 taken 140 times.
✗ Branch 3 not taken.
140 AddFile(ls[i], xattrs, dest_dir);
450 }
451
2/2
✓ Branch 1 taken 140 times.
✓ Branch 2 taken 98 times.
238 }
452 112 }
453
454
455 /**
456 * Add a new directory to the catalogs.
457 * @param entry a DirectoryEntry structure describing the new directory
458 * @param parent_directory the absolute path of the directory containing the
459 * directory to be created
460 * @note returns nothing; PANICs if the parent's catalog cannot be found
461 */
462 4192 void WritableCatalogManager::AddDirectory(const DirectoryEntryBase &entry,
463 const XattrList &xattrs,
464 const std::string &parent_directory) {
465
1/2
✓ Branch 1 taken 4192 times.
✗ Branch 2 not taken.
4192 const string parent_path = MakeRelativePath(parent_directory);
466
1/2
✓ Branch 1 taken 4192 times.
✗ Branch 2 not taken.
4192 string directory_path = parent_path + "/";
467
3/6
✓ Branch 1 taken 4192 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 4192 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 4192 times.
✗ Branch 10 not taken.
4192 directory_path.append(entry.name().GetChars(), entry.name().GetLength());
468
469 4192 SyncLock();
470 WritableCatalog *catalog;
471
1/2
✓ Branch 1 taken 4192 times.
✗ Branch 2 not taken.
4192 DirectoryEntry parent_entry;
472
2/4
✓ Branch 1 taken 4192 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 4192 times.
4192 if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
473 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
474 directory_path.c_str());
475 }
476
477
1/2
✓ Branch 1 taken 4192 times.
✗ Branch 2 not taken.
4192 DirectoryEntry fixed_hardlink_count(entry);
478 4192 fixed_hardlink_count.set_linkcount(2);
479
1/2
✓ Branch 1 taken 4192 times.
✗ Branch 2 not taken.
4192 catalog->AddEntry(fixed_hardlink_count, xattrs, directory_path, parent_path);
480
481 4192 parent_entry.set_linkcount(parent_entry.linkcount() + 1);
482
1/2
✓ Branch 1 taken 4192 times.
✗ Branch 2 not taken.
4192 catalog->UpdateEntry(parent_entry, parent_path);
483
2/2
✓ Branch 1 taken 42 times.
✓ Branch 2 taken 4150 times.
4192 if (parent_entry.IsNestedCatalogRoot()) {
484
1/2
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
42 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
485 parent_path.c_str());
486 WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
487 42 catalog->parent());
488 42 parent_entry.set_is_nested_catalog_mountpoint(true);
489 42 parent_entry.set_is_nested_catalog_root(false);
490
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 parent_catalog->UpdateEntry(parent_entry, parent_path);
491 }
492 4192 SyncUnlock();
493 4192 }
494
495 /**
496 * Add a new file to the catalogs.
497 * @param entry a DirectoryEntry structure describing the new file
498 * @param parent_directory the absolute path of the directory containing the
499 * file to be created
500 * @note returns nothing; PANICs if the parent's catalog cannot be found
501 */
502 5495 void WritableCatalogManager::AddFile(const DirectoryEntry &entry,
503 const XattrList &xattrs,
504 const std::string &parent_directory) {
505
1/2
✓ Branch 1 taken 5495 times.
✗ Branch 2 not taken.
5495 const string parent_path = MakeRelativePath(parent_directory);
506
1/2
✓ Branch 1 taken 5495 times.
✗ Branch 2 not taken.
5495 const string file_path = entry.GetFullPath(parent_path);
507
508 5495 SyncLock();
509 WritableCatalog *catalog;
510
2/4
✓ Branch 1 taken 5495 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 5495 times.
5495 if (!FindCatalog(parent_path, &catalog)) {
511 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
512 file_path.c_str());
513 }
514
515
4/6
✓ Branch 1 taken 5349 times.
✓ Branch 2 taken 146 times.
✓ Branch 4 taken 5349 times.
✗ Branch 5 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 5349 times.
5495 assert(!entry.IsRegular() || entry.IsChunkedFile()
516 || !entry.checksum().IsNull());
517
3/4
✓ Branch 1 taken 146 times.
✓ Branch 2 taken 5349 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 146 times.
5495 assert(entry.IsRegular() || !entry.IsExternalFile());
518
519 // check if file is too big
520
1/2
✓ Branch 1 taken 5495 times.
✗ Branch 2 not taken.
5495 const unsigned mbytes = entry.size() / (1024 * 1024);
521
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 5495 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
5495 if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
522 LogCvmfs(kLogCatalog, kLogStderr,
523 "%s: file at %s is larger than %u megabytes (%u). "
524 "CernVM-FS works best with small files. "
525 "Please remove the file or increase the limit.",
526 enforce_limits_ ? "FATAL" : "WARNING", file_path.c_str(),
527 file_mbyte_limit_, mbytes);
528 if (enforce_limits_)
529 PANIC(kLogStderr, "file at %s is larger than %u megabytes (%u).",
530 file_path.c_str(), file_mbyte_limit_, mbytes);
531 }
532
533
1/2
✓ Branch 1 taken 5495 times.
✗ Branch 2 not taken.
5495 catalog->AddEntry(entry, xattrs, file_path, parent_path);
534 5495 SyncUnlock();
535 5495 }
536
537
538 void WritableCatalogManager::AddChunkedFile(const DirectoryEntryBase &entry,
539 const XattrList &xattrs,
540 const std::string &parent_directory,
541 const FileChunkList &file_chunks) {
542 assert(file_chunks.size() > 0);
543
544 DirectoryEntry full_entry(entry);
545 full_entry.set_is_chunked_file(true);
546
547 AddFile(full_entry, xattrs, parent_directory);
548
549 const string parent_path = MakeRelativePath(parent_directory);
550 const string file_path = entry.GetFullPath(parent_path);
551
552 SyncLock();
553 WritableCatalog *catalog;
554 if (!FindCatalog(parent_path, &catalog)) {
555 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
556 file_path.c_str());
557 }
558
559 for (unsigned i = 0; i < file_chunks.size(); ++i) {
560 catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
561 }
562 SyncUnlock();
563 }
564
565
566 /**
567 * Add a hardlink group to the catalogs.
568 * @param entries a list of DirectoryEntries describing the new files
569 * @param parent_directory the absolute path of the directory containing the
570 * files to be created
571 * @note returns nothing; PANICs if the parent's catalog cannot be found
572 */
573 void WritableCatalogManager::AddHardlinkGroup(
574 const DirectoryEntryBaseList &entries,
575 const XattrList &xattrs,
576 const std::string &parent_directory,
577 const FileChunkList &file_chunks) {
578 assert(entries.size() >= 1);
579 assert(file_chunks.IsEmpty() || entries[0].IsRegular());
580 if (entries.size() == 1) {
581 DirectoryEntry fix_linkcount(entries[0]);
582 fix_linkcount.set_linkcount(1);
583 if (file_chunks.IsEmpty())
584 return AddFile(fix_linkcount, xattrs, parent_directory);
585 return AddChunkedFile(fix_linkcount, xattrs, parent_directory, file_chunks);
586 }
587
588 LogCvmfs(kLogCatalog, kLogVerboseMsg, "adding hardlink group %s/%s",
589 parent_directory.c_str(), entries[0].name().c_str());
590
591 // Hardlink groups have to reside in the same directory.
592 // Therefore we only have one parent directory here
593 const string parent_path = MakeRelativePath(parent_directory);
594
595 // check if hard link is too big
596 const unsigned mbytes = entries[0].size() / (1024 * 1024);
597 if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
598 LogCvmfs(kLogCatalog, kLogStderr,
599 "%s: hard link at %s is larger than %u megabytes (%u). "
600 "CernVM-FS works best with small files. "
601 "Please remove the file or increase the limit.",
602 enforce_limits_ ? "FATAL" : "WARNING",
603 (parent_path + entries[0].name().ToString()).c_str(),
604 file_mbyte_limit_, mbytes);
605 if (enforce_limits_)
606 PANIC(kLogStderr, "hard link at %s is larger than %u megabytes (%u)",
607 (parent_path + entries[0].name().ToString()).c_str(),
608 file_mbyte_limit_, mbytes);
609 }
610
611 SyncLock();
612 WritableCatalog *catalog;
613 if (!FindCatalog(parent_path, &catalog)) {
614 PANIC(kLogStderr,
615 "catalog for hardlink group containing '%s' cannot be found",
616 parent_path.c_str());
617 }
618
619 // Get a valid hardlink group id for the catalog the group will end up in
620 // TODO(unknown): Compaction
621 const uint32_t new_group_id = catalog->GetMaxLinkId() + 1;
622 LogCvmfs(kLogCatalog, kLogVerboseMsg, "hardlink group id %u issued",
623 new_group_id);
624 assert(new_group_id > 0);
625
626 // Add the file entries to the catalog
627 for (DirectoryEntryBaseList::const_iterator i = entries.begin(),
628 iEnd = entries.end();
629 i != iEnd;
630 ++i) {
631 string file_path = parent_path + "/";
632 file_path.append(i->name().GetChars(), i->name().GetLength());
633
634 // create a fully fledged DirectoryEntry to add the hardlink group to it
635 // which is CVMFS specific meta data.
636 DirectoryEntry hardlink(*i);
637 hardlink.set_hardlink_group(new_group_id);
638 hardlink.set_linkcount(entries.size());
639 hardlink.set_is_chunked_file(!file_chunks.IsEmpty());
640
641 catalog->AddEntry(hardlink, xattrs, file_path, parent_path);
642 if (hardlink.IsChunkedFile()) {
643 for (unsigned i = 0; i < file_chunks.size(); ++i) {
644 catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
645 }
646 }
647 }
648 SyncUnlock();
649 }
650
651
652 void WritableCatalogManager::ShrinkHardlinkGroup(const string &remove_path) {
653 const string relative_path = MakeRelativePath(remove_path);
654
655 SyncLock();
656 WritableCatalog *catalog;
657 if (!FindCatalog(relative_path, &catalog)) {
658 PANIC(kLogStderr,
659 "catalog for hardlink group containing '%s' cannot be found",
660 remove_path.c_str());
661 }
662
663 catalog->IncLinkcount(relative_path, -1);
664 SyncUnlock();
665 }
666
667
668 /**
669 * Update entry meta data (mode, owner, ...).
670 * CVMFS specific meta data (i.e. nested catalog transition points) are NOT
671 * changed by this method, although transition points intrinsics are taken into
672 * account, to keep nested catalogs consistent.
673 * @param entry the directory entry to be touched
674 * @param directory_path the path of the directory entry to be touched
675 */
676 49 void WritableCatalogManager::TouchDirectory(const DirectoryEntryBase &entry,
677 const XattrList &xattrs,
678 const std::string &directory_path) {
679
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 49 times.
49 assert(entry.IsDirectory());
680
681
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 const string entry_path = MakeRelativePath(directory_path);
682
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 const string parent_path = GetParentPath(entry_path);
683
684 49 SyncLock();
685 // find the catalog to be updated
686 WritableCatalog *catalog;
687
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 49 times.
49 if (!FindCatalog(parent_path, &catalog)) {
688 PANIC(kLogStderr, "catalog for entry '%s' cannot be found",
689 entry_path.c_str());
690 }
691
692
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 catalog->TouchEntry(entry, xattrs, entry_path);
693
694 // since we deal with a directory here, we might just touch a
695 // nested catalog transition point. If this is the case we would need to
696 // update two catalog entries:
697 // * the nested catalog MOUNTPOINT in the parent catalog
698 // * the nested catalog ROOT in the nested catalog
699
700 // first check if we really have a nested catalog transition point
701
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 catalog::DirectoryEntry potential_transition_point;
702
1/2
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
49 const PathString transition_path(entry_path.data(), entry_path.length());
703
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 bool retval = catalog->LookupPath(transition_path,
704 &potential_transition_point);
705
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 49 times.
49 assert(retval);
706
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 49 times.
49 if (potential_transition_point.IsNestedCatalogMountpoint()) {
707 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point at %s",
708 entry_path.c_str());
709
710 // find and mount nested catalog associated to this transition point
711 shash::Any nested_hash;
712 uint64_t nested_size;
713 retval = catalog->FindNested(transition_path, &nested_hash, &nested_size);
714 assert(retval);
715 Catalog *nested_catalog;
716 nested_catalog = MountCatalog(transition_path, nested_hash, catalog);
717 assert(nested_catalog != NULL);
718
719 // update nested catalog root in the child catalog
720 reinterpret_cast<WritableCatalog *>(nested_catalog)
721 ->TouchEntry(entry, xattrs, entry_path);
722 }
723
724 49 SyncUnlock();
725 49 }
726
727
728 /**
729 * Create a new nested catalog. Includes moving all entries belonging there
730 * from its parent catalog.
731 * @param mountpoint the path of the directory to become a nested root
732 * Panics if the mountpoint is not found in the current catalog structure.
733 */
734 981 void WritableCatalogManager::CreateNestedCatalog(
735 const std::string &mountpoint) {
736
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 const string nested_root_path = MakeRelativePath(mountpoint);
737
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 const PathString ps_nested_root_path(nested_root_path);
738
739 981 SyncLock();
740 // Find the catalog currently containing the directory structure, which
741 // will be represented as a new nested catalog and its root-entry/mountpoint
742 // along the way
743 981 WritableCatalog *old_catalog = NULL;
744
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 DirectoryEntry new_root_entry;
745
2/4
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 981 times.
981 if (!FindCatalog(nested_root_path, &old_catalog, &new_root_entry)) {
746 PANIC(kLogStderr,
747 "failed to create nested catalog '%s': "
748 "mountpoint was not found in current catalog structure",
749 nested_root_path.c_str());
750 }
751
752 // Create the database schema and the initial root entry
753 // for the new nested catalog
754
1/2
✓ Branch 2 taken 981 times.
✗ Branch 3 not taken.
981 const string database_file_path = CreateTempPath(dir_temp() + "/catalog",
755
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 0666);
756 981 const bool volatile_content = false;
757
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 CatalogDatabase *new_catalog_db = CatalogDatabase::Create(database_file_path);
758
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 981 times.
981 assert(NULL != new_catalog_db);
759 // Note we do not set the external_data bit for nested catalogs
760
2/4
✓ Branch 2 taken 981 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 981 times.
✗ Branch 6 not taken.
981 bool retval = new_catalog_db->InsertInitialValues(
761 nested_root_path,
762 volatile_content,
763 "", // At this point, only root
764 // catalog gets VOMS authz
765 new_root_entry);
766
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 981 times.
981 assert(retval);
767 // TODO(rmeusel): we need a way to attach a catalog directly from an open
768 // database to remove this indirection
769
1/2
✓ Branch 0 taken 981 times.
✗ Branch 1 not taken.
981 delete new_catalog_db;
770 981 new_catalog_db = NULL;
771
772 // Attach the just created nested catalog
773
2/4
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 981 times.
✗ Branch 5 not taken.
981 Catalog *new_catalog = CreateCatalog(ps_nested_root_path, shash::Any(),
774 old_catalog);
775
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 retval = AttachCatalog(database_file_path, new_catalog);
776
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 981 times.
981 assert(retval);
777
778
2/4
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 981 times.
981 assert(new_catalog->IsWritable());
779 981 WritableCatalog *wr_new_catalog = static_cast<WritableCatalog *>(new_catalog);
780
781
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 981 times.
981 if (new_root_entry.HasXattrs()) {
782 XattrList xattrs;
783 retval = old_catalog->LookupXattrsPath(ps_nested_root_path, &xattrs);
784 assert(retval);
785 wr_new_catalog->TouchEntry(new_root_entry, xattrs, nested_root_path);
786 }
787
788 // From now on, there are two catalogs spanning the same directory structure.
789 // We have to split the overlapping directory entries from the old catalog
790 // to the new catalog to regain a valid catalog structure
791
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 old_catalog->Partition(wr_new_catalog);
792
793 // Add the newly created nested catalog to the references of the containing
794 // catalog
795
4/8
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 981 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 981 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 981 times.
✗ Branch 11 not taken.
981 old_catalog->InsertNestedCatalog(new_catalog->mountpoint().ToString(), NULL,
796 981 shash::Any(spooler_->GetHashAlgorithm()), 0);
797
798 // Fix subtree counters in new nested catalogs: subtree is the sum of all
799 // entries of all "grand-nested" catalogs
800 // Note: taking a copy of the nested catalog list here
801 const Catalog::NestedCatalogList
802
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 &grand_nested = wr_new_catalog->ListOwnNestedCatalogs();
803
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 DeltaCounters fix_subtree_counters;
804 1962 for (Catalog::NestedCatalogList::const_iterator i = grand_nested.begin(),
805 981 iEnd = grand_nested.end();
806
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 981 times.
981 i != iEnd;
807 ++i) {
808 WritableCatalog *grand_catalog;
809 retval = FindCatalog(i->mountpoint.ToString(), &grand_catalog);
810 assert(retval);
811 const Counters &grand_counters = grand_catalog->GetCounters();
812 grand_counters.AddAsSubtree(&fix_subtree_counters);
813 }
814 981 const DeltaCounters save_counters = wr_new_catalog->delta_counters_;
815 981 wr_new_catalog->delta_counters_ = fix_subtree_counters;
816
1/2
✓ Branch 1 taken 981 times.
✗ Branch 2 not taken.
981 wr_new_catalog->UpdateCounters();
817 981 wr_new_catalog->delta_counters_ = save_counters;
818
819 981 SyncUnlock();
820 981 }
821
822
823 /**
824 * Remove a nested catalog
825 *
826 * If the merge parameter is true, all entries currently held by the
827 * nested catalog are merged into its parent catalog; otherwise it is
828 * removed from its parent without merging (RemoveFromParent()).
829 * @param mountpoint - the path of the nested catalog to be removed
830 * @param merge - merge the subtree associated with the nested catalog
831 * into its parent catalog
832 * Panics if the mountpoint is not found or the database file cannot be deleted.
833 */
834 79 void WritableCatalogManager::RemoveNestedCatalog(const string &mountpoint,
835 const bool merge) {
836
1/2
✓ Branch 1 taken 79 times.
✗ Branch 2 not taken.
79 const string nested_root_path = MakeRelativePath(mountpoint);
837
838 79 SyncLock();
839 // Find the catalog which should be removed
840 79 WritableCatalog *nested_catalog = NULL;
841
2/4
✓ Branch 1 taken 79 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 79 times.
79 if (!FindCatalog(nested_root_path, &nested_catalog)) {
842 PANIC(kLogStderr,
843 "failed to remove nested catalog '%s': "
844 "mountpoint was not found in current catalog structure",
845 nested_root_path.c_str());
846 }
847
848 // Check if the found catalog is really the nested catalog to be deleted
849
6/19
✓ Branch 1 taken 79 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 79 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 79 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 79 times.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
✓ Branch 13 taken 79 times.
✗ Branch 14 not taken.
✓ Branch 16 taken 79 times.
✗ Branch 17 not taken.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
158 assert(!nested_catalog->IsRoot()
850 && (nested_catalog->mountpoint().ToString() == nested_root_path));
851
852
1/2
✓ Branch 0 taken 79 times.
✗ Branch 1 not taken.
79 if (merge) {
853 // Merge all data from the nested catalog into its parent
854
1/2
✓ Branch 1 taken 79 times.
✗ Branch 2 not taken.
79 nested_catalog->MergeIntoParent();
855 } else {
856 nested_catalog->RemoveFromParent();
857 }
858
859 // Delete the catalog database file from the working copy
860
2/6
✓ Branch 1 taken 79 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 79 times.
79 if (unlink(nested_catalog->database_path().c_str()) != 0) {
861 PANIC(kLogStderr,
862 "unable to delete the removed nested catalog database file '%s'",
863 nested_catalog->database_path().c_str());
864 }
865
866 // Remove the catalog from internal data structures
867
1/2
✓ Branch 1 taken 79 times.
✗ Branch 2 not taken.
79 DetachCatalog(nested_catalog);
868 79 SyncUnlock();
869 79 }
870
871
872 /**
873 * Swap in a new nested catalog
874 *
875 * If the old nested catalog is already attached to the catalog tree, it
876 * must not have been modified; it is then detached. This method will not
877 * attach the new nested catalog to the catalog tree.
878 *
879 * @param mountpoint - the path of the nested catalog to be swapped
880 * @param new_hash - the hash of the new nested catalog
881 * @param new_size - the size of the new nested catalog
882 */
883 102 void WritableCatalogManager::SwapNestedCatalog(const string &mountpoint,
884 const shash::Any &new_hash,
885 const uint64_t new_size) {
886
1/2
✓ Branch 1 taken 102 times.
✗ Branch 2 not taken.
102 const string nested_root_path = MakeRelativePath(mountpoint);
887
1/2
✓ Branch 1 taken 102 times.
✗ Branch 2 not taken.
102 const string parent_path = GetParentPath(nested_root_path);
888
1/2
✓ Branch 1 taken 102 times.
✗ Branch 2 not taken.
102 const PathString nested_root_ps = PathString(nested_root_path);
889
890 102 SyncLock();
891
892 // Find the immediate parent catalog
893 102 WritableCatalog *parent = NULL;
894
3/4
✓ Branch 1 taken 102 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
✓ Branch 4 taken 87 times.
102 if (!FindCatalog(parent_path, &parent)) {
895 15 SyncUnlock(); // this is needed for the unittest. otherwise they get stuck
896 15 PANIC(kLogStderr,
897 "failed to swap nested catalog '%s': could not find parent '%s'",
898 nested_root_path.c_str(), parent_path.c_str());
899 }
900
901 // Get old nested catalog counters
902
1/2
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
87 Catalog *old_attached_catalog = parent->FindChild(nested_root_ps);
903
1/2
✓ Branch 1 taken 87 times.
✗ Branch 2 not taken.
87 Counters old_counters;
904
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 72 times.
87 if (old_attached_catalog) {
905 // Old catalog was already attached (e.g. as a child catalog
906 // attached by a prior call to CreateNestedCatalog()). Ensure
907 // that it has not been modified, get counters, and detach it.
908 15 WritableCatalogList list;
909
2/4
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
✗ Branch 4 not taken.
15 if (GetModifiedCatalogLeafsRecursively(old_attached_catalog, &list)) {
910 15 SyncUnlock();
911 15 PANIC(kLogStderr,
912 "failed to swap nested catalog '%s': already modified",
913 nested_root_path.c_str());
914 }
915 old_counters = old_attached_catalog->GetCounters();
916 DetachSubtree(old_attached_catalog);
917
918 15 } else {
919 // Old catalog was not attached. Download a freely attached
920 // version and get counters.
921
1/2
✓ Branch 1 taken 72 times.
✗ Branch 2 not taken.
72 shash::Any old_hash;
922 uint64_t old_size;
923
1/2
✓ Branch 1 taken 72 times.
✗ Branch 2 not taken.
72 const bool old_found = parent->FindNested(nested_root_ps, &old_hash,
924 &old_size);
925
2/2
✓ Branch 0 taken 15 times.
✓ Branch 1 taken 57 times.
72 if (!old_found) {
926 15 SyncUnlock();
927 15 PANIC(kLogStderr,
928 "failed to swap nested catalog '%s': not found in parent",
929 nested_root_path.c_str());
930 }
931 const UniquePtr<Catalog> old_free_catalog(
932
2/4
✓ Branch 1 taken 57 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 57 times.
✗ Branch 5 not taken.
57 LoadFreeCatalog(nested_root_ps, old_hash));
933
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 57 times.
57 if (!old_free_catalog.IsValid()) {
934 SyncUnlock();
935 PANIC(kLogStderr,
936 "failed to swap nested catalog '%s': failed to load old catalog",
937 nested_root_path.c_str());
938 }
939 57 old_counters = old_free_catalog->GetCounters();
940 57 }
941
942 // Load freely attached new catalog
943 const UniquePtr<Catalog> new_catalog(
944
3/4
✓ Branch 1 taken 42 times.
✓ Branch 2 taken 15 times.
✓ Branch 4 taken 42 times.
✗ Branch 5 not taken.
57 LoadFreeCatalog(nested_root_ps, new_hash));
945
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (!new_catalog.IsValid()) {
946 SyncUnlock();
947 PANIC(kLogStderr,
948 "failed to swap nested catalog '%s': failed to load new catalog",
949 nested_root_path.c_str());
950 }
951
952 // Get new catalog root directory entry
953
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 DirectoryEntry dirent;
954 42 XattrList xattrs;
955
1/2
✓ Branch 2 taken 42 times.
✗ Branch 3 not taken.
42 const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
956
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 42 times.
42 if (!dirent_found) {
957 SyncUnlock();
958 PANIC(kLogStderr,
959 "failed to swap nested catalog '%s': missing dirent in new catalog",
960 nested_root_path.c_str());
961 }
962
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (dirent.HasXattrs()) {
963 const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
964 &xattrs);
965 if (!xattrs_found) {
966 SyncUnlock();
967 PANIC(kLogStderr,
968 "failed to swap nested catalog '%s': missing xattrs in new catalog",
969 nested_root_path.c_str());
970 }
971 }
972
973 // Swap catalogs
974
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 parent->RemoveNestedCatalog(nested_root_path, NULL);
975
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 parent->InsertNestedCatalog(nested_root_path, NULL, new_hash, new_size);
976
977 // Update parent directory entry
978 42 dirent.set_is_nested_catalog_mountpoint(true);
979 42 dirent.set_is_nested_catalog_root(false);
980
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 parent->UpdateEntry(dirent, nested_root_path);
981
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 parent->TouchEntry(dirent, xattrs, nested_root_path);
982
983 // Update counters
984 const DeltaCounters delta =
985
1/2
✓ Branch 3 taken 42 times.
✗ Branch 4 not taken.
42 Counters::Diff(old_counters, new_catalog->GetCounters());
986
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 delta.PopulateToParent(&parent->delta_counters_);
987
988 42 SyncUnlock();
989 222 }
990
991 /**
992 * Install a nested catalog (catalog hierarchy) at a new, empty directory
993 *
994 * The mountpoint directory must not yet exist. Its parent directory, however
995 * must exist. This method combines functionality from AddDirectory(),
996 * CreateNestedCatalog() and SwapNestedCatalog().
997 * The new nested catalog won't get attached.
998 *
999 * @param mountpoint - the path where the nested catalog should be installed
1000 * @param new_hash - the hash of the new nested catalog
1001 * @param new_size - the size of the new nested catalog
1002 */
1003 45 void WritableCatalogManager::GraftNestedCatalog(const string &mountpoint,
1004 const shash::Any &new_hash,
1005 const uint64_t new_size) {
1006
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 const string nested_root_path = MakeRelativePath(mountpoint);
1007
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 const string parent_path = GetParentPath(nested_root_path);
1008
1/2
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
45 const PathString nested_root_ps = PathString(nested_root_path);
1009
1010
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
45 assert(!nested_root_path.empty());
1011
1012 // Load freely attached new catalog
1013 const UniquePtr<Catalog> new_catalog(
1014
2/4
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 45 times.
✗ Branch 5 not taken.
45 LoadFreeCatalog(nested_root_ps, new_hash));
1015
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 45 times.
45 if (!new_catalog.IsValid()) {
1016 PANIC(kLogStderr,
1017 "failed to graft nested catalog '%s': failed to load new catalog",
1018 nested_root_path.c_str());
1019 }
1020
4/7
✓ Branch 2 taken 45 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 45 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 15 times.
✓ Branch 9 taken 30 times.
45 if (new_catalog->root_prefix() != nested_root_ps) {
1021
2/4
✓ Branch 2 taken 15 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 15 times.
✗ Branch 6 not taken.
45 PANIC(kLogStderr,
1022 "invalid nested catalog for grafting at '%s': catalog rooted at '%s'",
1023 nested_root_path.c_str(),
1024 new_catalog->root_prefix().ToString().c_str());
1025 }
1026
1027 // Get new catalog root directory entry
1028
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 DirectoryEntry dirent;
1029 30 XattrList xattrs;
1030
1/2
✓ Branch 2 taken 30 times.
✗ Branch 3 not taken.
30 const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
1031
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 30 times.
30 if (!dirent_found) {
1032 PANIC(kLogStderr,
1033 "failed to swap nested catalog '%s': missing dirent in new catalog",
1034 nested_root_path.c_str());
1035 }
1036
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 30 times.
30 if (dirent.HasXattrs()) {
1037 const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
1038 &xattrs);
1039 if (!xattrs_found) {
1040 PANIC(kLogStderr,
1041 "failed to swap nested catalog '%s': missing xattrs in new catalog",
1042 nested_root_path.c_str());
1043 }
1044 }
1045 // Transform the nested catalog root into a transition point to be inserted
1046 // in the parent catalog
1047 30 dirent.set_is_nested_catalog_root(false);
1048 30 dirent.set_is_nested_catalog_mountpoint(true);
1049
1050 // Add directory and nested catalog
1051
1052 30 SyncLock();
1053 WritableCatalog *parent_catalog;
1054
1/2
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
30 DirectoryEntry parent_entry;
1055
2/4
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 30 times.
30 if (!FindCatalog(parent_path, &parent_catalog, &parent_entry)) {
1056 SyncUnlock();
1057 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1058 parent_path.c_str());
1059 }
1060
3/4
✓ Branch 1 taken 30 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 15 times.
✓ Branch 4 taken 15 times.
30 if (parent_catalog->LookupPath(nested_root_ps, NULL)) {
1061 15 SyncUnlock();
1062 15 PANIC(kLogStderr,
1063 "invalid attempt to graft nested catalog into existing "
1064 "directory '%s'",
1065 nested_root_path.c_str());
1066 }
1067
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 parent_catalog->AddEntry(dirent, xattrs, nested_root_path, parent_path);
1068 15 parent_entry.set_linkcount(parent_entry.linkcount() + 1);
1069
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 parent_catalog->UpdateEntry(parent_entry, parent_path);
1070
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 if (parent_entry.IsNestedCatalogRoot()) {
1071 WritableCatalog *grand_parent_catalog = reinterpret_cast<WritableCatalog *>(
1072 15 parent_catalog->parent());
1073 15 parent_entry.set_is_nested_catalog_root(false);
1074 15 parent_entry.set_is_nested_catalog_mountpoint(true);
1075
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 grand_parent_catalog->UpdateEntry(parent_entry, parent_path);
1076 }
1077
1078
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 parent_catalog->InsertNestedCatalog(nested_root_path, NULL, new_hash,
1079 new_size);
1080
1081 // Fix-up counters
1082
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 const Counters counters;
1083 const DeltaCounters delta =
1084
1/2
✓ Branch 3 taken 15 times.
✗ Branch 4 not taken.
15 Counters::Diff(counters, new_catalog->GetCounters());
1085
1/2
✓ Branch 1 taken 15 times.
✗ Branch 2 not taken.
15 delta.PopulateToParent(&parent_catalog->delta_counters_);
1086
1087 15 SyncUnlock();
1088 180 }
1089
1090 /**
1091 * Checks if a nested catalog starts at this path. The path must be valid.
1092 */
1093 bool WritableCatalogManager::IsTransitionPoint(const string &mountpoint) {
1094 const string path = MakeRelativePath(mountpoint);
1095
1096 SyncLock();
1097 WritableCatalog *catalog;
1098 DirectoryEntry entry;
1099 if (!FindCatalog(path, &catalog, &entry)) {
1100 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1101 path.c_str());
1102 }
1103 const bool result = entry.IsNestedCatalogRoot();
1104 SyncUnlock();
1105 return result;
1106 }
1107
1108
1109 void WritableCatalogManager::PrecalculateListings() {
1110 // TODO(jblomer): meant for micro catalogs
1111 }
1112
1113
1114 void WritableCatalogManager::SetTTL(const uint64_t new_ttl) {
1115 SyncLock();
1116 reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetTTL(new_ttl);
1117 SyncUnlock();
1118 }
1119
1120
1121 bool WritableCatalogManager::SetVOMSAuthz(const std::string &voms_authz) {
1122 bool result;
1123 SyncLock();
1124 result = reinterpret_cast<WritableCatalog *>(GetRootCatalog())
1125 ->SetVOMSAuthz(voms_authz);
1126 SyncUnlock();
1127 return result;
1128 }
1129
1130
1131 863 bool WritableCatalogManager::Commit(const bool stop_for_tweaks,
1132 const uint64_t manual_revision,
1133 manifest::Manifest *manifest) {
1134 WritableCatalog *root_catalog = reinterpret_cast<WritableCatalog *>(
1135 863 GetRootCatalog());
1136
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 root_catalog->SetDirty();
1137
1138 // set root catalog revision to manually provided number if available
1139
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 863 times.
863 if (manual_revision > 0) {
1140 const uint64_t revision = root_catalog->GetRevision();
1141 if (revision >= manual_revision) {
1142 LogCvmfs(kLogCatalog, kLogStderr,
1143 "Manual revision (%" PRIu64 ") must not be "
1144 "smaller than the current root catalog's (%" PRIu64
1145 "). Skipped!",
1146 manual_revision, revision);
1147 } else {
1148 // Gets incremented by FinalizeCatalog() afterwards!
1149 root_catalog->SetRevision(manual_revision - 1);
1150 }
1151 }
1152
1153 // do the actual catalog snapshotting and upload
1154
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 CatalogInfo root_catalog_info;
1155
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 if (getenv("_CVMFS_SERIALIZED_CATALOG_PROCESSING_") == NULL)
1156
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 root_catalog_info = SnapshotCatalogs(stop_for_tweaks);
1157 else
1158 root_catalog_info = SnapshotCatalogsSerialized(stop_for_tweaks);
1159
2/4
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 863 times.
863 if (spooler_->GetNumberOfErrors() > 0) {
1160 LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalogs");
1161 return false;
1162 }
1163
1164 // .cvmfspublished export
1165
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 LogCvmfs(kLogCatalog, kLogVerboseMsg, "Committing repository manifest");
1166 863 set_base_hash(root_catalog_info.content_hash);
1167
1168 863 manifest->set_catalog_hash(root_catalog_info.content_hash);
1169 863 manifest->set_catalog_size(root_catalog_info.size);
1170
2/4
✓ Branch 2 taken 863 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 863 times.
✗ Branch 6 not taken.
863 manifest->set_root_path("");
1171 863 manifest->set_ttl(root_catalog_info.ttl);
1172 863 manifest->set_revision(root_catalog_info.revision);
1173
1174 863 return true;
1175 }
1176
1177
1178 /**
1179 * Handles the snapshotting of dirty (i.e. modified) catalogs while trying to
1180 * parallelize the compression and upload as much as possible. We use a parallel
1181 * depth first post order tree traversal based on 'continuations'.
1182 *
1183 * The idea is as follows:
1184 * 1. find all leaf-catalogs (i.e. dirty catalogs with no dirty children)
1185 * --> these can be processed and uploaded immediately and independently
1186 * see WritableCatalogManager::GetModifiedCatalogLeafs()
1187 * 2. annotate non-leaf catalogs with their number of dirty children
1188 * --> a finished child will notify its parent and decrement this number
1189 * see WritableCatalogManager::CatalogUploadCallback()
1190 * 3. if a non-leaf catalog's dirty children number reaches 0, it is scheduled
1191 * for processing as well (continuation)
1192 * --> the parallel processing walks bottom-up through the catalog tree
1193 * see WritableCatalogManager::CatalogUploadCallback()
1194 * 4. when the root catalog is reached, we notify the main thread and return
1195 * --> done through a Future<> in WritableCatalogManager::SnapshotCatalogs
1196 *
1197 * Note: The catalog finalisation (see WritableCatalogManager::FinalizeCatalog)
1198 * happens in a worker thread (i.e. the callback method) for non-leaf
1199 * catalogs.
1200 *
1201 * TODO(rmeusel): since all leaf catalogs are finalized in the main thread, we
1202 * sacrifice some potential concurrency for simplicity.
1203 */
1204 863 WritableCatalogManager::CatalogInfo WritableCatalogManager::SnapshotCatalogs(
1205 const bool stop_for_tweaks) {
1206 // prepare environment for parallel processing
1207
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 Future<CatalogInfo> root_catalog_info_future;
1208 CatalogUploadContext upload_context;
1209 863 upload_context.root_catalog_info = &root_catalog_info_future;
1210 863 upload_context.stop_for_tweaks = stop_for_tweaks;
1211
1212
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 spooler_->RegisterListener(&WritableCatalogManager::CatalogUploadCallback,
1213 this, upload_context);
1214
1215 // find dirty leaf catalogs and annotate non-leaf catalogs (dirty child count)
1216 // post-condition: the entire catalog tree is ready for concurrent processing
1217 863 WritableCatalogList leafs_to_snapshot;
1218
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 GetModifiedCatalogLeafs(&leafs_to_snapshot);
1219
1220 // finalize and schedule the catalog processing
1221 863 WritableCatalogList::const_iterator i = leafs_to_snapshot.begin();
1222 863 const WritableCatalogList::const_iterator iend = leafs_to_snapshot.end();
1223
2/2
✓ Branch 2 taken 1191 times.
✓ Branch 3 taken 863 times.
2054 for (; i != iend; ++i) {
1224
1/2
✓ Branch 2 taken 1191 times.
✗ Branch 3 not taken.
1191 FinalizeCatalog(*i, stop_for_tweaks);
1225
1/2
✓ Branch 2 taken 1191 times.
✗ Branch 3 not taken.
1191 ScheduleCatalogProcessing(*i);
1226 }
1227
1228
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 LogCvmfs(kLogCatalog, kLogVerboseMsg, "waiting for upload of catalogs");
1229
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 const CatalogInfo &root_catalog_info = root_catalog_info_future.Get();
1230
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 spooler_->WaitForUpload();
1231
1232
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 spooler_->UnregisterListeners();
1233 863 return root_catalog_info;
1234 863 }
1235
1236
1237 1802 void WritableCatalogManager::FinalizeCatalog(WritableCatalog *catalog,
1238 const bool stop_for_tweaks) {
1239 // update meta information of this catalog
1240
1/3
✓ Branch 2 taken 1802 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1802 LogCvmfs(kLogCatalog, kLogVerboseMsg, "creating snapshot of catalog '%s'",
1241 3604 catalog->mountpoint().c_str());
1242
1243 1802 catalog->UpdateCounters();
1244 1802 catalog->UpdateLastModified();
1245 1802 catalog->IncrementRevision();
1246
1247 // update the previous catalog revision pointer
1248
2/2
✓ Branch 1 taken 863 times.
✓ Branch 2 taken 939 times.
1802 if (catalog->IsRoot()) {
1249
1/4
✓ Branch 2 taken 863 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
863 LogCvmfs(kLogCatalog, kLogVerboseMsg,
1250 "setting '%s' as previous revision "
1251 "for root catalog",
1252 1726 base_hash().ToStringWithSuffix().c_str());
1253 863 catalog->SetPreviousRevision(base_hash());
1254 } else {
1255 // Multiple catalogs might query the parent concurrently
1256 939 SyncLock();
1257
1/2
✓ Branch 1 taken 939 times.
✗ Branch 2 not taken.
939 shash::Any hash_previous;
1258 uint64_t size_previous;
1259
1/2
✓ Branch 2 taken 939 times.
✗ Branch 3 not taken.
1878 const bool retval = catalog->parent()->FindNested(
1260
1/2
✓ Branch 1 taken 939 times.
✗ Branch 2 not taken.
1878 catalog->mountpoint(), &hash_previous, &size_previous);
1261
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 939 times.
939 assert(retval);
1262 939 SyncUnlock();
1263
1264
1/8
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 939 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
1878 LogCvmfs(kLogCatalog, kLogVerboseMsg,
1265 "found '%s' as previous revision "
1266 "for nested catalog '%s'",
1267
1/2
✓ Branch 1 taken 939 times.
✗ Branch 2 not taken.
1878 hash_previous.ToStringWithSuffix().c_str(),
1268
1/2
✓ Branch 1 taken 939 times.
✗ Branch 2 not taken.
1878 catalog->mountpoint().c_str());
1269
1/2
✓ Branch 1 taken 939 times.
✗ Branch 2 not taken.
939 catalog->SetPreviousRevision(hash_previous);
1270 }
1271 1802 catalog->Commit();
1272
1273 // check if catalog has too many entries
1274 const uint64_t catalog_limit =
1275
2/2
✓ Branch 1 taken 863 times.
✓ Branch 2 taken 939 times.
1802 uint64_t(1000) * uint64_t((catalog->IsRoot() ? root_kcatalog_limit_
1276 939 : nested_kcatalog_limit_));
1277 1802 if ((catalog_limit > 0)
1278
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 1802 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 1802 times.
1802 && (catalog->GetCounters().GetSelfEntries() > catalog_limit)) {
1279 LogCvmfs(kLogCatalog, kLogStderr,
1280 "%s: catalog at %s has more than %lu entries (%lu). "
1281 "Large catalogs stress the CernVM-FS transport infrastructure. "
1282 "Please split it into nested catalogs or increase the limit.",
1283 enforce_limits_ ? "FATAL" : "WARNING",
1284 (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1285 catalog_limit, catalog->GetCounters().GetSelfEntries());
1286 if (enforce_limits_)
1287 PANIC(kLogStderr, "catalog at %s has more than %u entries (%u). ",
1288 (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1289 catalog_limit, catalog->GetCounters().GetSelfEntries());
1290 }
1291
1292 // allow for manual adjustments in the catalog
1293
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1802 times.
1802 if (stop_for_tweaks) {
1294 LogCvmfs(kLogCatalog, kLogStdout,
1295 "Allowing for tweaks in %s at %s "
1296 "(hit return to continue)",
1297 catalog->database_path().c_str(), catalog->mountpoint().c_str());
1298 const int read_char = getchar();
1299 assert(read_char != EOF);
1300 }
1301
1302 // compaction of bloated catalogs (usually after high database churn)
1303 1802 catalog->VacuumDatabaseIfNecessary();
1304 1802 }
1305
1306
1307 1802 void WritableCatalogManager::ScheduleCatalogProcessing(
1308 WritableCatalog *catalog) {
1309 {
1310 1802 const MutexLockGuard guard(catalog_processing_lock_);
1311 // register catalog object for WritableCatalogManager::CatalogUploadCallback
1312
2/4
✓ Branch 1 taken 1802 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1802 times.
✗ Branch 5 not taken.
1802 catalog_processing_map_[catalog->database_path()] = catalog;
1313 1802 }
1314
1/2
✓ Branch 2 taken 1802 times.
✗ Branch 3 not taken.
1802 spooler_->ProcessCatalog(catalog->database_path());
1315 1802 }
1316
1317 /**
1318 * Copy catalog to local cache.server
1319 * Must be an atomic write into the cache_dir
1320 * As such: create a temporary copy in cache_dir/txn and then do a
1321 * `rename` (which is atomic) to the actual cache path
1322 *
1323 * @returns true on success, otherwise false
1324 */
1325 bool WritableCatalogManager::CopyCatalogToLocalCache(
1326 const upload::SpoolerResult &result) {
1327 std::string tmp_catalog_path;
1328 const std::string cache_catalog_path = dir_cache_ + "/"
1329 + result.content_hash
1330 .MakePathWithoutSuffix();
1331 FILE *fcatalog = CreateTempFile(dir_cache_ + "/txn/catalog", 0666, "w",
1332 &tmp_catalog_path);
1333 if (!fcatalog) {
1334 PANIC(kLogDebug | kLogStderr,
1335 "Creating file for temporary catalog failed: %s",
1336 tmp_catalog_path.c_str());
1337 }
1338 CopyPath2File(result.local_path.c_str(), fcatalog);
1339 (void)fclose(fcatalog);
1340
1341 if (rename(tmp_catalog_path.c_str(), cache_catalog_path.c_str()) != 0) {
1342 PANIC(kLogDebug | kLogStderr, "Failed to copy catalog from %s to cache %s",
1343 result.local_path.c_str(), cache_catalog_path.c_str());
1344 }
1345 return true;
1346 }
1347
1348 1802 void WritableCatalogManager::CatalogUploadCallback(
1349 const upload::SpoolerResult &result,
1350 const CatalogUploadContext catalog_upload_context) {
1351
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1802 times.
1802 if (result.return_code != 0) {
1352 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1353 result.local_path.c_str(), result.return_code);
1354 }
1355
1356 // retrieve the catalog object based on the callback information
1357 // see WritableCatalogManager::ScheduleCatalogProcessing()
1358 1802 WritableCatalog *catalog = NULL;
1359 {
1360 1802 const MutexLockGuard guard(catalog_processing_lock_);
1361 const std::map<std::string, WritableCatalog *>::iterator c =
1362
1/2
✓ Branch 1 taken 1802 times.
✗ Branch 2 not taken.
1802 catalog_processing_map_.find(result.local_path);
1363
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 1802 times.
1802 assert(c != catalog_processing_map_.end());
1364 1802 catalog = c->second;
1365 1802 }
1366
1367 1802 const uint64_t catalog_size = GetFileSize(result.local_path);
1368
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1802 times.
1802 assert(catalog_size > 0);
1369
1370
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1802 times.
1802 if (UseLocalCache()) {
1371 CopyCatalogToLocalCache(result);
1372 }
1373
1374 1802 SyncLock();
1375
2/2
✓ Branch 1 taken 939 times.
✓ Branch 2 taken 863 times.
1802 if (catalog->HasParent()) {
1376 // finalized nested catalogs will update their parent's pointer and schedule
1377 // them for processing (continuation) if the 'dirty children count' == 0
1378 939 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1379 939 WritableCatalog *parent = catalog->GetWritableParent();
1380
1381
2/4
✓ Branch 1 taken 939 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 939 times.
✗ Branch 5 not taken.
939 parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1382 939 result.content_hash,
1383 catalog_size,
1384 939 catalog->delta_counters_);
1385 939 catalog->delta_counters_.SetZero();
1386
1387 const int remaining_dirty_children = catalog->GetWritableParent()
1388 939 ->DecrementDirtyChildren();
1389
1390 939 SyncUnlock();
1391
1392 // continuation of the dirty catalog tree traversal
1393 // see WritableCatalogManager::SnapshotCatalogs()
1394
2/2
✓ Branch 0 taken 611 times.
✓ Branch 1 taken 328 times.
939 if (remaining_dirty_children == 0) {
1395 611 FinalizeCatalog(parent, catalog_upload_context.stop_for_tweaks);
1396 611 ScheduleCatalogProcessing(parent);
1397 }
1398
1399
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 } else if (catalog->IsRoot()) {
1400 // once the root catalog is reached, we are done with processing and report
1401 // back to the main via a Future<> and provide the necessary information
1402
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 CatalogInfo root_catalog_info;
1403 863 root_catalog_info.size = catalog_size;
1404
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 root_catalog_info.ttl = catalog->GetTTL();
1405 863 root_catalog_info.content_hash = result.content_hash;
1406
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 root_catalog_info.revision = catalog->GetRevision();
1407
1/2
✓ Branch 1 taken 863 times.
✗ Branch 2 not taken.
863 catalog_upload_context.root_catalog_info->Set(root_catalog_info);
1408 863 SyncUnlock();
1409 } else {
1410 PANIC(kLogStderr, "inconsistent state detected");
1411 }
1412 1802 }
1413
1414
1415 /**
1416 * Finds dirty catalogs that can be snapshot right away and annotates all the
1417 * other catalogs with their number of dirty descendants.
1418 * Note that there is a convenience wrapper to start the recursion:
1419 * WritableCatalogManager::GetModifiedCatalogLeafs()
1420 *
1421 * @param catalog the catalog for this recursion step
1422 * @param result the result list to be appended to
1423 * @return true if 'catalog' is dirty
1424 */
1425 1817 bool WritableCatalogManager::GetModifiedCatalogLeafsRecursively(
1426 Catalog *catalog, WritableCatalogList *result) const {
1427 1817 WritableCatalog *wr_catalog = static_cast<WritableCatalog *>(catalog);
1428
1429 // Look for dirty catalogs in the descendants of *catalog
1430 1817 int dirty_children = 0;
1431
1/2
✓ Branch 1 taken 1817 times.
✗ Branch 2 not taken.
1817 CatalogList children = wr_catalog->GetChildren();
1432 1817 CatalogList::const_iterator i = children.begin();
1433 1817 const CatalogList::const_iterator iend = children.end();
1434
2/2
✓ Branch 2 taken 939 times.
✓ Branch 3 taken 1817 times.
2756 for (; i != iend; ++i) {
1435
2/4
✓ Branch 2 taken 939 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 939 times.
✗ Branch 5 not taken.
939 if (GetModifiedCatalogLeafsRecursively(*i, result)) {
1436 939 ++dirty_children;
1437 }
1438 }
1439
1440 // a catalog is dirty if itself or one of its children has changed
1441 // a leaf catalog doesn't have any dirty children
1442 1817 wr_catalog->set_dirty_children(dirty_children);
1443
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 1817 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1817 const bool is_dirty = wr_catalog->IsDirty() || dirty_children > 0;
1444 1817 const bool is_leaf = dirty_children == 0;
1445
3/4
✓ Branch 0 taken 1817 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1206 times.
✓ Branch 3 taken 611 times.
1817 if (is_dirty && is_leaf) {
1446
1/2
✓ Branch 1 taken 1206 times.
✗ Branch 2 not taken.
1206 result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1447 }
1448
1449 1817 return is_dirty;
1450 1817 }
1451
1452
1453 void WritableCatalogManager::DoBalance() {
1454 CatalogList catalog_list = GetCatalogs();
1455 reverse(catalog_list.begin(), catalog_list.end());
1456 for (unsigned i = 0; i < catalog_list.size(); ++i) {
1457 FixWeight(static_cast<WritableCatalog *>(catalog_list[i]));
1458 }
1459 }
1460
1461 void WritableCatalogManager::FixWeight(WritableCatalog *catalog) {
1462 // firstly check underflow because they can provoke overflows
1463 if (catalog->GetNumEntries() < min_weight_ && !catalog->IsRoot()
1464 && catalog->IsAutogenerated()) {
1465 LogCvmfs(kLogCatalog, kLogStdout,
1466 "Deleting an autogenerated catalog in '%s'",
1467 catalog->mountpoint().c_str());
1468 // Remove the .cvmfscatalog and .cvmfsautocatalog files first
1469 const string path = catalog->mountpoint().ToString();
1470 catalog->RemoveEntry(path + "/.cvmfscatalog");
1471 catalog->RemoveEntry(path + "/.cvmfsautocatalog");
1472 // Remove the actual catalog
1473 const string catalog_path = catalog->mountpoint().ToString().substr(1);
1474 RemoveNestedCatalog(catalog_path);
1475 } else if (catalog->GetNumEntries() > max_weight_) {
1476 CatalogBalancer<WritableCatalogManager> catalog_balancer(this);
1477 catalog_balancer.Balance(catalog);
1478 }
1479 }
1480
1481
1482 //****************************************************************************
1483 // Workaround -- Serialized Catalog Committing
1484
1485 int WritableCatalogManager::GetModifiedCatalogsRecursively(
1486 const Catalog *catalog, WritableCatalogList *result) const {
1487 // A catalog must be snapshot, if itself or one of it's descendants is dirty.
1488 // So we traverse the catalog tree recursively and look for dirty catalogs
1489 // on the way.
1490 const WritableCatalog *wr_catalog = static_cast<const WritableCatalog *>(
1491 catalog);
1492 // This variable will contain the number of dirty catalogs in the sub tree
1493 // with *catalog as it's root.
1494 int dirty_catalogs = (wr_catalog->IsDirty()) ? 1 : 0;
1495
1496 // Look for dirty catalogs in the descendants of *catalog
1497 CatalogList children = wr_catalog->GetChildren();
1498 for (CatalogList::const_iterator i = children.begin(), iEnd = children.end();
1499 i != iEnd; ++i) {
1500 dirty_catalogs += GetModifiedCatalogsRecursively(*i, result);
1501 }
1502
1503 // If we found a dirty catalog in the checked sub tree, the root (*catalog)
1504 // must be snapshot and ends up in the result list
1505 if (dirty_catalogs > 0)
1506 result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1507
1508 // tell the upper layer about number of catalogs
1509 return dirty_catalogs;
1510 }
1511
1512
1513 void WritableCatalogManager::CatalogUploadSerializedCallback(
1514 const upload::SpoolerResult &result, const CatalogUploadContext unused) {
1515 if (result.return_code != 0) {
1516 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1517 result.local_path.c_str(), result.return_code);
1518 }
1519
1520 if (UseLocalCache()) {
1521 CopyCatalogToLocalCache(result);
1522 }
1523
1524 unlink(result.local_path.c_str());
1525 }
1526
1527
1528 WritableCatalogManager::CatalogInfo
1529 WritableCatalogManager::SnapshotCatalogsSerialized(const bool stop_for_tweaks) {
1530 LogCvmfs(kLogCvmfs, kLogStdout, "Serialized committing of file catalogs...");
1531 reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetDirty();
1532 WritableCatalogList catalogs_to_snapshot;
1533 GetModifiedCatalogs(&catalogs_to_snapshot);
1534 CatalogUploadContext unused;
1535 unused.root_catalog_info = NULL;
1536 unused.stop_for_tweaks = false;
1537 spooler_->RegisterListener(
1538 &WritableCatalogManager::CatalogUploadSerializedCallback, this, unused);
1539
1540 CatalogInfo root_catalog_info;
1541 WritableCatalogList::const_iterator i = catalogs_to_snapshot.begin();
1542 const WritableCatalogList::const_iterator iend = catalogs_to_snapshot.end();
1543 for (; i != iend; ++i) {
1544 FinalizeCatalog(*i, stop_for_tweaks);
1545
1546 // Compress and upload catalog
1547 shash::Any hash_catalog(spooler_->GetHashAlgorithm(),
1548 shash::kSuffixCatalog);
1549 if (!zlib::CompressPath2Null((*i)->database_path(), &hash_catalog)) {
1550 PANIC(kLogStderr, "could not compress catalog %s",
1551 (*i)->mountpoint().ToString().c_str());
1552 }
1553
1554 const int64_t catalog_size = GetFileSize((*i)->database_path());
1555 assert(catalog_size > 0);
1556
1557 if ((*i)->HasParent()) {
1558 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1559 WritableCatalog *parent = (*i)->GetWritableParent();
1560 parent->UpdateNestedCatalog((*i)->mountpoint().ToString(), hash_catalog,
1561 catalog_size, (*i)->delta_counters_);
1562 (*i)->delta_counters_.SetZero();
1563 } else if ((*i)->IsRoot()) {
1564 root_catalog_info.size = catalog_size;
1565 root_catalog_info.ttl = (*i)->GetTTL();
1566 root_catalog_info.content_hash = hash_catalog;
1567 root_catalog_info.revision = (*i)->GetRevision();
1568 } else {
1569 PANIC(kLogStderr, "inconsistent state detected");
1570 }
1571
1572 spooler_->ProcessCatalog((*i)->database_path());
1573 }
1574 spooler_->WaitForUpload();
1575
1576 spooler_->UnregisterListeners();
1577 return root_catalog_info;
1578 }
1579
1580 void WritableCatalogManager::SetupSingleCatalogUploadCallback()
1581 {
1582 spooler_->RegisterListener(&WritableCatalogManager::SingleCatalogUploadCallback, this);
1583 }
1584
1585 void WritableCatalogManager::RemoveSingleCatalogUploadCallback()
1586 {
1587 spooler_->WaitForUpload(); // wait for all outstanding jobs to finish before tearing it down
1588 spooler_->UnregisterListeners();
1589 pending_catalogs_ = {}; // whatever we couldn't process, leave it to the Commit
1590 }
1591
1592 void WritableCatalogManager::AddCatalogToQueue(const std::string &path)
1593 {
1594 SyncLock();
1595 WritableCatalog *catalog = NULL;
1596 bool const retval = FindCatalog(MakeRelativePath(path), &catalog, NULL);
1597 assert(retval);
1598 assert(catalog);
1599 catalog->SetDirty(); // ensure it's dirty so its parent will wait for it
1600 SyncUnlock();
1601 pending_catalogs_.push_back(catalog);
1602 }
1603
1604 void WritableCatalogManager::ScheduleReadyCatalogs()
1605 {
1606 // best effort to schedule as many catalogs for upload as possible
1607 for (auto it = pending_catalogs_.begin(); it != pending_catalogs_.end(); ) {
1608 if ((*it)->dirty_children() == 0) {
1609 FinalizeCatalog(*it, false /* stop_for_tweaks */);
1610 ScheduleCatalogProcessing(*it);
1611 LogCvmfs(kLogCatalog, kLogVerboseMsg, "scheduled %s for processing", (*it)->mountpoint().c_str());
1612 it = pending_catalogs_.erase(it);
1613 } else {
1614 ++it;
1615 }
1616 }
1617 }
1618
1619 // Callback for uploading a single catalog, similar to CatalogUploadCallback. The main difference
1620 // is that this callback would not trigger processing of the parent
1621 void WritableCatalogManager::SingleCatalogUploadCallback(
1622 const upload::SpoolerResult &result) {
1623 if (result.return_code != 0) {
1624 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1625 result.local_path.c_str(), result.return_code);
1626 }
1627
1628 // retrieve the catalog object based on the callback information
1629 // see WritableCatalogManager::ScheduleCatalogProcessing()
1630 WritableCatalog *catalog = NULL;
1631 {
1632 MutexLockGuard const guard(catalog_processing_lock_);
1633 std::map<std::string, WritableCatalog*>::iterator const c =
1634 catalog_processing_map_.find(result.local_path);
1635 assert(c != catalog_processing_map_.end());
1636 catalog = c->second;
1637 }
1638
1639 uint64_t const catalog_size = GetFileSize(result.local_path);
1640 assert(catalog_size > 0);
1641
1642 SyncLock();
1643 if (catalog->HasParent()) {
1644 // finalized nested catalogs will update their parent's pointer
1645 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1646 WritableCatalog *parent = catalog->GetWritableParent();
1647
1648 parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1649 result.content_hash,
1650 catalog_size,
1651 catalog->delta_counters_);
1652 parent->DecrementDirtyChildren();
1653 catalog->delta_counters_.SetZero();
1654 }
1655 // JUMP: detach the catalog after uploading to free sqlite related resources
1656 DetachCatalog(catalog);
1657 SyncUnlock();
1658 }
1659 // using the given list of dirs, fetch all relevant catalogs
1660 void WritableCatalogManager::LoadCatalogs(const std::string &base_path, const std::unordered_set<std::string> &dirs)
1661 {
1662 // mount everything up to "base_path" first (this would be our lease_path typically)
1663 Catalog *base_catalog;
1664 MountSubtree(PathString(base_path), NULL /* entry_point */, true /* is_listable */, &base_catalog);
1665
1666 // start up the downloader
1667 CatalogDownloadContext context;
1668 context.dirs = &dirs;
1669 catalog_download_pipeline_ = new CatalogDownloadPipeline(static_cast<SimpleCatalogManager*>(this));
1670 catalog_download_pipeline_->RegisterListener(&WritableCatalogManager::CatalogDownloadCallback, this, context);
1671 catalog_download_pipeline_->Spawn();
1672
1673 Catalog::NestedCatalogList nested_catalogs = base_catalog->ListNestedCatalogs();
1674 for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1675 // schedule relevant child nested catalogs for download
1676 std::string const mountpoint = it->mountpoint.ToString();
1677 if (dirs.find(mountpoint) != dirs.end()) {
1678 Catalog *catalog = CreateCatalog(it->mountpoint, it->hash, NULL /* parent */);
1679 {
1680 MutexLockGuard const guard(catalog_download_lock_);
1681 catalog_download_map_.insert(std::make_pair(it->hash.ToString(), catalog));
1682 }
1683 catalog_download_pipeline_->Process(it->hash);
1684 }
1685 }
1686
1687 catalog_download_pipeline_->WaitFor();
1688 delete catalog_download_pipeline_; // terminate all the threads
1689 }
1690
1691 bool WritableCatalogManager::LookupDirEntry(const string &path,
1692 const LookupOptions options,
1693 DirectoryEntry *dirent) {
1694 SyncLock();
1695 bool const exists = LookupPath(path, options, dirent);
1696 SyncUnlock();
1697 return exists;
1698 }
1699
1700 void WritableCatalogManager::CatalogHashSerializedCallback(
1701 const CompressHashResult &result)
1702 {
1703 MutexLockGuard const guard(catalog_hash_lock_);
1704 catalog_hash_map_[result.path] = result.hash;
1705 }
1706
1707 void WritableCatalogManager::CatalogDownloadCallback(
1708 const CatalogDownloadResult &result,
1709 CatalogDownloadContext context)
1710 {
1711 Catalog *downloaded_catalog;
1712 {
1713 MutexLockGuard const guard(catalog_download_lock_);
1714 auto it = catalog_download_map_.find(result.hash);
1715 assert(it != catalog_download_map_.end());
1716 downloaded_catalog = it->second;
1717 }
1718
1719 if (!downloaded_catalog->OpenDatabase(result.db_path)) {
1720 LogCvmfs(kLogCvmfs, kLogDebug, "failed to initialize catalog");
1721 delete downloaded_catalog;
1722 return;
1723 }
1724
1725 Catalog::NestedCatalogList nested_catalogs = downloaded_catalog->ListNestedCatalogs();
1726 for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1727 // schedule relevant child nested catalogs for download
1728 if (context.dirs->find(it->mountpoint.ToString()) != context.dirs->end()) {
1729 Catalog *child_catalog = CreateCatalog(it->mountpoint, it->hash, NULL /* parent */);
1730 {
1731 MutexLockGuard const guard(catalog_download_lock_);
1732 catalog_download_map_.insert(std::make_pair(it->hash.ToString(), child_catalog));
1733 }
1734 catalog_download_pipeline_->Process(it->hash);
1735 }
1736 }
1737 delete downloaded_catalog;
1738 }
1739
1740
1741
1742 } // namespace catalog
1743