GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/catalog_mgr_rw.cc
Date: 2026-05-19 11:45:12
Exec Total Coverage
Lines: 463 832 55.6%
Branches: 356 1220 29.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM file system.
3 */
4
5 #include "catalog_mgr_rw.h"
6
7 #include <inttypes.h>
8 #include <unistd.h>
9
10 #include <cassert>
11 #include <cstdio>
12 #include <cstdlib>
13 #include <string>
14
15 #include "catalog_balancer.h"
16 #include "catalog_rw.h"
17 #include "manifest.h"
18 #include "statistics.h"
19 #include "upload.h"
20 #include "util/exception.h"
21 #include "util/logging.h"
22 #include "util/posix.h"
23 #include "util/smalloc.h"
24
25 using namespace std; // NOLINT
26
27 namespace catalog {
28
29 1020 WritableCatalogManager::WritableCatalogManager(
30 const shash::Any &base_hash,
31 const std::string &stratum0,
32 const string &dir_temp,
33 upload::Spooler *spooler,
34 download::DownloadManager *download_manager,
35 bool enforce_limits,
36 const unsigned nested_kcatalog_limit,
37 const unsigned root_kcatalog_limit,
38 const unsigned file_mbyte_limit,
39 perf::Statistics *statistics,
40 bool is_balanceable,
41 unsigned max_weight,
42 unsigned min_weight,
43 1020 const std::string &dir_cache)
44 : SimpleCatalogManager(base_hash, stratum0, dir_temp, download_manager,
45 statistics, false, dir_cache,
46 true /* copy to tmpdir */)
47 1020 , spooler_(spooler)
48 1020 , enforce_limits_(enforce_limits)
49 1020 , nested_kcatalog_limit_(nested_kcatalog_limit)
50 1020 , root_kcatalog_limit_(root_kcatalog_limit)
51 1020 , file_mbyte_limit_(file_mbyte_limit)
52 1020 , is_balanceable_(is_balanceable)
53 1020 , max_weight_(max_weight)
54 1020 , min_weight_(min_weight)
55 1020 , balance_weight_(max_weight / 2) {
56 1020 sync_lock_ = reinterpret_cast<pthread_mutex_t *>(
57 1020 smalloc(sizeof(pthread_mutex_t)));
58 1020 int retval = pthread_mutex_init(sync_lock_, NULL);
59
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1020 times.
1020 assert(retval == 0);
60 1020 catalog_processing_lock_ = reinterpret_cast<pthread_mutex_t *>(
61 1020 smalloc(sizeof(pthread_mutex_t)));
62 1020 retval = pthread_mutex_init(catalog_processing_lock_, NULL);
63
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1020 times.
1020 assert(retval == 0);
64 1020 }
65
66
67 4076 WritableCatalogManager::~WritableCatalogManager() {
68 2038 pthread_mutex_destroy(sync_lock_);
69 2038 free(sync_lock_);
70 2038 pthread_mutex_destroy(catalog_processing_lock_);
71 2038 free(catalog_processing_lock_);
72 4076 }
73
74
75 /**
76 * This method is virtual in AbstractCatalogManager. It returns a new catalog
77 * structure in the form the different CatalogManagers need it.
78 * In this case it returns a stub for a WritableCatalog.
79 * @param mountpoint the mount point of the catalog stub to create
80 * @param catalog_hash the content hash of the catalog to create
81 * @param parent_catalog the parent of the catalog stub to create
82 * @return a pointer to the catalog stub structure created
83 */
84 2359 Catalog *WritableCatalogManager::CreateCatalog(const PathString &mountpoint,
85 const shash::Any &catalog_hash,
86 Catalog *parent_catalog) {
87 4718 return new WritableCatalog(mountpoint.ToString(), catalog_hash,
88
2/4
✓ Branch 1 taken 2359 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2359 times.
✗ Branch 5 not taken.
4718 parent_catalog);
89 }
90
91
92 2359 void WritableCatalogManager::ActivateCatalog(Catalog *catalog) {
93 2359 catalog->TakeDatabaseFileOwnership();
94 2359 }
95
96
97 /**
98 * This method is invoked if we create a completely new repository.
99 * The new root catalog will already contain a root entry.
100 * It is uploaded by a Forklift to the upstream storage.
101 * @return true on success, false otherwise
102 */
103 603 manifest::Manifest *WritableCatalogManager::CreateRepository(
104 const string &dir_temp,
105 const bool volatile_content,
106 const std::string &voms_authz,
107 upload::Spooler *spooler) {
108 // Create a new root catalog at file_path
109
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 const string file_path = dir_temp + "/new_root_catalog";
110
111 603 const shash::Algorithms hash_algorithm = spooler->GetHashAlgorithm();
112
113 // A newly created catalog always needs a root entry
114 // we create and configure this here
115
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 DirectoryEntry root_entry;
116 603 root_entry.inode_ = DirectoryEntry::kInvalidInode;
117 603 root_entry.mode_ = 16877;
118 603 root_entry.size_ = 4096;
119 603 root_entry.mtime_ = time(NULL);
120 603 root_entry.uid_ = getuid();
121 603 root_entry.gid_ = getgid();
122
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 root_entry.checksum_ = shash::Any(hash_algorithm);
123 603 root_entry.linkcount_ = 2;
124
1/2
✓ Branch 2 taken 603 times.
✗ Branch 3 not taken.
603 const string root_path = "";
125
126 // Create the database schema and the initial root entry
127 {
128 const UniquePtr<CatalogDatabase> new_clg_db(
129
2/4
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 603 times.
✗ Branch 5 not taken.
603 CatalogDatabase::Create(file_path));
130 603 if (!new_clg_db.IsValid()
131
4/8
✓ Branch 0 taken 603 times.
✗ Branch 1 not taken.
✓ Branch 4 taken 603 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 603 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 603 times.
603 || !new_clg_db->InsertInitialValues(root_path, volatile_content,
132 voms_authz, root_entry)) {
133 LogCvmfs(kLogCatalog, kLogStderr, "creation of catalog '%s' failed",
134 file_path.c_str());
135 return NULL;
136 }
137
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 }
138
139 // Compress root catalog;
140
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 const int64_t catalog_size = GetFileSize(file_path);
141
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 603 times.
603 if (catalog_size < 0) {
142 unlink(file_path.c_str());
143 return NULL;
144 }
145
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 const string file_path_compressed = file_path + ".compressed";
146
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 shash::Any hash_catalog(hash_algorithm, shash::kSuffixCatalog);
147
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 const bool retval = zlib::CompressPath2Path(file_path, file_path_compressed,
148 &hash_catalog);
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 603 times.
603 if (!retval) {
150 LogCvmfs(kLogCatalog, kLogStderr, "compression of catalog '%s' failed",
151 file_path.c_str());
152 unlink(file_path.c_str());
153 return NULL;
154 }
155 603 unlink(file_path.c_str());
156
157 // Create manifest
158
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 const string manifest_path = dir_temp + "/manifest";
159 manifest::Manifest *manifest = new manifest::Manifest(hash_catalog,
160
3/6
✓ Branch 2 taken 603 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 603 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 603 times.
✗ Branch 9 not taken.
603 catalog_size, "");
161
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 603 times.
603 if (!voms_authz.empty()) {
162 manifest->set_has_alt_catalog_path(true);
163 }
164
165 // Upload catalog
166
3/6
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 603 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 603 times.
✗ Branch 8 not taken.
603 spooler->Upload(file_path_compressed, "data/" + hash_catalog.MakePath());
167
1/2
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
603 spooler->WaitForUpload();
168 603 unlink(file_path_compressed.c_str());
169
2/4
✓ Branch 1 taken 603 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 603 times.
603 if (spooler->GetNumberOfErrors() > 0) {
170 LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalog %s",
171 file_path_compressed.c_str());
172 delete manifest;
173 return NULL;
174 }
175
176 603 return manifest;
177 603 }
178
179
180 /**
181 * Retrieve the catalog containing the given path.
182 * Other than AbstractCatalogManager::FindCatalog() this mounts nested
183 * catalogs if necessary and returns WritableCatalog objects.
184 * Furthermore it optionally returns the looked-up DirectoryEntry.
185 *
186 * @param path the path to look for
187 * @param result the retrieved catalog (as a pointer)
188 * @param dirent is set to looked up DirectoryEntry for 'path' if non-NULL
189 * @return true if catalog was found
190 */
191 10611 bool WritableCatalogManager::FindCatalog(const string &path,
192 WritableCatalog **result,
193 DirectoryEntry *dirent) {
194
1/2
✓ Branch 1 taken 10611 times.
✗ Branch 2 not taken.
10611 const PathString ps_path(path);
195
196
1/2
✓ Branch 1 taken 10611 times.
✗ Branch 2 not taken.
10611 Catalog *best_fit = AbstractCatalogManager<Catalog>::FindCatalog(ps_path);
197
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10611 times.
10611 assert(best_fit != NULL);
198 10611 Catalog *catalog = NULL;
199
1/2
✓ Branch 1 taken 10611 times.
✗ Branch 2 not taken.
10611 const bool retval = MountSubtree(ps_path, best_fit, true /* is_listable */,
200 &catalog);
201
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10611 times.
10611 if (!retval)
202 return false;
203
204
1/2
✓ Branch 1 taken 10611 times.
✗ Branch 2 not taken.
10611 catalog::DirectoryEntry dummy;
205
2/2
✓ Branch 0 taken 5561 times.
✓ Branch 1 taken 5050 times.
10611 if (NULL == dirent) {
206 5561 dirent = &dummy;
207 }
208
1/2
✓ Branch 1 taken 10611 times.
✗ Branch 2 not taken.
10611 const bool found = catalog->LookupPath(ps_path, dirent);
209
6/8
✓ Branch 0 taken 10585 times.
✓ Branch 1 taken 26 times.
✓ Branch 3 taken 10585 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 10585 times.
✓ Branch 7 taken 26 times.
✓ Branch 8 taken 10585 times.
10611 if (!found || !catalog->IsWritable())
210 26 return false;
211
212 10585 *result = static_cast<WritableCatalog *>(catalog);
213 10585 return true;
214 10611 }
215
216
217 WritableCatalog *WritableCatalogManager::GetHostingCatalog(
218 const std::string &path) {
219 WritableCatalog *result = NULL;
220 const bool retval = FindCatalog(MakeRelativePath(path), &result, NULL);
221 if (!retval)
222 return NULL;
223 return result;
224 }
225
226
227 /**
228 * Remove the given file from the catalogs.
229 * @param file_path the full path to the file to be removed
230 * @return true on success, false otherwise
231 */
232 164 void WritableCatalogManager::RemoveFile(const std::string &path) {
233
1/2
✓ Branch 1 taken 164 times.
✗ Branch 2 not taken.
164 const string file_path = MakeRelativePath(path);
234
1/2
✓ Branch 1 taken 164 times.
✗ Branch 2 not taken.
164 const string parent_path = GetParentPath(file_path);
235
236 164 SyncLock();
237 WritableCatalog *catalog;
238
2/4
✓ Branch 1 taken 164 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 164 times.
164 if (!FindCatalog(parent_path, &catalog)) {
239 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
240 file_path.c_str());
241 }
242
243
1/2
✓ Branch 1 taken 164 times.
✗ Branch 2 not taken.
164 catalog->RemoveEntry(file_path);
244 164 SyncUnlock();
245 164 }
246
247
248 /**
249 * Remove the given directory from the catalogs.
250 * @param directory_path the full path to the directory to be removed
251 * @return true on success, false otherwise
252 */
253 172 void WritableCatalogManager::RemoveDirectory(const std::string &path) {
254
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 const string directory_path = MakeRelativePath(path);
255
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 const string parent_path = GetParentPath(directory_path);
256
257 172 SyncLock();
258 WritableCatalog *catalog;
259
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 DirectoryEntry parent_entry;
260
2/4
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 172 times.
172 if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
261 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
262 directory_path.c_str());
263 }
264
265 172 parent_entry.set_linkcount(parent_entry.linkcount() - 1);
266
267
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 catalog->RemoveEntry(directory_path);
268
1/2
✓ Branch 1 taken 172 times.
✗ Branch 2 not taken.
172 catalog->UpdateEntry(parent_entry, parent_path);
269
2/2
✓ Branch 1 taken 53 times.
✓ Branch 2 taken 119 times.
172 if (parent_entry.IsNestedCatalogRoot()) {
270
1/2
✓ Branch 2 taken 53 times.
✗ Branch 3 not taken.
53 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
271 parent_path.c_str());
272 WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
273 53 catalog->parent());
274 53 parent_entry.set_is_nested_catalog_mountpoint(true);
275 53 parent_entry.set_is_nested_catalog_root(false);
276
1/2
✓ Branch 1 taken 53 times.
✗ Branch 2 not taken.
53 parent_catalog->UpdateEntry(parent_entry, parent_path);
277 }
278 172 SyncUnlock();
279 172 }
280
281 /**
282 * Clone the file called `source` changing its name into `destination`, the
283 * source file is keep intact.
284 * @params destination, the name of the new file, complete path
285 * @params source, the name of the file to clone, which must be already in the
286 * repository
287 * @return void
288 */
289 void WritableCatalogManager::Clone(const std::string destination,
290 const std::string source) {
291 const std::string relative_source = MakeRelativePath(source);
292
293 DirectoryEntry source_dirent;
294 if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
295 PANIC(kLogStderr, "catalog for file '%s' cannot be found, aborting",
296 source.c_str());
297 }
298 if (source_dirent.IsDirectory()) {
299 PANIC(kLogStderr, "Trying to clone a directory: '%s', aborting",
300 source.c_str());
301 }
302
303 // if the file is already there we remove it and we add it back
304 DirectoryEntry check_dirent;
305 const bool destination_already_present = LookupPath(
306 MakeRelativePath(destination), kLookupDefault, &check_dirent);
307 if (destination_already_present) {
308 this->RemoveFile(destination);
309 }
310
311 DirectoryEntry destination_dirent(source_dirent);
312 std::string destination_dirname;
313 std::string destination_filename;
314 SplitPath(destination, &destination_dirname, &destination_filename);
315
316 destination_dirent.name_.Assign(
317 NameString(destination_filename.c_str(), destination_filename.length()));
318
319 // TODO(jblomer): clone is used by tarball engine and should eventually
320 // support extended attributes
321 this->AddFile(destination_dirent, empty_xattrs, destination_dirname);
322 }
323
324
325 /**
326 * Copies an entire directory tree from the existing from_dir to the
327 * non-existing to_dir. The destination's parent directory must exist. On the
328 * catalog level, the new entries will be identical to the old ones except
329 * for their path hash fields.
330 */
331 260 void WritableCatalogManager::CloneTree(const std::string &from_dir,
332 const std::string &to_dir) {
333 // Sanitize input paths
334
6/6
✓ Branch 1 taken 208 times.
✓ Branch 2 taken 52 times.
✓ Branch 4 taken 26 times.
✓ Branch 5 taken 182 times.
✓ Branch 6 taken 78 times.
✓ Branch 7 taken 182 times.
260 if (from_dir.empty() || to_dir.empty())
335 78 PANIC(kLogStderr, "clone tree from or to root impossible");
336
337
1/2
✓ Branch 1 taken 182 times.
✗ Branch 2 not taken.
182 const std::string relative_source = MakeRelativePath(from_dir);
338
1/2
✓ Branch 1 taken 182 times.
✗ Branch 2 not taken.
182 const std::string relative_dest = MakeRelativePath(to_dir);
339
340
2/2
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 156 times.
182 if (relative_source == relative_dest) {
341 26 PANIC(kLogStderr, "cannot clone tree into itself ('%s')", to_dir.c_str());
342 }
343
4/7
✓ Branch 1 taken 156 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 156 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 26 times.
✓ Branch 8 taken 130 times.
156 if (HasPrefix(relative_dest, relative_source + "/", false /*ignore_case*/)) {
344 26 PANIC(kLogStderr,
345 "cannot clone tree into sub directory of source '%s' --> '%s'",
346 from_dir.c_str(), to_dir.c_str());
347 }
348
349
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 DirectoryEntry source_dirent;
350
3/4
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 26 times.
✓ Branch 4 taken 104 times.
130 if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
351 26 PANIC(kLogStderr, "path '%s' cannot be found, aborting", from_dir.c_str());
352 }
353
2/2
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 78 times.
104 if (!source_dirent.IsDirectory()) {
354 26 PANIC(kLogStderr, "CloneTree: source '%s' not a directory, aborting",
355 from_dir.c_str());
356 }
357
358
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 DirectoryEntry dest_dirent;
359
3/4
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 26 times.
✓ Branch 4 taken 52 times.
78 if (LookupPath(relative_dest, kLookupDefault, &dest_dirent)) {
360 26 PANIC(kLogStderr, "destination '%s' exists, aborting", to_dir.c_str());
361 }
362
363
1/2
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
52 const std::string dest_parent = GetParentPath(relative_dest);
364
1/2
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
52 DirectoryEntry dest_parent_dirent;
365
3/4
✓ Branch 1 taken 52 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 26 times.
✓ Branch 4 taken 26 times.
52 if (!LookupPath(dest_parent, kLookupDefault, &dest_parent_dirent)) {
366 26 PANIC(kLogStderr, "destination '%s' not on a known path, aborting",
367 to_dir.c_str());
368 }
369
370
2/4
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26 times.
✗ Branch 5 not taken.
26 CloneTreeImpl(PathString(from_dir),
371
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
52 GetParentPath(to_dir),
372
2/4
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 26 times.
✗ Branch 5 not taken.
52 NameString(GetFileName(to_dir)));
373 546 }
374
375
376 /**
377 * Called from CloneTree(), assumes that from_dir and to_dir are sufficiently
378 * sanitized
379 */
380 208 void WritableCatalogManager::CloneTreeImpl(const PathString &source_dir,
381 const std::string &dest_parent_dir,
382 const NameString &dest_name) {
383
1/2
✓ Branch 4 taken 208 times.
✗ Branch 5 not taken.
208 LogCvmfs(kLogCatalog, kLogDebug, "cloning %s --> %s/%s", source_dir.c_str(),
384
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
416 dest_parent_dir.c_str(), dest_name.ToString().c_str());
385
3/6
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 208 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 208 times.
✗ Branch 8 not taken.
416 const PathString relative_source(MakeRelativePath(source_dir.ToString()));
386
387
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 DirectoryEntry source_dirent;
388
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 bool retval = LookupPath(relative_source, kLookupDefault, &source_dirent);
389
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 208 times.
208 assert(retval);
390
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 208 times.
208 assert(!source_dirent.IsBindMountpoint());
391
392
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 DirectoryEntry dest_dirent(source_dirent);
393
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 dest_dirent.name_.Assign(dest_name);
394 // Just in case, reset the nested catalog markers
395 208 dest_dirent.set_is_nested_catalog_mountpoint(false);
396 208 dest_dirent.set_is_nested_catalog_root(false);
397
398 208 XattrList xattrs;
399
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 208 times.
208 if (source_dirent.HasXattrs()) {
400 retval = LookupXattrs(relative_source, &xattrs);
401 assert(retval);
402 }
403
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 AddDirectory(dest_dirent, xattrs, dest_parent_dir);
404
405
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 std::string dest_dir = dest_parent_dir;
406
2/2
✓ Branch 1 taken 182 times.
✓ Branch 2 taken 26 times.
208 if (!dest_dir.empty())
407
1/2
✓ Branch 1 taken 182 times.
✗ Branch 2 not taken.
182 dest_dir.push_back('/');
408
2/4
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 208 times.
✗ Branch 5 not taken.
208 dest_dir += dest_name.ToString();
409 208 if (source_dirent.IsNestedCatalogRoot()
410
5/6
✓ Branch 0 taken 130 times.
✓ Branch 1 taken 78 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 130 times.
✓ Branch 5 taken 78 times.
✓ Branch 6 taken 130 times.
208 || source_dirent.IsNestedCatalogMountpoint()) {
411
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 CreateNestedCatalog(dest_dir);
412 }
413
414 208 DirectoryEntryList ls;
415
1/2
✓ Branch 1 taken 208 times.
✗ Branch 2 not taken.
208 retval = Listing(relative_source, &ls, false /* expand_symlink */);
416
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 208 times.
208 assert(retval);
417
2/2
✓ Branch 1 taken 442 times.
✓ Branch 2 taken 208 times.
650 for (unsigned i = 0; i < ls.size(); ++i) {
418
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 PathString sub_path(source_dir);
419
2/4
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 442 times.
442 assert(!sub_path.IsEmpty());
420
1/2
✓ Branch 1 taken 442 times.
✗ Branch 2 not taken.
442 sub_path.Append("/", 1);
421
3/6
✓ Branch 2 taken 442 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 442 times.
✗ Branch 8 not taken.
✓ Branch 11 taken 442 times.
✗ Branch 12 not taken.
442 sub_path.Append(ls[i].name().GetChars(), ls[i].name().GetLength());
422
423
2/2
✓ Branch 2 taken 182 times.
✓ Branch 3 taken 260 times.
442 if (ls[i].IsDirectory()) {
424
2/4
✓ Branch 2 taken 182 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 182 times.
✗ Branch 6 not taken.
182 CloneTreeImpl(sub_path, dest_dir, ls[i].name());
425 182 continue;
426 }
427
428 // We break hard-links during cloning
429 260 ls[i].set_hardlink_group(0);
430 260 ls[i].set_linkcount(1);
431
432 260 xattrs.Clear();
433
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 260 times.
260 if (ls[i].HasXattrs()) {
434 retval = LookupXattrs(sub_path, &xattrs);
435 assert(retval);
436 }
437
438
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 260 times.
260 if (ls[i].IsChunkedFile()) {
439 FileChunkList chunks;
440 const std::string relative_sub_path = MakeRelativePath(
441 sub_path.ToString());
442 retval = ListFileChunks(PathString(relative_sub_path),
443 ls[i].hash_algorithm(), &chunks);
444 assert(retval);
445 AddChunkedFile(ls[i], xattrs, dest_dir, chunks);
446 } else {
447
1/2
✓ Branch 2 taken 260 times.
✗ Branch 3 not taken.
260 AddFile(ls[i], xattrs, dest_dir);
448 }
449
2/2
✓ Branch 1 taken 260 times.
✓ Branch 2 taken 182 times.
442 }
450 208 }
451
452
453 /**
454 * Add a new directory to the catalogs.
455 * @param entry a DirectoryEntry structure describing the new directory
456 * @param parent_directory the absolute path of the directory containing the
457 * directory to be created
458 * @return true on success, false otherwise
459 */
460 3798 void WritableCatalogManager::AddDirectory(const DirectoryEntryBase &entry,
461 const XattrList &xattrs,
462 const std::string &parent_directory) {
463
1/2
✓ Branch 1 taken 3798 times.
✗ Branch 2 not taken.
3798 const string parent_path = MakeRelativePath(parent_directory);
464
1/2
✓ Branch 1 taken 3798 times.
✗ Branch 2 not taken.
3798 string directory_path = parent_path + "/";
465
3/6
✓ Branch 1 taken 3798 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 3798 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 3798 times.
✗ Branch 10 not taken.
3798 directory_path.append(entry.name().GetChars(), entry.name().GetLength());
466
467 3798 SyncLock();
468 WritableCatalog *catalog;
469
1/2
✓ Branch 1 taken 3798 times.
✗ Branch 2 not taken.
3798 DirectoryEntry parent_entry;
470
2/4
✓ Branch 1 taken 3798 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 3798 times.
3798 if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
471 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
472 directory_path.c_str());
473 }
474
475
1/2
✓ Branch 1 taken 3798 times.
✗ Branch 2 not taken.
3798 DirectoryEntry fixed_hardlink_count(entry);
476 3798 fixed_hardlink_count.set_linkcount(2);
477
1/2
✓ Branch 1 taken 3798 times.
✗ Branch 2 not taken.
3798 catalog->AddEntry(fixed_hardlink_count, xattrs, directory_path, parent_path);
478
479 3798 parent_entry.set_linkcount(parent_entry.linkcount() + 1);
480
1/2
✓ Branch 1 taken 3798 times.
✗ Branch 2 not taken.
3798 catalog->UpdateEntry(parent_entry, parent_path);
481
2/2
✓ Branch 1 taken 78 times.
✓ Branch 2 taken 3720 times.
3798 if (parent_entry.IsNestedCatalogRoot()) {
482
1/2
✓ Branch 2 taken 78 times.
✗ Branch 3 not taken.
78 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
483 parent_path.c_str());
484 WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
485 78 catalog->parent());
486 78 parent_entry.set_is_nested_catalog_mountpoint(true);
487 78 parent_entry.set_is_nested_catalog_root(false);
488
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 parent_catalog->UpdateEntry(parent_entry, parent_path);
489 }
490 3798 SyncUnlock();
491 3798 }
492
493 /**
494 * Add a new file to the catalogs.
495 * @param entry a DirectoryEntry structure describing the new file
496 * @param parent_directory the absolute path of the directory containing the
497 * file to be created
498 * @return true on success, false otherwise
499 */
500 5067 void WritableCatalogManager::AddFile(const DirectoryEntry &entry,
501 const XattrList &xattrs,
502 const std::string &parent_directory) {
503
1/2
✓ Branch 1 taken 5067 times.
✗ Branch 2 not taken.
5067 const string parent_path = MakeRelativePath(parent_directory);
504
1/2
✓ Branch 1 taken 5067 times.
✗ Branch 2 not taken.
5067 const string file_path = entry.GetFullPath(parent_path);
505
506 5067 SyncLock();
507 WritableCatalog *catalog;
508
2/4
✓ Branch 1 taken 5067 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 5067 times.
5067 if (!FindCatalog(parent_path, &catalog)) {
509 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
510 file_path.c_str());
511 }
512
513
4/6
✓ Branch 1 taken 4947 times.
✓ Branch 2 taken 120 times.
✓ Branch 4 taken 4947 times.
✗ Branch 5 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 4947 times.
5067 assert(!entry.IsRegular() || entry.IsChunkedFile()
514 || !entry.checksum().IsNull());
515
3/4
✓ Branch 1 taken 120 times.
✓ Branch 2 taken 4947 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 120 times.
5067 assert(entry.IsRegular() || !entry.IsExternalFile());
516
517 // check if file is too big
518
1/2
✓ Branch 1 taken 5067 times.
✗ Branch 2 not taken.
5067 const unsigned mbytes = entry.size() / (1024 * 1024);
519
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 5067 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
5067 if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
520 LogCvmfs(kLogCatalog, kLogStderr,
521 "%s: file at %s is larger than %u megabytes (%u). "
522 "CernVM-FS works best with small files. "
523 "Please remove the file or increase the limit.",
524 enforce_limits_ ? "FATAL" : "WARNING", file_path.c_str(),
525 file_mbyte_limit_, mbytes);
526 if (enforce_limits_)
527 PANIC(kLogStderr, "file at %s is larger than %u megabytes (%u).",
528 file_path.c_str(), file_mbyte_limit_, mbytes);
529 }
530
531
1/2
✓ Branch 1 taken 5067 times.
✗ Branch 2 not taken.
5067 catalog->AddEntry(entry, xattrs, file_path, parent_path);
532 5067 SyncUnlock();
533 5067 }
534
535
536 void WritableCatalogManager::AddChunkedFile(const DirectoryEntryBase &entry,
537 const XattrList &xattrs,
538 const std::string &parent_directory,
539 const FileChunkList &file_chunks) {
540 assert(file_chunks.size() > 0);
541
542 DirectoryEntry full_entry(entry);
543 full_entry.set_is_chunked_file(true);
544
545 AddFile(full_entry, xattrs, parent_directory);
546
547 const string parent_path = MakeRelativePath(parent_directory);
548 const string file_path = entry.GetFullPath(parent_path);
549
550 SyncLock();
551 WritableCatalog *catalog;
552 if (!FindCatalog(parent_path, &catalog)) {
553 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
554 file_path.c_str());
555 }
556
557 for (unsigned i = 0; i < file_chunks.size(); ++i) {
558 catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
559 }
560 SyncUnlock();
561 }
562
563
564 /**
565 * Add a hardlink group to the catalogs.
566 * @param entries a list of DirectoryEntries describing the new files
567 * @param parent_directory the absolute path of the directory containing the
568 * files to be created
569 * @return true on success, false otherwise
570 */
571 void WritableCatalogManager::AddHardlinkGroup(
572 const DirectoryEntryBaseList &entries,
573 const XattrList &xattrs,
574 const std::string &parent_directory,
575 const FileChunkList &file_chunks) {
576 assert(entries.size() >= 1);
577 assert(file_chunks.IsEmpty() || entries[0].IsRegular());
578 if (entries.size() == 1) {
579 DirectoryEntry fix_linkcount(entries[0]);
580 fix_linkcount.set_linkcount(1);
581 if (file_chunks.IsEmpty())
582 return AddFile(fix_linkcount, xattrs, parent_directory);
583 return AddChunkedFile(fix_linkcount, xattrs, parent_directory, file_chunks);
584 }
585
586 LogCvmfs(kLogCatalog, kLogVerboseMsg, "adding hardlink group %s/%s",
587 parent_directory.c_str(), entries[0].name().c_str());
588
589 // Hardlink groups have to reside in the same directory.
590 // Therefore we only have one parent directory here
591 const string parent_path = MakeRelativePath(parent_directory);
592
593 // check if hard link is too big
594 const unsigned mbytes = entries[0].size() / (1024 * 1024);
595 if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
596 LogCvmfs(kLogCatalog, kLogStderr,
597 "%s: hard link at %s is larger than %u megabytes (%u). "
598 "CernVM-FS works best with small files. "
599 "Please remove the file or increase the limit.",
600 enforce_limits_ ? "FATAL" : "WARNING",
601 (parent_path + entries[0].name().ToString()).c_str(),
602 file_mbyte_limit_, mbytes);
603 if (enforce_limits_)
604 PANIC(kLogStderr, "hard link at %s is larger than %u megabytes (%u)",
605 (parent_path + entries[0].name().ToString()).c_str(),
606 file_mbyte_limit_, mbytes);
607 }
608
609 SyncLock();
610 WritableCatalog *catalog;
611 if (!FindCatalog(parent_path, &catalog)) {
612 PANIC(kLogStderr,
613 "catalog for hardlink group containing '%s' cannot be found",
614 parent_path.c_str());
615 }
616
617 // Get a valid hardlink group id for the catalog the group will end up in
618 // TODO(unknown): Compaction
619 const uint32_t new_group_id = catalog->GetMaxLinkId() + 1;
620 LogCvmfs(kLogCatalog, kLogVerboseMsg, "hardlink group id %u issued",
621 new_group_id);
622 assert(new_group_id > 0);
623
624 // Add the file entries to the catalog
625 for (DirectoryEntryBaseList::const_iterator i = entries.begin(),
626 iEnd = entries.end();
627 i != iEnd;
628 ++i) {
629 string file_path = parent_path + "/";
630 file_path.append(i->name().GetChars(), i->name().GetLength());
631
632 // create a fully fledged DirectoryEntry to add the hardlink group to it
633 // which is CVMFS specific meta data.
634 DirectoryEntry hardlink(*i);
635 hardlink.set_hardlink_group(new_group_id);
636 hardlink.set_linkcount(entries.size());
637 hardlink.set_is_chunked_file(!file_chunks.IsEmpty());
638
639 catalog->AddEntry(hardlink, xattrs, file_path, parent_path);
640 if (hardlink.IsChunkedFile()) {
641 for (unsigned i = 0; i < file_chunks.size(); ++i) {
642 catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
643 }
644 }
645 }
646 SyncUnlock();
647 }
648
649
650 void WritableCatalogManager::ShrinkHardlinkGroup(const string &remove_path) {
651 const string relative_path = MakeRelativePath(remove_path);
652
653 SyncLock();
654 WritableCatalog *catalog;
655 if (!FindCatalog(relative_path, &catalog)) {
656 PANIC(kLogStderr,
657 "catalog for hardlink group containing '%s' cannot be found",
658 remove_path.c_str());
659 }
660
661 catalog->IncLinkcount(relative_path, -1);
662 SyncUnlock();
663 }
664
665
666 /**
667 * Update entry meta data (mode, owner, ...).
668 * CVMFS specific meta data (i.e. nested catalog transition points) are NOT
669 * changed by this method, although transition points intrinsics are taken into
670 * account, to keep nested catalogs consistent.
671 * @param entry the directory entry to be touched
672 * @param path the path of the directory entry to be touched
673 */
674 46 void WritableCatalogManager::TouchDirectory(const DirectoryEntryBase &entry,
675 const XattrList &xattrs,
676 const std::string &directory_path) {
677
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 46 times.
46 assert(entry.IsDirectory());
678
679
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 const string entry_path = MakeRelativePath(directory_path);
680
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 const string parent_path = GetParentPath(entry_path);
681
682 46 SyncLock();
683 // find the catalog to be updated
684 WritableCatalog *catalog;
685
2/4
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 46 times.
46 if (!FindCatalog(parent_path, &catalog)) {
686 PANIC(kLogStderr, "catalog for entry '%s' cannot be found",
687 entry_path.c_str());
688 }
689
690
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 catalog->TouchEntry(entry, xattrs, entry_path);
691
692 // since we deal with a directory here, we might just touch a
693 // nested catalog transition point. If this is the case we would need to
694 // update two catalog entries:
695 // * the nested catalog MOUNTPOINT in the parent catalog
696 // * the nested catalog ROOT in the nested catalog
697
698 // first check if we really have a nested catalog transition point
699
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 catalog::DirectoryEntry potential_transition_point;
700
1/2
✓ Branch 3 taken 46 times.
✗ Branch 4 not taken.
46 const PathString transition_path(entry_path.data(), entry_path.length());
701
1/2
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
46 bool retval = catalog->LookupPath(transition_path,
702 &potential_transition_point);
703
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
46 assert(retval);
704
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 46 times.
46 if (potential_transition_point.IsNestedCatalogMountpoint()) {
705 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point at %s",
706 entry_path.c_str());
707
708 // find and mount nested catalog associated to this transition point
709 shash::Any nested_hash;
710 uint64_t nested_size;
711 retval = catalog->FindNested(transition_path, &nested_hash, &nested_size);
712 assert(retval);
713 Catalog *nested_catalog;
714 nested_catalog = MountCatalog(transition_path, nested_hash, catalog);
715 assert(nested_catalog != NULL);
716
717 // update nested catalog root in the child catalog
718 reinterpret_cast<WritableCatalog *>(nested_catalog)
719 ->TouchEntry(entry, xattrs, entry_path);
720 }
721
722 46 SyncUnlock();
723 46 }
724
725
726 /**
727 * Create a new nested catalog. Includes moving all entries belonging there
728 * from it's parent catalog.
729 * @param mountpoint the path of the directory to become a nested root
730 * @return true on success, false otherwise
731 */
732 1027 void WritableCatalogManager::CreateNestedCatalog(
733 const std::string &mountpoint) {
734
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 const string nested_root_path = MakeRelativePath(mountpoint);
735
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 const PathString ps_nested_root_path(nested_root_path);
736
737 1027 SyncLock();
738 // Find the catalog currently containing the directory structure, which
739 // will be represented as a new nested catalog and its root-entry/mountpoint
740 // along the way
741 1027 WritableCatalog *old_catalog = NULL;
742
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 DirectoryEntry new_root_entry;
743
2/4
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1027 times.
1027 if (!FindCatalog(nested_root_path, &old_catalog, &new_root_entry)) {
744 PANIC(kLogStderr,
745 "failed to create nested catalog '%s': "
746 "mountpoint was not found in current catalog structure",
747 nested_root_path.c_str());
748 }
749
750 // Create the database schema and the initial root entry
751 // for the new nested catalog
752
1/2
✓ Branch 2 taken 1027 times.
✗ Branch 3 not taken.
1027 const string database_file_path = CreateTempPath(dir_temp() + "/catalog",
753
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 0666);
754 1027 const bool volatile_content = false;
755
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 CatalogDatabase *new_catalog_db = CatalogDatabase::Create(database_file_path);
756
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1027 times.
1027 assert(NULL != new_catalog_db);
757 // Note we do not set the external_data bit for nested catalogs
758
2/4
✓ Branch 2 taken 1027 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1027 times.
✗ Branch 6 not taken.
1027 bool retval = new_catalog_db->InsertInitialValues(
759 nested_root_path,
760 volatile_content,
761 "", // At this point, only root
762 // catalog gets VOMS authz
763 new_root_entry);
764
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1027 times.
1027 assert(retval);
765 // TODO(rmeusel): we need a way to attach a catalog directly from an open
766 // database to remove this indirection
767
1/2
✓ Branch 0 taken 1027 times.
✗ Branch 1 not taken.
1027 delete new_catalog_db;
768 1027 new_catalog_db = NULL;
769
770 // Attach the just created nested catalog
771
2/4
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1027 times.
✗ Branch 5 not taken.
1027 Catalog *new_catalog = CreateCatalog(ps_nested_root_path, shash::Any(),
772 old_catalog);
773
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 retval = AttachCatalog(database_file_path, new_catalog);
774
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1027 times.
1027 assert(retval);
775
776
2/4
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1027 times.
1027 assert(new_catalog->IsWritable());
777 1027 WritableCatalog *wr_new_catalog = static_cast<WritableCatalog *>(new_catalog);
778
779
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1027 times.
1027 if (new_root_entry.HasXattrs()) {
780 XattrList xattrs;
781 retval = old_catalog->LookupXattrsPath(ps_nested_root_path, &xattrs);
782 assert(retval);
783 wr_new_catalog->TouchEntry(new_root_entry, xattrs, nested_root_path);
784 }
785
786 // From now on, there are two catalogs, spanning the same directory structure
787 // we have to split the overlapping directory entries from the old catalog
788 // to the new catalog to re-gain a valid catalog structure
789
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 old_catalog->Partition(wr_new_catalog);
790
791 // Add the newly created nested catalog to the references of the containing
792 // catalog
793
4/8
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1027 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1027 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1027 times.
✗ Branch 11 not taken.
1027 old_catalog->InsertNestedCatalog(new_catalog->mountpoint().ToString(), NULL,
794 1027 shash::Any(spooler_->GetHashAlgorithm()), 0);
795
796 // Fix subtree counters in new nested catalogs: subtree is the sum of all
797 // entries of all "grand-nested" catalogs
798 // Note: taking a copy of the nested catalog list here
799 const Catalog::NestedCatalogList
800
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 &grand_nested = wr_new_catalog->ListOwnNestedCatalogs();
801
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 DeltaCounters fix_subtree_counters;
802 2054 for (Catalog::NestedCatalogList::const_iterator i = grand_nested.begin(),
803 1027 iEnd = grand_nested.end();
804
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1027 times.
1027 i != iEnd;
805 ++i) {
806 WritableCatalog *grand_catalog;
807 retval = FindCatalog(i->mountpoint.ToString(), &grand_catalog);
808 assert(retval);
809 const Counters &grand_counters = grand_catalog->GetCounters();
810 grand_counters.AddAsSubtree(&fix_subtree_counters);
811 }
812 1027 const DeltaCounters save_counters = wr_new_catalog->delta_counters_;
813 1027 wr_new_catalog->delta_counters_ = fix_subtree_counters;
814
1/2
✓ Branch 1 taken 1027 times.
✗ Branch 2 not taken.
1027 wr_new_catalog->UpdateCounters();
815 1027 wr_new_catalog->delta_counters_ = save_counters;
816
817 1027 SyncUnlock();
818 1027 }
819
820
821 /**
822 * Remove a nested catalog
823 *
824 * If the merged parameter is true, when you remove a nested catalog
825 * all entries currently held by it will be merged into its parent
826 * catalog.
827 * @param mountpoint - the path of the nested catalog to be removed
828 * @param merge - merge the subtree associated with the nested catalog
829 * into its parent catalog
830 * @return - true on success, false otherwise
831 */
832 99 void WritableCatalogManager::RemoveNestedCatalog(const string &mountpoint,
833 const bool merge) {
834
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 const string nested_root_path = MakeRelativePath(mountpoint);
835
836 99 SyncLock();
837 // Find the catalog which should be removed
838 99 WritableCatalog *nested_catalog = NULL;
839
2/4
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 99 times.
99 if (!FindCatalog(nested_root_path, &nested_catalog)) {
840 PANIC(kLogStderr,
841 "failed to remove nested catalog '%s': "
842 "mountpoint was not found in current catalog structure",
843 nested_root_path.c_str());
844 }
845
846 // Check if the found catalog is really the nested catalog to be deleted
847
6/19
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 99 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 99 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 99 times.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
✓ Branch 13 taken 99 times.
✗ Branch 14 not taken.
✓ Branch 16 taken 99 times.
✗ Branch 17 not taken.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
198 assert(!nested_catalog->IsRoot()
848 && (nested_catalog->mountpoint().ToString() == nested_root_path));
849
850
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 if (merge) {
851 // Merge all data from the nested catalog into it's parent
852
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 nested_catalog->MergeIntoParent();
853 } else {
854 nested_catalog->RemoveFromParent();
855 }
856
857 // Delete the catalog database file from the working copy
858
2/6
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 99 times.
99 if (unlink(nested_catalog->database_path().c_str()) != 0) {
859 PANIC(kLogStderr,
860 "unable to delete the removed nested catalog database file '%s'",
861 nested_catalog->database_path().c_str());
862 }
863
864 // Remove the catalog from internal data structures
865
1/2
✓ Branch 1 taken 99 times.
✗ Branch 2 not taken.
99 DetachCatalog(nested_catalog);
866 99 SyncUnlock();
867 99 }
868
869
870 /**
871 * Swap in a new nested catalog
872 *
873 * The old nested catalog must not have been already attached to the
874 * catalog tree. This method will not attach the new nested catalog
875 * to the catalog tree.
876 *
877 * @param mountpoint - the path of the nested catalog to be removed
878 * @param new_hash - the hash of the new nested catalog
879 * @param new_size - the size of the new nested catalog
880 */
881 185 void WritableCatalogManager::SwapNestedCatalog(const string &mountpoint,
882 const shash::Any &new_hash,
883 const uint64_t new_size) {
884
1/2
✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
185 const string nested_root_path = MakeRelativePath(mountpoint);
885
1/2
✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
185 const string parent_path = GetParentPath(nested_root_path);
886
1/2
✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
185 const PathString nested_root_ps = PathString(nested_root_path);
887
888 185 SyncLock();
889
890 // Find the immediate parent catalog
891 185 WritableCatalog *parent = NULL;
892
3/4
✓ Branch 1 taken 185 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 26 times.
✓ Branch 4 taken 159 times.
185 if (!FindCatalog(parent_path, &parent)) {
893 26 SyncUnlock(); // this is needed for the unittest. otherwise they get stuck
894 26 PANIC(kLogStderr,
895 "failed to swap nested catalog '%s': could not find parent '%s'",
896 nested_root_path.c_str(), parent_path.c_str());
897 }
898
899 // Get old nested catalog counters
900
1/2
✓ Branch 1 taken 159 times.
✗ Branch 2 not taken.
159 Catalog *old_attached_catalog = parent->FindChild(nested_root_ps);
901
1/2
✓ Branch 1 taken 159 times.
✗ Branch 2 not taken.
159 Counters old_counters;
902
2/2
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 133 times.
159 if (old_attached_catalog) {
903 // Old catalog was already attached (e.g. as a child catalog
904 // attached by a prior call to CreateNestedCatalog()). Ensure
905 // that it has not been modified, get counters, and detach it.
906 26 WritableCatalogList list;
907
2/4
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
26 if (GetModifiedCatalogLeafsRecursively(old_attached_catalog, &list)) {
908 26 SyncUnlock();
909 26 PANIC(kLogStderr,
910 "failed to swap nested catalog '%s': already modified",
911 nested_root_path.c_str());
912 }
913 old_counters = old_attached_catalog->GetCounters();
914 DetachSubtree(old_attached_catalog);
915
916 26 } else {
917 // Old catalog was not attached. Download a freely attached
918 // version and get counters.
919
1/2
✓ Branch 1 taken 133 times.
✗ Branch 2 not taken.
133 shash::Any old_hash;
920 uint64_t old_size;
921
1/2
✓ Branch 1 taken 133 times.
✗ Branch 2 not taken.
133 const bool old_found = parent->FindNested(nested_root_ps, &old_hash,
922 &old_size);
923
2/2
✓ Branch 0 taken 26 times.
✓ Branch 1 taken 107 times.
133 if (!old_found) {
924 26 SyncUnlock();
925 26 PANIC(kLogStderr,
926 "failed to swap nested catalog '%s': not found in parent",
927 nested_root_path.c_str());
928 }
929 const UniquePtr<Catalog> old_free_catalog(
930
2/4
✓ Branch 1 taken 107 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 107 times.
✗ Branch 5 not taken.
107 LoadFreeCatalog(nested_root_ps, old_hash));
931
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 107 times.
107 if (!old_free_catalog.IsValid()) {
932 SyncUnlock();
933 PANIC(kLogStderr,
934 "failed to swap nested catalog '%s': failed to load old catalog",
935 nested_root_path.c_str());
936 }
937 107 old_counters = old_free_catalog->GetCounters();
938 107 }
939
940 // Load freely attached new catalog
941 const UniquePtr<Catalog> new_catalog(
942
3/4
✓ Branch 1 taken 81 times.
✓ Branch 2 taken 26 times.
✓ Branch 4 taken 81 times.
✗ Branch 5 not taken.
107 LoadFreeCatalog(nested_root_ps, new_hash));
943
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 81 times.
81 if (!new_catalog.IsValid()) {
944 SyncUnlock();
945 PANIC(kLogStderr,
946 "failed to swap nested catalog '%s': failed to load new catalog",
947 nested_root_path.c_str());
948 }
949
950 // Get new catalog root directory entry
951
1/2
✓ Branch 1 taken 81 times.
✗ Branch 2 not taken.
81 DirectoryEntry dirent;
952 81 XattrList xattrs;
953
1/2
✓ Branch 2 taken 81 times.
✗ Branch 3 not taken.
81 const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
954
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 81 times.
81 if (!dirent_found) {
955 SyncUnlock();
956 PANIC(kLogStderr,
957 "failed to swap nested catalog '%s': missing dirent in new catalog",
958 nested_root_path.c_str());
959 }
960
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 81 times.
81 if (dirent.HasXattrs()) {
961 const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
962 &xattrs);
963 if (!xattrs_found) {
964 SyncUnlock();
965 PANIC(kLogStderr,
966 "failed to swap nested catalog '%s': missing xattrs in new catalog",
967 nested_root_path.c_str());
968 }
969 }
970
971 // Swap catalogs
972
1/2
✓ Branch 1 taken 81 times.
✗ Branch 2 not taken.
81 parent->RemoveNestedCatalog(nested_root_path, NULL);
973
1/2
✓ Branch 1 taken 81 times.
✗ Branch 2 not taken.
81 parent->InsertNestedCatalog(nested_root_path, NULL, new_hash, new_size);
974
975 // Update parent directory entry
976 81 dirent.set_is_nested_catalog_mountpoint(true);
977 81 dirent.set_is_nested_catalog_root(false);
978
1/2
✓ Branch 1 taken 81 times.
✗ Branch 2 not taken.
81 parent->UpdateEntry(dirent, nested_root_path);
979
1/2
✓ Branch 1 taken 81 times.
✗ Branch 2 not taken.
81 parent->TouchEntry(dirent, xattrs, nested_root_path);
980
981 // Update counters
982
1/2
✓ Branch 3 taken 81 times.
✗ Branch 4 not taken.
81 const DeltaCounters delta = Counters::Diff(old_counters,
983 new_catalog->GetCounters());
984
1/2
✓ Branch 1 taken 81 times.
✗ Branch 2 not taken.
81 delta.PopulateToParent(&parent->delta_counters_);
985
986 81 SyncUnlock();
987 393 }
988
989 /**
990 * Install a nested catalog (catalog hierarchy) at a new, empty directory
991 *
992 * The mountpoint directory must not yet exist. Its parent directory, however
993 * must exist. This method combines functionality from AddDirectory(),
994 * CreateNestedCatalog() and SwapNestedCatalog().
995 * The new nested catalog won't get attached.
996 *
997 * @param mountpoint - the path where the nested catalog should be installed
998 * @param new_hash - the hash of the new nested catalog
999 * @param new_size - the size of the new nested catalog
1000 */
1001 80 void WritableCatalogManager::GraftNestedCatalog(const string &mountpoint,
1002 const shash::Any &new_hash,
1003 const uint64_t new_size) {
1004
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 const string nested_root_path = MakeRelativePath(mountpoint);
1005
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 const string parent_path = GetParentPath(nested_root_path);
1006
1/2
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
80 const PathString nested_root_ps = PathString(nested_root_path);
1007
1008
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 80 times.
80 assert(!nested_root_path.empty());
1009
1010 // Load freely attached new catalog
1011 const UniquePtr<Catalog> new_catalog(
1012
2/4
✓ Branch 1 taken 80 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 80 times.
✗ Branch 5 not taken.
80 LoadFreeCatalog(nested_root_ps, new_hash));
1013
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 80 times.
80 if (!new_catalog.IsValid()) {
1014 PANIC(kLogStderr,
1015 "failed to graft nested catalog '%s': failed to load new catalog",
1016 nested_root_path.c_str());
1017 }
1018
4/7
✓ Branch 2 taken 80 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 80 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 27 times.
✓ Branch 9 taken 53 times.
80 if (new_catalog->root_prefix() != nested_root_ps) {
1019
2/4
✓ Branch 2 taken 27 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 27 times.
✗ Branch 6 not taken.
81 PANIC(kLogStderr,
1020 "invalid nested catalog for grafting at '%s': catalog rooted at '%s'",
1021 nested_root_path.c_str(),
1022 new_catalog->root_prefix().ToString().c_str());
1023 }
1024
1025 // Get new catalog root directory entry
1026
1/2
✓ Branch 1 taken 53 times.
✗ Branch 2 not taken.
53 DirectoryEntry dirent;
1027 53 XattrList xattrs;
1028
1/2
✓ Branch 2 taken 53 times.
✗ Branch 3 not taken.
53 const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
1029
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 53 times.
53 if (!dirent_found) {
1030 PANIC(kLogStderr,
1031 "failed to swap nested catalog '%s': missing dirent in new catalog",
1032 nested_root_path.c_str());
1033 }
1034
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 53 times.
53 if (dirent.HasXattrs()) {
1035 const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
1036 &xattrs);
1037 if (!xattrs_found) {
1038 PANIC(kLogStderr,
1039 "failed to swap nested catalog '%s': missing xattrs in new catalog",
1040 nested_root_path.c_str());
1041 }
1042 }
1043 // Transform the nested catalog root into a transition point to be inserted
1044 // in the parent catalog
1045 53 dirent.set_is_nested_catalog_root(false);
1046 53 dirent.set_is_nested_catalog_mountpoint(true);
1047
1048 // Add directory and nested catalog
1049
1050 53 SyncLock();
1051 WritableCatalog *parent_catalog;
1052
1/2
✓ Branch 1 taken 53 times.
✗ Branch 2 not taken.
53 DirectoryEntry parent_entry;
1053
2/4
✓ Branch 1 taken 53 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 53 times.
53 if (!FindCatalog(parent_path, &parent_catalog, &parent_entry)) {
1054 SyncUnlock();
1055 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1056 parent_path.c_str());
1057 }
1058
3/4
✓ Branch 1 taken 53 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 27 times.
✓ Branch 4 taken 26 times.
53 if (parent_catalog->LookupPath(nested_root_ps, NULL)) {
1059 27 SyncUnlock();
1060 27 PANIC(kLogStderr,
1061 "invalid attempt to graft nested catalog into existing "
1062 "directory '%s'",
1063 nested_root_path.c_str());
1064 }
1065
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 parent_catalog->AddEntry(dirent, xattrs, nested_root_path, parent_path);
1066 26 parent_entry.set_linkcount(parent_entry.linkcount() + 1);
1067
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 parent_catalog->UpdateEntry(parent_entry, parent_path);
1068
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 if (parent_entry.IsNestedCatalogRoot()) {
1069 WritableCatalog *grand_parent_catalog = reinterpret_cast<WritableCatalog *>(
1070 26 parent_catalog->parent());
1071 26 parent_entry.set_is_nested_catalog_root(false);
1072 26 parent_entry.set_is_nested_catalog_mountpoint(true);
1073
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 grand_parent_catalog->UpdateEntry(parent_entry, parent_path);
1074 }
1075
1076
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 parent_catalog->InsertNestedCatalog(nested_root_path, NULL, new_hash,
1077 new_size);
1078
1079 // Fix-up counters
1080
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 const Counters counters;
1081
1/2
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
26 const DeltaCounters delta = Counters::Diff(counters,
1082 new_catalog->GetCounters());
1083
1/2
✓ Branch 1 taken 26 times.
✗ Branch 2 not taken.
26 delta.PopulateToParent(&parent_catalog->delta_counters_);
1084
1085 26 SyncUnlock();
1086 323 }
1087
1088 /**
1089 * Checks if a nested catalog starts at this path. The path must be valid.
1090 */
1091 bool WritableCatalogManager::IsTransitionPoint(const string &mountpoint) {
1092 const string path = MakeRelativePath(mountpoint);
1093
1094 SyncLock();
1095 WritableCatalog *catalog;
1096 DirectoryEntry entry;
1097 if (!FindCatalog(path, &catalog, &entry)) {
1098 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1099 path.c_str());
1100 }
1101 const bool result = entry.IsNestedCatalogRoot();
1102 SyncUnlock();
1103 return result;
1104 }
1105
1106
1107 void WritableCatalogManager::PrecalculateListings() {
1108 // TODO(jblomer): meant for micro catalogs
1109 }
1110
1111
1112 void WritableCatalogManager::SetTTL(const uint64_t new_ttl) {
1113 SyncLock();
1114 reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetTTL(new_ttl);
1115 SyncUnlock();
1116 }
1117
1118
1119 bool WritableCatalogManager::SetVOMSAuthz(const std::string &voms_authz) {
1120 bool result;
1121 SyncLock();
1122 result = reinterpret_cast<WritableCatalog *>(GetRootCatalog())
1123 ->SetVOMSAuthz(voms_authz);
1124 SyncUnlock();
1125 return result;
1126 }
1127
1128
1129 745 bool WritableCatalogManager::Commit(const bool stop_for_tweaks,
1130 const uint64_t manual_revision,
1131 manifest::Manifest *manifest) {
1132 WritableCatalog *root_catalog = reinterpret_cast<WritableCatalog *>(
1133 745 GetRootCatalog());
1134
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 root_catalog->SetDirty();
1135
1136 // set root catalog revision to manually provided number if available
1137
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 745 times.
745 if (manual_revision > 0) {
1138 const uint64_t revision = root_catalog->GetRevision();
1139 if (revision >= manual_revision) {
1140 LogCvmfs(kLogCatalog, kLogStderr,
1141 "Manual revision (%" PRIu64 ") must not be "
1142 "smaller than the current root catalog's (%" PRIu64
1143 "). Skipped!",
1144 manual_revision, revision);
1145 } else {
1146 // Gets incremented by FinalizeCatalog() afterwards!
1147 root_catalog->SetRevision(manual_revision - 1);
1148 }
1149 }
1150
1151 // do the actual catalog snapshotting and upload
1152
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 CatalogInfo root_catalog_info;
1153
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 if (getenv("_CVMFS_SERIALIZED_CATALOG_PROCESSING_") == NULL)
1154
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 root_catalog_info = SnapshotCatalogs(stop_for_tweaks);
1155 else
1156 root_catalog_info = SnapshotCatalogsSerialized(stop_for_tweaks);
1157
2/4
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 745 times.
745 if (spooler_->GetNumberOfErrors() > 0) {
1158 LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalogs");
1159 return false;
1160 }
1161
1162 // .cvmfspublished export
1163
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 LogCvmfs(kLogCatalog, kLogVerboseMsg, "Committing repository manifest");
1164 745 set_base_hash(root_catalog_info.content_hash);
1165
1166 745 manifest->set_catalog_hash(root_catalog_info.content_hash);
1167 745 manifest->set_catalog_size(root_catalog_info.size);
1168
2/4
✓ Branch 2 taken 745 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 745 times.
✗ Branch 6 not taken.
745 manifest->set_root_path("");
1169 745 manifest->set_ttl(root_catalog_info.ttl);
1170 745 manifest->set_revision(root_catalog_info.revision);
1171
1172 745 return true;
1173 }
1174
1175
1176 /**
1177 * Handles the snapshotting of dirty (i.e. modified) catalogs while trying to
1178 * parallelize the compression and upload as much as possible. We use a parallel
1179 * depth first post order tree traversal based on 'continuations'.
1180 *
1181 * The idea is as follows:
1182 * 1. find all leaf-catalogs (i.e. dirty catalogs with no dirty children)
1183 * --> these can be processed and uploaded immediately and independently
1184 * see WritableCatalogManager::GetModifiedCatalogLeafs()
1185 * 2. annotate non-leaf catalogs with their number of dirty children
1186 * --> a finished child will notify it's parent and decrement this number
1187 * see WritableCatalogManager::CatalogUploadCallback()
1188 * 3. if a non-leaf catalog's dirty children number reaches 0, it is scheduled
1189 * for processing as well (continuation)
1190 * --> the parallel processing walks bottom-up through the catalog tree
1191 * see WritableCatalogManager::CatalogUploadCallback()
1192 * 4. when the root catalog is reached, we notify the main thread and return
1193 * --> done through a Future<> in WritableCatalogManager::SnapshotCatalogs
1194 *
1195 * Note: The catalog finalisation (see WritableCatalogManager::FinalizeCatalog)
1196 * happens in a worker thread (i.e. the callback method) for non-leaf
1197 * catalogs.
1198 *
1199 * TODO(rmeusel): since all leaf catalogs are finalized in the main thread, we
1200 * sacrifice some potential concurrency for simplicity.
1201 */
1202 745 WritableCatalogManager::CatalogInfo WritableCatalogManager::SnapshotCatalogs(
1203 const bool stop_for_tweaks) {
1204 // prepare environment for parallel processing
1205
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 Future<CatalogInfo> root_catalog_info_future;
1206 CatalogUploadContext upload_context;
1207 745 upload_context.root_catalog_info = &root_catalog_info_future;
1208 745 upload_context.stop_for_tweaks = stop_for_tweaks;
1209
1210
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 spooler_->RegisterListener(&WritableCatalogManager::CatalogUploadCallback,
1211 this, upload_context);
1212
1213 // find dirty leaf catalogs and annotate non-leaf catalogs (dirty child count)
1214 // post-condition: the entire catalog tree is ready for concurrent processing
1215 745 WritableCatalogList leafs_to_snapshot;
1216
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 GetModifiedCatalogLeafs(&leafs_to_snapshot);
1217
1218 // finalize and schedule the catalog processing
1219 745 WritableCatalogList::const_iterator i = leafs_to_snapshot.begin();
1220 745 const WritableCatalogList::const_iterator iend = leafs_to_snapshot.end();
1221
2/2
✓ Branch 2 taken 1077 times.
✓ Branch 3 taken 745 times.
1822 for (; i != iend; ++i) {
1222
1/2
✓ Branch 2 taken 1077 times.
✗ Branch 3 not taken.
1077 FinalizeCatalog(*i, stop_for_tweaks);
1223
1/2
✓ Branch 2 taken 1077 times.
✗ Branch 3 not taken.
1077 ScheduleCatalogProcessing(*i);
1224 }
1225
1226
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 LogCvmfs(kLogCatalog, kLogVerboseMsg, "waiting for upload of catalogs");
1227
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 const CatalogInfo &root_catalog_info = root_catalog_info_future.Get();
1228
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 spooler_->WaitForUpload();
1229
1230
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 spooler_->UnregisterListeners();
1231 745 return root_catalog_info;
1232 745 }
1233
1234
1235 1694 void WritableCatalogManager::FinalizeCatalog(WritableCatalog *catalog,
1236 const bool stop_for_tweaks) {
1237 // update meta information of this catalog
1238
1/3
✓ Branch 2 taken 1694 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1694 LogCvmfs(kLogCatalog, kLogVerboseMsg, "creating snapshot of catalog '%s'",
1239 3388 catalog->mountpoint().c_str());
1240
1241 1694 catalog->UpdateCounters();
1242 1694 catalog->UpdateLastModified();
1243 1694 catalog->IncrementRevision();
1244
1245 // update the previous catalog revision pointer
1246
2/2
✓ Branch 1 taken 745 times.
✓ Branch 2 taken 949 times.
1694 if (catalog->IsRoot()) {
1247
1/4
✓ Branch 2 taken 745 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
745 LogCvmfs(kLogCatalog, kLogVerboseMsg,
1248 "setting '%s' as previous revision "
1249 "for root catalog",
1250 1490 base_hash().ToStringWithSuffix().c_str());
1251 745 catalog->SetPreviousRevision(base_hash());
1252 } else {
1253 // Multiple catalogs might query the parent concurrently
1254 949 SyncLock();
1255
1/2
✓ Branch 1 taken 949 times.
✗ Branch 2 not taken.
949 shash::Any hash_previous;
1256 uint64_t size_previous;
1257
1/2
✓ Branch 2 taken 949 times.
✗ Branch 3 not taken.
1898 const bool retval = catalog->parent()->FindNested(
1258
1/2
✓ Branch 1 taken 949 times.
✗ Branch 2 not taken.
1898 catalog->mountpoint(), &hash_previous, &size_previous);
1259
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 949 times.
949 assert(retval);
1260 949 SyncUnlock();
1261
1262
1/8
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 949 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
1898 LogCvmfs(kLogCatalog, kLogVerboseMsg,
1263 "found '%s' as previous revision "
1264 "for nested catalog '%s'",
1265
1/2
✓ Branch 1 taken 949 times.
✗ Branch 2 not taken.
1898 hash_previous.ToStringWithSuffix().c_str(),
1266
1/2
✓ Branch 1 taken 949 times.
✗ Branch 2 not taken.
1898 catalog->mountpoint().c_str());
1267
1/2
✓ Branch 1 taken 949 times.
✗ Branch 2 not taken.
949 catalog->SetPreviousRevision(hash_previous);
1268 }
1269 1694 catalog->Commit();
1270
1271 // check if catalog has too many entries
1272 const uint64_t catalog_limit = uint64_t(1000)
1273
2/2
✓ Branch 1 taken 745 times.
✓ Branch 2 taken 949 times.
1694 * uint64_t((catalog->IsRoot()
1274 745 ? root_kcatalog_limit_
1275 949 : nested_kcatalog_limit_));
1276 1694 if ((catalog_limit > 0)
1277
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 1694 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 1694 times.
1694 && (catalog->GetCounters().GetSelfEntries() > catalog_limit)) {
1278 LogCvmfs(kLogCatalog, kLogStderr,
1279 "%s: catalog at %s has more than %lu entries (%lu). "
1280 "Large catalogs stress the CernVM-FS transport infrastructure. "
1281 "Please split it into nested catalogs or increase the limit.",
1282 enforce_limits_ ? "FATAL" : "WARNING",
1283 (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1284 catalog_limit, catalog->GetCounters().GetSelfEntries());
1285 if (enforce_limits_)
1286 PANIC(kLogStderr, "catalog at %s has more than %u entries (%u). ",
1287 (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1288 catalog_limit, catalog->GetCounters().GetSelfEntries());
1289 }
1290
1291 // allow for manual adjustments in the catalog
1292
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1694 times.
1694 if (stop_for_tweaks) {
1293 LogCvmfs(kLogCatalog, kLogStdout,
1294 "Allowing for tweaks in %s at %s "
1295 "(hit return to continue)",
1296 catalog->database_path().c_str(), catalog->mountpoint().c_str());
1297 const int read_char = getchar();
1298 assert(read_char != EOF);
1299 }
1300
1301 // compaction of bloated catalogs (usually after high database churn)
1302 1694 catalog->VacuumDatabaseIfNecessary();
1303 1694 }
1304
1305
1306 1694 void WritableCatalogManager::ScheduleCatalogProcessing(
1307 WritableCatalog *catalog) {
1308 {
1309 1694 const MutexLockGuard guard(catalog_processing_lock_);
1310 // register catalog object for WritableCatalogManager::CatalogUploadCallback
1311
2/4
✓ Branch 1 taken 1694 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1694 times.
✗ Branch 5 not taken.
1694 catalog_processing_map_[catalog->database_path()] = catalog;
1312 1694 }
1313
1/2
✓ Branch 2 taken 1694 times.
✗ Branch 3 not taken.
1694 spooler_->ProcessCatalog(catalog->database_path());
1314 1694 }
1315
1316 /**
1317 * Copy catalog to local cache.server
1318 * Must be an atomic write into the cache_dir
1319 * As such: create a temporary copy in cache_dir/txn and then do a
1320 * `rename` (which is atomic) to the actual cache path
1321 *
1322 * @returns true on success, otherwise false
1323 */
1324 bool WritableCatalogManager::CopyCatalogToLocalCache(
1325 const upload::SpoolerResult &result) {
1326 std::string tmp_catalog_path;
1327 const std::string cache_catalog_path = dir_cache_ + "/"
1328 + result.content_hash
1329 .MakePathWithoutSuffix();
1330 FILE *fcatalog = CreateTempFile(dir_cache_ + "/txn/catalog", 0666, "w",
1331 &tmp_catalog_path);
1332 if (!fcatalog) {
1333 PANIC(kLogDebug | kLogStderr,
1334 "Creating file for temporary catalog failed: %s",
1335 tmp_catalog_path.c_str());
1336 }
1337 CopyPath2File(result.local_path.c_str(), fcatalog);
1338 (void)fclose(fcatalog);
1339
1340 if (rename(tmp_catalog_path.c_str(), cache_catalog_path.c_str()) != 0) {
1341 PANIC(kLogDebug | kLogStderr, "Failed to copy catalog from %s to cache %s",
1342 result.local_path.c_str(), cache_catalog_path.c_str());
1343 }
1344 return true;
1345 }
1346
1347 1694 void WritableCatalogManager::CatalogUploadCallback(
1348 const upload::SpoolerResult &result,
1349 const CatalogUploadContext catalog_upload_context) {
1350
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1694 times.
1694 if (result.return_code != 0) {
1351 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1352 result.local_path.c_str(), result.return_code);
1353 }
1354
1355 // retrieve the catalog object based on the callback information
1356 // see WritableCatalogManager::ScheduleCatalogProcessing()
1357 1694 WritableCatalog *catalog = NULL;
1358 {
1359 1694 const MutexLockGuard guard(catalog_processing_lock_);
1360 const std::map<std::string, WritableCatalog *>::iterator
1361
1/2
✓ Branch 1 taken 1694 times.
✗ Branch 2 not taken.
1694 c = catalog_processing_map_.find(result.local_path);
1362
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 1694 times.
1694 assert(c != catalog_processing_map_.end());
1363 1694 catalog = c->second;
1364 1694 }
1365
1366 1694 const uint64_t catalog_size = GetFileSize(result.local_path);
1367
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1694 times.
1694 assert(catalog_size > 0);
1368
1369
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1694 times.
1694 if (UseLocalCache()) {
1370 CopyCatalogToLocalCache(result);
1371 }
1372
1373 1694 SyncLock();
1374
2/2
✓ Branch 1 taken 949 times.
✓ Branch 2 taken 745 times.
1694 if (catalog->HasParent()) {
1375 // finalized nested catalogs will update their parent's pointer and schedule
1376 // them for processing (continuation) if the 'dirty children count' == 0
1377 949 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1378 949 WritableCatalog *parent = catalog->GetWritableParent();
1379
1380
2/4
✓ Branch 1 taken 949 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 949 times.
✗ Branch 5 not taken.
949 parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1381 949 result.content_hash,
1382 catalog_size,
1383 949 catalog->delta_counters_);
1384 949 catalog->delta_counters_.SetZero();
1385
1386 const int remaining_dirty_children = catalog->GetWritableParent()
1387 949 ->DecrementDirtyChildren();
1388
1389 949 SyncUnlock();
1390
1391 // continuation of the dirty catalog tree traversal
1392 // see WritableCatalogManager::SnapshotCatalogs()
1393
2/2
✓ Branch 0 taken 617 times.
✓ Branch 1 taken 332 times.
949 if (remaining_dirty_children == 0) {
1394 617 FinalizeCatalog(parent, catalog_upload_context.stop_for_tweaks);
1395 617 ScheduleCatalogProcessing(parent);
1396 }
1397
1398
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 } else if (catalog->IsRoot()) {
1399 // once the root catalog is reached, we are done with processing and report
1400 // back to the main via a Future<> and provide the necessary information
1401
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 CatalogInfo root_catalog_info;
1402 745 root_catalog_info.size = catalog_size;
1403
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 root_catalog_info.ttl = catalog->GetTTL();
1404 745 root_catalog_info.content_hash = result.content_hash;
1405
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 root_catalog_info.revision = catalog->GetRevision();
1406
1/2
✓ Branch 1 taken 745 times.
✗ Branch 2 not taken.
745 catalog_upload_context.root_catalog_info->Set(root_catalog_info);
1407 745 SyncUnlock();
1408 } else {
1409 PANIC(kLogStderr, "inconsistent state detected");
1410 }
1411 1694 }
1412
1413
1414 /**
1415 * Finds dirty catalogs that can be snapshot right away and annotates all the
1416 * other catalogs with their number of dirty descendants.
1417 * Note that there is a convenience wrapper to start the recursion:
1418 * WritableCatalogManager::GetModifiedCatalogLeafs()
1419 *
1420 * @param catalog the catalog for this recursion step
1421 * @param result the result list to be appended to
1422 * @return true if 'catalog' is dirty
1423 */
1424 1720 bool WritableCatalogManager::GetModifiedCatalogLeafsRecursively(
1425 Catalog *catalog, WritableCatalogList *result) const {
1426 1720 WritableCatalog *wr_catalog = static_cast<WritableCatalog *>(catalog);
1427
1428 // Look for dirty catalogs in the descendants of *catalog
1429 1720 int dirty_children = 0;
1430
1/2
✓ Branch 1 taken 1720 times.
✗ Branch 2 not taken.
1720 CatalogList children = wr_catalog->GetChildren();
1431 1720 CatalogList::const_iterator i = children.begin();
1432 1720 const CatalogList::const_iterator iend = children.end();
1433
2/2
✓ Branch 2 taken 949 times.
✓ Branch 3 taken 1720 times.
2669 for (; i != iend; ++i) {
1434
2/4
✓ Branch 2 taken 949 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 949 times.
✗ Branch 5 not taken.
949 if (GetModifiedCatalogLeafsRecursively(*i, result)) {
1435 949 ++dirty_children;
1436 }
1437 }
1438
1439 // a catalog is dirty if itself or one of its children has changed
1440 // a leaf catalog doesn't have any dirty children
1441 1720 wr_catalog->set_dirty_children(dirty_children);
1442
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 1720 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1720 const bool is_dirty = wr_catalog->IsDirty() || dirty_children > 0;
1443 1720 const bool is_leaf = dirty_children == 0;
1444
3/4
✓ Branch 0 taken 1720 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1103 times.
✓ Branch 3 taken 617 times.
1720 if (is_dirty && is_leaf) {
1445
1/2
✓ Branch 1 taken 1103 times.
✗ Branch 2 not taken.
1103 result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1446 }
1447
1448 1720 return is_dirty;
1449 1720 }
1450
1451
1452 void WritableCatalogManager::DoBalance() {
1453 CatalogList catalog_list = GetCatalogs();
1454 reverse(catalog_list.begin(), catalog_list.end());
1455 for (unsigned i = 0; i < catalog_list.size(); ++i) {
1456 FixWeight(static_cast<WritableCatalog *>(catalog_list[i]));
1457 }
1458 }
1459
1460 void WritableCatalogManager::FixWeight(WritableCatalog *catalog) {
1461 // check for underflows first because they can provoke overflows
1462 if (catalog->GetNumEntries() < min_weight_ && !catalog->IsRoot()
1463 && catalog->IsAutogenerated()) {
1464 LogCvmfs(kLogCatalog, kLogStdout,
1465 "Deleting an autogenerated catalog in '%s'",
1466 catalog->mountpoint().c_str());
1467 // Remove the .cvmfscatalog and .cvmfsautocatalog files first
1468 const string path = catalog->mountpoint().ToString();
1469 catalog->RemoveEntry(path + "/.cvmfscatalog");
1470 catalog->RemoveEntry(path + "/.cvmfsautocatalog");
1471 // Remove the actual catalog
1472 const string catalog_path = catalog->mountpoint().ToString().substr(1);
1473 RemoveNestedCatalog(catalog_path);
1474 } else if (catalog->GetNumEntries() > max_weight_) {
1475 CatalogBalancer<WritableCatalogManager> catalog_balancer(this);
1476 catalog_balancer.Balance(catalog);
1477 }
1478 }
1479
1480
1481 //****************************************************************************
1482 // Workaround -- Serialized Catalog Committing
1483
1484 int WritableCatalogManager::GetModifiedCatalogsRecursively(
1485 const Catalog *catalog, WritableCatalogList *result) const {
1486 // A catalog must be snapshot if itself or one of its descendants is dirty.
1487 // So we traverse the catalog tree recursively and look for dirty catalogs
1488 // on the way.
1489 const WritableCatalog *wr_catalog = static_cast<const WritableCatalog *>(
1490 catalog);
1491 // This variable will contain the number of dirty catalogs in the sub tree
1492 // with *catalog as its root.
1493 int dirty_catalogs = (wr_catalog->IsDirty()) ? 1 : 0;
1494
1495 // Look for dirty catalogs in the descendants of *catalog
1496 CatalogList children = wr_catalog->GetChildren();
1497 for (CatalogList::const_iterator i = children.begin(), iEnd = children.end();
1498 i != iEnd; ++i) {
1499 dirty_catalogs += GetModifiedCatalogsRecursively(*i, result);
1500 }
1501
1502 // If we found a dirty catalog in the checked sub tree, the root (*catalog)
1503 // must be snapshot and ends up in the result list
1504 if (dirty_catalogs > 0)
1505 result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1506
1507 // tell the upper layer about number of catalogs
1508 return dirty_catalogs;
1509 }
1510
1511
1512 void WritableCatalogManager::CatalogUploadSerializedCallback(
1513 const upload::SpoolerResult &result, const CatalogUploadContext unused) {
1514 if (result.return_code != 0) {
1515 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1516 result.local_path.c_str(), result.return_code);
1517 }
1518
1519 if (UseLocalCache()) {
1520 CopyCatalogToLocalCache(result);
1521 }
1522
1523 unlink(result.local_path.c_str());
1524 }
1525
1526
1527 WritableCatalogManager::CatalogInfo
1528 WritableCatalogManager::SnapshotCatalogsSerialized(const bool stop_for_tweaks) {
1529 LogCvmfs(kLogCvmfs, kLogStdout, "Serialized committing of file catalogs...");
1530 reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetDirty();
1531 WritableCatalogList catalogs_to_snapshot;
1532 GetModifiedCatalogs(&catalogs_to_snapshot);
1533 CatalogUploadContext unused;
1534 unused.root_catalog_info = NULL;
1535 unused.stop_for_tweaks = false;
1536 spooler_->RegisterListener(
1537 &WritableCatalogManager::CatalogUploadSerializedCallback, this, unused);
1538
1539 CatalogInfo root_catalog_info;
1540 WritableCatalogList::const_iterator i = catalogs_to_snapshot.begin();
1541 const WritableCatalogList::const_iterator iend = catalogs_to_snapshot.end();
1542 for (; i != iend; ++i) {
1543 FinalizeCatalog(*i, stop_for_tweaks);
1544
1545 // Compress and upload catalog
1546 shash::Any hash_catalog(spooler_->GetHashAlgorithm(),
1547 shash::kSuffixCatalog);
1548 if (!zlib::CompressPath2Null((*i)->database_path(), &hash_catalog)) {
1549 PANIC(kLogStderr, "could not compress catalog %s",
1550 (*i)->mountpoint().ToString().c_str());
1551 }
1552
1553 const int64_t catalog_size = GetFileSize((*i)->database_path());
1554 assert(catalog_size > 0);
1555
1556 if ((*i)->HasParent()) {
1557 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1558 WritableCatalog *parent = (*i)->GetWritableParent();
1559 parent->UpdateNestedCatalog((*i)->mountpoint().ToString(), hash_catalog,
1560 catalog_size, (*i)->delta_counters_);
1561 (*i)->delta_counters_.SetZero();
1562 } else if ((*i)->IsRoot()) {
1563 root_catalog_info.size = catalog_size;
1564 root_catalog_info.ttl = (*i)->GetTTL();
1565 root_catalog_info.content_hash = hash_catalog;
1566 root_catalog_info.revision = (*i)->GetRevision();
1567 } else {
1568 PANIC(kLogStderr, "inconsistent state detected");
1569 }
1570
1571 spooler_->ProcessCatalog((*i)->database_path());
1572 }
1573 spooler_->WaitForUpload();
1574
1575 spooler_->UnregisterListeners();
1576 return root_catalog_info;
1577 }
1578
1579 void WritableCatalogManager::SetupSingleCatalogUploadCallback() {
1580 spooler_->RegisterListener(
1581 &WritableCatalogManager::SingleCatalogUploadCallback, this);
1582 }
1583
1584 void WritableCatalogManager::RemoveSingleCatalogUploadCallback() {
1585 spooler_->WaitForUpload(); // wait for all outstanding jobs to finish before
1586 // tearing it down
1587 spooler_->UnregisterListeners();
1588 pending_catalogs_ =
1589 {}; // whatever we couldn't process, leave it to the Commit
1590 }
1591
1592 void WritableCatalogManager::AddCatalogToQueue(const std::string &path) {
1593 SyncLock();
1594 WritableCatalog *catalog = NULL;
1595 bool const retval = FindCatalog(MakeRelativePath(path), &catalog, NULL);
1596 assert(retval);
1597 assert(catalog);
1598 catalog->SetDirty(); // ensure it's dirty so its parent will wait for it
1599 SyncUnlock();
1600 pending_catalogs_.push_back(catalog);
1601 }
1602
1603 void WritableCatalogManager::ScheduleReadyCatalogs() {
1604 // best effort to schedule as many catalogs for upload as possible
1605 for (auto it = pending_catalogs_.begin(); it != pending_catalogs_.end();) {
1606 if ((*it)->dirty_children() == 0) {
1607 FinalizeCatalog(*it, false /* stop_for_tweaks */);
1608 ScheduleCatalogProcessing(*it);
1609 LogCvmfs(kLogCatalog, kLogVerboseMsg, "scheduled %s for processing",
1610 (*it)->mountpoint().c_str());
1611 it = pending_catalogs_.erase(it);
1612 } else {
1613 ++it;
1614 }
1615 }
1616 }
1617
1618 // Callback for uploading a single catalog, similar to CatalogUploadCallback.
1619 // The main difference is that this callback would not trigger processing of the
1620 // parent
1621 void WritableCatalogManager::SingleCatalogUploadCallback(
1622 const upload::SpoolerResult &result) {
1623 if (result.return_code != 0) {
1624 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1625 result.local_path.c_str(), result.return_code);
1626 }
1627
1628 // retrieve the catalog object based on the callback information
1629 // see WritableCatalogManager::ScheduleCatalogProcessing()
1630 WritableCatalog *catalog = NULL;
1631 {
1632 MutexLockGuard const guard(catalog_processing_lock_);
1633 std::map<std::string, WritableCatalog *>::iterator const
1634 c = catalog_processing_map_.find(result.local_path);
1635 assert(c != catalog_processing_map_.end());
1636 catalog = c->second;
1637 }
1638
1639 uint64_t const catalog_size = GetFileSize(result.local_path);
1640 assert(catalog_size > 0);
1641
1642 SyncLock();
1643 if (catalog->HasParent()) {
1644 // finalized nested catalogs will update their parent's pointer
1645 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1646 WritableCatalog *parent = catalog->GetWritableParent();
1647
1648 parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1649 result.content_hash,
1650 catalog_size,
1651 catalog->delta_counters_);
1652 parent->DecrementDirtyChildren();
1653 catalog->delta_counters_.SetZero();
1654 }
1655 // JUMP: detach the catalog after uploading to free sqlite related resources
1656 DetachCatalog(catalog);
1657 SyncUnlock();
1658 }
1659 // using the given list of dirs, fetch all relevant catalogs
1660 void WritableCatalogManager::LoadCatalogs(
1661 const std::string &base_path, const std::unordered_set<std::string> &dirs) {
1662 // mount everything up to "base_path" first (this would be our lease_path
1663 // typically)
1664 Catalog *base_catalog;
1665 MountSubtree(PathString(base_path), NULL /* entry_point */,
1666 true /* is_listable */, &base_catalog);
1667
1668 // start up the downloader
1669 CatalogDownloadContext context;
1670 context.dirs = &dirs;
1671 catalog_download_pipeline_ = new CatalogDownloadPipeline(
1672 static_cast<SimpleCatalogManager *>(this));
1673 catalog_download_pipeline_->RegisterListener(
1674 &WritableCatalogManager::CatalogDownloadCallback, this, context);
1675 catalog_download_pipeline_->Spawn();
1676
1677 Catalog::NestedCatalogList nested_catalogs = base_catalog
1678 ->ListNestedCatalogs();
1679 for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1680 // schedule relevant child nested catalogs for download
1681 std::string const mountpoint = it->mountpoint.ToString();
1682 if (dirs.find(mountpoint) != dirs.end()) {
1683 Catalog *catalog = CreateCatalog(it->mountpoint, it->hash,
1684 NULL /* parent */);
1685 {
1686 MutexLockGuard const guard(catalog_download_lock_);
1687 catalog_download_map_.insert(
1688 std::make_pair(it->hash.ToString(), catalog));
1689 }
1690 catalog_download_pipeline_->Process(it->hash);
1691 }
1692 }
1693
1694 catalog_download_pipeline_->WaitFor();
1695 delete catalog_download_pipeline_; // terminate all the threads
1696 }
1697
1698 bool WritableCatalogManager::LookupDirEntry(const string &path,
1699 const LookupOptions options,
1700 DirectoryEntry *dirent) {
1701 SyncLock();
1702 bool const exists = LookupPath(path, options, dirent);
1703 SyncUnlock();
1704 return exists;
1705 }
1706
1707 void WritableCatalogManager::CatalogHashSerializedCallback(
1708 const CompressHashResult &result) {
1709 MutexLockGuard const guard(catalog_hash_lock_);
1710 catalog_hash_map_[result.path] = result.hash;
1711 }
1712
1713 void WritableCatalogManager::CatalogDownloadCallback(
1714 const CatalogDownloadResult &result, CatalogDownloadContext context) {
1715 Catalog *downloaded_catalog;
1716 {
1717 MutexLockGuard const guard(catalog_download_lock_);
1718 auto it = catalog_download_map_.find(result.hash);
1719 assert(it != catalog_download_map_.end());
1720 downloaded_catalog = it->second;
1721 }
1722
1723 if (!downloaded_catalog->OpenDatabase(result.db_path)) {
1724 LogCvmfs(kLogCvmfs, kLogDebug, "failed to initialize catalog");
1725 delete downloaded_catalog;
1726 return;
1727 }
1728
1729 Catalog::NestedCatalogList nested_catalogs = downloaded_catalog
1730 ->ListNestedCatalogs();
1731 for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1732 // schedule relevant child nested catalogs for download
1733 if (context.dirs->find(it->mountpoint.ToString()) != context.dirs->end()) {
1734 Catalog *child_catalog = CreateCatalog(it->mountpoint, it->hash,
1735 NULL /* parent */);
1736 {
1737 MutexLockGuard const guard(catalog_download_lock_);
1738 catalog_download_map_.insert(
1739 std::make_pair(it->hash.ToString(), child_catalog));
1740 }
1741 catalog_download_pipeline_->Process(it->hash);
1742 }
1743 }
1744 delete downloaded_catalog;
1745 }
1746
1747
1748 } // namespace catalog
1749