GCC Code Coverage Report


Directory: cvmfs/
File: cvmfs/catalog_mgr_rw.cc
Date: 2025-07-13 02:35:07
Exec Total Coverage
Lines: 463 832 55.6%
Branches: 356 1220 29.2%

Line Branch Exec Source
1 /**
2 * This file is part of the CernVM file system.
3 */
4
5 #define __STDC_FORMAT_MACROS
6
7 #include "catalog_mgr_rw.h"
8
9 #include <inttypes.h>
10 #include <unistd.h>
11
12 #include <cassert>
13 #include <cstdio>
14 #include <cstdlib>
15 #include <string>
16
17 #include "catalog_balancer.h"
18 #include "catalog_rw.h"
19 #include "manifest.h"
20 #include "statistics.h"
21 #include "upload.h"
22 #include "util/exception.h"
23 #include "util/logging.h"
24 #include "util/posix.h"
25 #include "util/smalloc.h"
26
27 using namespace std; // NOLINT
28
29 namespace catalog {
30
31 819 WritableCatalogManager::WritableCatalogManager(
32 const shash::Any &base_hash,
33 const std::string &stratum0,
34 const string &dir_temp,
35 upload::Spooler *spooler,
36 download::DownloadManager *download_manager,
37 bool enforce_limits,
38 const unsigned nested_kcatalog_limit,
39 const unsigned root_kcatalog_limit,
40 const unsigned file_mbyte_limit,
41 perf::Statistics *statistics,
42 bool is_balanceable,
43 unsigned max_weight,
44 unsigned min_weight,
45 819 const std::string &dir_cache)
46 : SimpleCatalogManager(base_hash, stratum0, dir_temp, download_manager,
47 statistics, false, dir_cache,
48 true /* copy to tmpdir */)
49 819 , spooler_(spooler)
50 819 , enforce_limits_(enforce_limits)
51 819 , nested_kcatalog_limit_(nested_kcatalog_limit)
52 819 , root_kcatalog_limit_(root_kcatalog_limit)
53 819 , file_mbyte_limit_(file_mbyte_limit)
54 819 , is_balanceable_(is_balanceable)
55 819 , max_weight_(max_weight)
56 819 , min_weight_(min_weight)
57 819 , balance_weight_(max_weight / 2) {
58 819 sync_lock_ = reinterpret_cast<pthread_mutex_t *>(
59 819 smalloc(sizeof(pthread_mutex_t)));
60 819 int retval = pthread_mutex_init(sync_lock_, NULL);
61
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 819 times.
819 assert(retval == 0);
62 819 catalog_processing_lock_ = reinterpret_cast<pthread_mutex_t *>(
63 819 smalloc(sizeof(pthread_mutex_t)));
64 819 retval = pthread_mutex_init(catalog_processing_lock_, NULL);
65
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 819 times.
819 assert(retval == 0);
66 819 }
67
68
69 3272 WritableCatalogManager::~WritableCatalogManager() {
70 1636 pthread_mutex_destroy(sync_lock_);
71 1636 free(sync_lock_);
72 1636 pthread_mutex_destroy(catalog_processing_lock_);
73 1636 free(catalog_processing_lock_);
74 3272 }
75
76
77 /**
78 * This method is virtual in AbstractCatalogManager. It returns a new catalog
79 * structure in the form the different CatalogManagers need it.
80 * In this case it returns a stub for a WritableCatalog.
81 * @param mountpoint the mount point of the catalog stub to create
82 * @param catalog_hash the content hash of the catalog to create
83 * @param parent_catalog the parent of the catalog stub to create
84 * @return a pointer to the catalog stub structure created
85 */
86 2629 Catalog *WritableCatalogManager::CreateCatalog(const PathString &mountpoint,
87 const shash::Any &catalog_hash,
88 Catalog *parent_catalog) {
89 5258 return new WritableCatalog(mountpoint.ToString(), catalog_hash,
90
2/4
✓ Branch 1 taken 2629 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 2629 times.
✗ Branch 5 not taken.
5258 parent_catalog);
91 }
92
93
94 2629 void WritableCatalogManager::ActivateCatalog(Catalog *catalog) {
95 2629 catalog->TakeDatabaseFileOwnership();
96 2629 }
97
98
99 /**
100 * This method is invoked if we create a completely new repository.
101 * The new root catalog will already contain a root entry.
102 * It is uploaded by a Forklift to the upstream storage.
103 * @return true on success, false otherwise
104 */
105 608 manifest::Manifest *WritableCatalogManager::CreateRepository(
106 const string &dir_temp,
107 const bool volatile_content,
108 const std::string &voms_authz,
109 upload::Spooler *spooler) {
110 // Create a new root catalog at file_path
111
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 const string file_path = dir_temp + "/new_root_catalog";
112
113 608 const shash::Algorithms hash_algorithm = spooler->GetHashAlgorithm();
114
115 // A newly created catalog always needs a root entry
116 // we create and configure this here
117
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 DirectoryEntry root_entry;
118 608 root_entry.inode_ = DirectoryEntry::kInvalidInode;
119 608 root_entry.mode_ = 16877;
120 608 root_entry.size_ = 4096;
121 608 root_entry.mtime_ = time(NULL);
122 608 root_entry.uid_ = getuid();
123 608 root_entry.gid_ = getgid();
124
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 root_entry.checksum_ = shash::Any(hash_algorithm);
125 608 root_entry.linkcount_ = 2;
126
1/2
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
608 const string root_path = "";
127
128 // Create the database schema and the initial root entry
129 {
130 const UniquePtr<CatalogDatabase> new_clg_db(
131
2/4
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
608 CatalogDatabase::Create(file_path));
132 608 if (!new_clg_db.IsValid()
133
4/8
✓ Branch 0 taken 608 times.
✗ Branch 1 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 608 times.
✗ Branch 8 not taken.
✓ Branch 9 taken 608 times.
608 || !new_clg_db->InsertInitialValues(root_path, volatile_content,
134 voms_authz, root_entry)) {
135 LogCvmfs(kLogCatalog, kLogStderr, "creation of catalog '%s' failed",
136 file_path.c_str());
137 return NULL;
138 }
139
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 }
140
141 // Compress root catalog;
142
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 const int64_t catalog_size = GetFileSize(file_path);
143
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 if (catalog_size < 0) {
144 unlink(file_path.c_str());
145 return NULL;
146 }
147
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 const string file_path_compressed = file_path + ".compressed";
148
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 shash::Any hash_catalog(hash_algorithm, shash::kSuffixCatalog);
149
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 const bool retval = zlib::CompressPath2Path(file_path, file_path_compressed,
150 &hash_catalog);
151
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 608 times.
608 if (!retval) {
152 LogCvmfs(kLogCatalog, kLogStderr, "compression of catalog '%s' failed",
153 file_path.c_str());
154 unlink(file_path.c_str());
155 return NULL;
156 }
157 608 unlink(file_path.c_str());
158
159 // Create manifest
160
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 const string manifest_path = dir_temp + "/manifest";
161 manifest::Manifest *manifest = new manifest::Manifest(hash_catalog,
162
3/6
✓ Branch 2 taken 608 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 608 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 608 times.
✗ Branch 9 not taken.
608 catalog_size, "");
163
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 608 times.
608 if (!voms_authz.empty()) {
164 manifest->set_has_alt_catalog_path(true);
165 }
166
167 // Upload catalog
168
3/6
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 608 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 608 times.
✗ Branch 8 not taken.
608 spooler->Upload(file_path_compressed, "data/" + hash_catalog.MakePath());
169
1/2
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
608 spooler->WaitForUpload();
170 608 unlink(file_path_compressed.c_str());
171
2/4
✓ Branch 1 taken 608 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 608 times.
608 if (spooler->GetNumberOfErrors() > 0) {
172 LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalog %s",
173 file_path_compressed.c_str());
174 delete manifest;
175 return NULL;
176 }
177
178 608 return manifest;
179 608 }
180
181
182 /**
183 * Retrieve the catalog containing the given path.
184 * Other than AbstractCatalogManager::FindCatalog() this mounts nested
185 * catalogs if necessary and returns WritableCatalog objects.
186 * Furthermore it optionally returns the looked-up DirectoryEntry.
187 *
188 * @param path the path to look for
189 * @param result the retrieved catalog (as a pointer)
190 * @param dirent is set to looked up DirectoryEntry for 'path' if non-NULL
191 * @return true if catalog was found
192 */
193 12392 bool WritableCatalogManager::FindCatalog(const string &path,
194 WritableCatalog **result,
195 DirectoryEntry *dirent) {
196
1/2
✓ Branch 1 taken 12392 times.
✗ Branch 2 not taken.
12392 const PathString ps_path(path);
197
198
1/2
✓ Branch 1 taken 12392 times.
✗ Branch 2 not taken.
12392 Catalog *best_fit = AbstractCatalogManager<Catalog>::FindCatalog(ps_path);
199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12392 times.
12392 assert(best_fit != NULL);
200 12392 Catalog *catalog = NULL;
201
1/2
✓ Branch 1 taken 12392 times.
✗ Branch 2 not taken.
12392 const bool retval = MountSubtree(ps_path, best_fit, true /* is_listable */,
202 &catalog);
203
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 12392 times.
12392 if (!retval)
204 return false;
205
206
1/2
✓ Branch 1 taken 12392 times.
✗ Branch 2 not taken.
12392 catalog::DirectoryEntry dummy;
207
2/2
✓ Branch 0 taken 6338 times.
✓ Branch 1 taken 6054 times.
12392 if (NULL == dirent) {
208 6338 dirent = &dummy;
209 }
210
1/2
✓ Branch 1 taken 12392 times.
✗ Branch 2 not taken.
12392 const bool found = catalog->LookupPath(ps_path, dirent);
211
6/8
✓ Branch 0 taken 12343 times.
✓ Branch 1 taken 49 times.
✓ Branch 3 taken 12343 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 12343 times.
✓ Branch 7 taken 49 times.
✓ Branch 8 taken 12343 times.
12392 if (!found || !catalog->IsWritable())
212 49 return false;
213
214 12343 *result = static_cast<WritableCatalog *>(catalog);
215 12343 return true;
216 12392 }
217
218
219 WritableCatalog *WritableCatalogManager::GetHostingCatalog(
220 const std::string &path) {
221 WritableCatalog *result = NULL;
222 const bool retval = FindCatalog(MakeRelativePath(path), &result, NULL);
223 if (!retval)
224 return NULL;
225 return result;
226 }
227
228
229 /**
230 * Remove the given file from the catalogs.
231 * @param file_path the full path to the file to be removed
232 * @return true on success, false otherwise
233 */
234 70 void WritableCatalogManager::RemoveFile(const std::string &path) {
235
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 const string file_path = MakeRelativePath(path);
236
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 const string parent_path = GetParentPath(file_path);
237
238 70 SyncLock();
239 WritableCatalog *catalog;
240
2/4
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 70 times.
70 if (!FindCatalog(parent_path, &catalog)) {
241 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
242 file_path.c_str());
243 }
244
245
1/2
✓ Branch 1 taken 70 times.
✗ Branch 2 not taken.
70 catalog->RemoveEntry(file_path);
246 70 SyncUnlock();
247 70 }
248
249
250 /**
251 * Remove the given directory from the catalogs.
252 * @param directory_path the full path to the directory to be removed
253 * @return true on success, false otherwise
254 */
255 161 void WritableCatalogManager::RemoveDirectory(const std::string &path) {
256
1/2
✓ Branch 1 taken 161 times.
✗ Branch 2 not taken.
161 const string directory_path = MakeRelativePath(path);
257
1/2
✓ Branch 1 taken 161 times.
✗ Branch 2 not taken.
161 const string parent_path = GetParentPath(directory_path);
258
259 161 SyncLock();
260 WritableCatalog *catalog;
261
1/2
✓ Branch 1 taken 161 times.
✗ Branch 2 not taken.
161 DirectoryEntry parent_entry;
262
2/4
✓ Branch 1 taken 161 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 161 times.
161 if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
263 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
264 directory_path.c_str());
265 }
266
267 161 parent_entry.set_linkcount(parent_entry.linkcount() - 1);
268
269
1/2
✓ Branch 1 taken 161 times.
✗ Branch 2 not taken.
161 catalog->RemoveEntry(directory_path);
270
1/2
✓ Branch 1 taken 161 times.
✗ Branch 2 not taken.
161 catalog->UpdateEntry(parent_entry, parent_path);
271
2/2
✓ Branch 1 taken 98 times.
✓ Branch 2 taken 63 times.
161 if (parent_entry.IsNestedCatalogRoot()) {
272
1/2
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
98 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
273 parent_path.c_str());
274 WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
275 98 catalog->parent());
276 98 parent_entry.set_is_nested_catalog_mountpoint(true);
277 98 parent_entry.set_is_nested_catalog_root(false);
278
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 parent_catalog->UpdateEntry(parent_entry, parent_path);
279 }
280 161 SyncUnlock();
281 161 }
282
283 /**
284 * Clone the file called `source` changing its name into `destination`, the
285 * source file is keep intact.
286 * @params destination, the name of the new file, complete path
287 * @params source, the name of the file to clone, which must be already in the
288 * repository
289 * @return void
290 */
291 void WritableCatalogManager::Clone(const std::string destination,
292 const std::string source) {
293 const std::string relative_source = MakeRelativePath(source);
294
295 DirectoryEntry source_dirent;
296 if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
297 PANIC(kLogStderr, "catalog for file '%s' cannot be found, aborting",
298 source.c_str());
299 }
300 if (source_dirent.IsDirectory()) {
301 PANIC(kLogStderr, "Trying to clone a directory: '%s', aborting",
302 source.c_str());
303 }
304
305 // if the file is already there we remove it and we add it back
306 DirectoryEntry check_dirent;
307 const bool destination_already_present = LookupPath(
308 MakeRelativePath(destination), kLookupDefault, &check_dirent);
309 if (destination_already_present) {
310 this->RemoveFile(destination);
311 }
312
313 DirectoryEntry destination_dirent(source_dirent);
314 std::string destination_dirname;
315 std::string destination_filename;
316 SplitPath(destination, &destination_dirname, &destination_filename);
317
318 destination_dirent.name_.Assign(
319 NameString(destination_filename.c_str(), destination_filename.length()));
320
321 // TODO(jblomer): clone is used by tarball engine and should eventually
322 // support extended attributes
323 this->AddFile(destination_dirent, empty_xattrs, destination_dirname);
324 }
325
326
327 /**
328 * Copies an entire directory tree from the existing from_dir to the
329 * non-existing to_dir. The destination's parent directory must exist. On the
330 * catalog level, the new entries will be identical to the old ones except
331 * for their path hash fields.
332 */
333 490 void WritableCatalogManager::CloneTree(const std::string &from_dir,
334 const std::string &to_dir) {
335 // Sanitize input paths
336
6/6
✓ Branch 1 taken 392 times.
✓ Branch 2 taken 98 times.
✓ Branch 4 taken 49 times.
✓ Branch 5 taken 343 times.
✓ Branch 6 taken 147 times.
✓ Branch 7 taken 343 times.
490 if (from_dir.empty() || to_dir.empty())
337 147 PANIC(kLogStderr, "clone tree from or to root impossible");
338
339
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 const std::string relative_source = MakeRelativePath(from_dir);
340
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 const std::string relative_dest = MakeRelativePath(to_dir);
341
342
2/2
✓ Branch 1 taken 49 times.
✓ Branch 2 taken 294 times.
343 if (relative_source == relative_dest) {
343 49 PANIC(kLogStderr, "cannot clone tree into itself ('%s')", to_dir.c_str());
344 }
345
4/7
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 294 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 49 times.
✓ Branch 8 taken 245 times.
294 if (HasPrefix(relative_dest, relative_source + "/", false /*ignore_case*/)) {
346 49 PANIC(kLogStderr,
347 "cannot clone tree into sub directory of source '%s' --> '%s'",
348 from_dir.c_str(), to_dir.c_str());
349 }
350
351
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 DirectoryEntry source_dirent;
352
3/4
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✓ Branch 4 taken 196 times.
245 if (!LookupPath(relative_source, kLookupDefault, &source_dirent)) {
353 49 PANIC(kLogStderr, "path '%s' cannot be found, aborting", from_dir.c_str());
354 }
355
2/2
✓ Branch 1 taken 49 times.
✓ Branch 2 taken 147 times.
196 if (!source_dirent.IsDirectory()) {
356 49 PANIC(kLogStderr, "CloneTree: source '%s' not a directory, aborting",
357 from_dir.c_str());
358 }
359
360
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 DirectoryEntry dest_dirent;
361
3/4
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✓ Branch 4 taken 98 times.
147 if (LookupPath(relative_dest, kLookupDefault, &dest_dirent)) {
362 49 PANIC(kLogStderr, "destination '%s' exists, aborting", to_dir.c_str());
363 }
364
365
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 const std::string dest_parent = GetParentPath(relative_dest);
366
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 DirectoryEntry dest_parent_dirent;
367
3/4
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✓ Branch 4 taken 49 times.
98 if (!LookupPath(dest_parent, kLookupDefault, &dest_parent_dirent)) {
368 49 PANIC(kLogStderr, "destination '%s' not on a known path, aborting",
369 to_dir.c_str());
370 }
371
372
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
49 CloneTreeImpl(PathString(from_dir),
373
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
98 GetParentPath(to_dir),
374
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 49 times.
✗ Branch 5 not taken.
98 NameString(GetFileName(to_dir)));
375 1029 }
376
377
378 /**
379 * Called from CloneTree(), assumes that from_dir and to_dir are sufficiently
380 * sanitized
381 */
382 392 void WritableCatalogManager::CloneTreeImpl(const PathString &source_dir,
383 const std::string &dest_parent_dir,
384 const NameString &dest_name) {
385
1/2
✓ Branch 4 taken 392 times.
✗ Branch 5 not taken.
392 LogCvmfs(kLogCatalog, kLogDebug, "cloning %s --> %s/%s", source_dir.c_str(),
386
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
784 dest_parent_dir.c_str(), dest_name.ToString().c_str());
387
3/6
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 392 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 392 times.
✗ Branch 8 not taken.
784 const PathString relative_source(MakeRelativePath(source_dir.ToString()));
388
389
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
392 DirectoryEntry source_dirent;
390
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
392 bool retval = LookupPath(relative_source, kLookupDefault, &source_dirent);
391
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 392 times.
392 assert(retval);
392
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 392 times.
392 assert(!source_dirent.IsBindMountpoint());
393
394
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
392 DirectoryEntry dest_dirent(source_dirent);
395
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
392 dest_dirent.name_.Assign(dest_name);
396 // Just in case, reset the nested catalog markers
397 392 dest_dirent.set_is_nested_catalog_mountpoint(false);
398 392 dest_dirent.set_is_nested_catalog_root(false);
399
400 392 XattrList xattrs;
401
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 392 times.
392 if (source_dirent.HasXattrs()) {
402 retval = LookupXattrs(relative_source, &xattrs);
403 assert(retval);
404 }
405
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
392 AddDirectory(dest_dirent, xattrs, dest_parent_dir);
406
407
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
392 std::string dest_dir = dest_parent_dir;
408
2/2
✓ Branch 1 taken 343 times.
✓ Branch 2 taken 49 times.
392 if (!dest_dir.empty())
409
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 dest_dir.push_back('/');
410
2/4
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 392 times.
✗ Branch 5 not taken.
392 dest_dir += dest_name.ToString();
411 392 if (source_dirent.IsNestedCatalogRoot()
412
5/6
✓ Branch 0 taken 245 times.
✓ Branch 1 taken 147 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 245 times.
✓ Branch 5 taken 147 times.
✓ Branch 6 taken 245 times.
392 || source_dirent.IsNestedCatalogMountpoint()) {
413
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 CreateNestedCatalog(dest_dir);
414 }
415
416 392 DirectoryEntryList ls;
417
1/2
✓ Branch 1 taken 392 times.
✗ Branch 2 not taken.
392 retval = Listing(relative_source, &ls, false /* expand_symlink */);
418
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 392 times.
392 assert(retval);
419
2/2
✓ Branch 1 taken 833 times.
✓ Branch 2 taken 392 times.
1225 for (unsigned i = 0; i < ls.size(); ++i) {
420
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 PathString sub_path(source_dir);
421
2/4
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 833 times.
833 assert(!sub_path.IsEmpty());
422
1/2
✓ Branch 1 taken 833 times.
✗ Branch 2 not taken.
833 sub_path.Append("/", 1);
423
3/6
✓ Branch 2 taken 833 times.
✗ Branch 3 not taken.
✓ Branch 7 taken 833 times.
✗ Branch 8 not taken.
✓ Branch 11 taken 833 times.
✗ Branch 12 not taken.
833 sub_path.Append(ls[i].name().GetChars(), ls[i].name().GetLength());
424
425
2/2
✓ Branch 2 taken 343 times.
✓ Branch 3 taken 490 times.
833 if (ls[i].IsDirectory()) {
426
2/4
✓ Branch 2 taken 343 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 343 times.
✗ Branch 6 not taken.
343 CloneTreeImpl(sub_path, dest_dir, ls[i].name());
427 343 continue;
428 }
429
430 // We break hard-links during cloning
431 490 ls[i].set_hardlink_group(0);
432 490 ls[i].set_linkcount(1);
433
434 490 xattrs.Clear();
435
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 490 times.
490 if (ls[i].HasXattrs()) {
436 retval = LookupXattrs(sub_path, &xattrs);
437 assert(retval);
438 }
439
440
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 490 times.
490 if (ls[i].IsChunkedFile()) {
441 FileChunkList chunks;
442 const std::string relative_sub_path = MakeRelativePath(
443 sub_path.ToString());
444 retval = ListFileChunks(PathString(relative_sub_path),
445 ls[i].hash_algorithm(), &chunks);
446 assert(retval);
447 AddChunkedFile(ls[i], xattrs, dest_dir, chunks);
448 } else {
449
1/2
✓ Branch 2 taken 490 times.
✗ Branch 3 not taken.
490 AddFile(ls[i], xattrs, dest_dir);
450 }
451
2/2
✓ Branch 1 taken 490 times.
✓ Branch 2 taken 343 times.
833 }
452 392 }
453
454
455 /**
456 * Add a new directory to the catalogs.
457 * @param entry a DirectoryEntry structure describing the new directory
458 * @param parent_directory the absolute path of the directory containing the
459 * directory to be created
460 * @return true on success, false otherwise
461 */
462 4470 void WritableCatalogManager::AddDirectory(const DirectoryEntryBase &entry,
463 const XattrList &xattrs,
464 const std::string &parent_directory) {
465
1/2
✓ Branch 1 taken 4470 times.
✗ Branch 2 not taken.
4470 const string parent_path = MakeRelativePath(parent_directory);
466
1/2
✓ Branch 1 taken 4470 times.
✗ Branch 2 not taken.
4470 string directory_path = parent_path + "/";
467
3/6
✓ Branch 1 taken 4470 times.
✗ Branch 2 not taken.
✓ Branch 5 taken 4470 times.
✗ Branch 6 not taken.
✓ Branch 9 taken 4470 times.
✗ Branch 10 not taken.
4470 directory_path.append(entry.name().GetChars(), entry.name().GetLength());
468
469 4470 SyncLock();
470 WritableCatalog *catalog;
471
1/2
✓ Branch 1 taken 4470 times.
✗ Branch 2 not taken.
4470 DirectoryEntry parent_entry;
472
2/4
✓ Branch 1 taken 4470 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 4470 times.
4470 if (!FindCatalog(parent_path, &catalog, &parent_entry)) {
473 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
474 directory_path.c_str());
475 }
476
477
1/2
✓ Branch 1 taken 4470 times.
✗ Branch 2 not taken.
4470 DirectoryEntry fixed_hardlink_count(entry);
478 4470 fixed_hardlink_count.set_linkcount(2);
479
1/2
✓ Branch 1 taken 4470 times.
✗ Branch 2 not taken.
4470 catalog->AddEntry(fixed_hardlink_count, xattrs, directory_path, parent_path);
480
481 4470 parent_entry.set_linkcount(parent_entry.linkcount() + 1);
482
1/2
✓ Branch 1 taken 4470 times.
✗ Branch 2 not taken.
4470 catalog->UpdateEntry(parent_entry, parent_path);
483
2/2
✓ Branch 1 taken 147 times.
✓ Branch 2 taken 4323 times.
4470 if (parent_entry.IsNestedCatalogRoot()) {
484
1/2
✓ Branch 2 taken 147 times.
✗ Branch 3 not taken.
147 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point %s",
485 parent_path.c_str());
486 WritableCatalog *parent_catalog = reinterpret_cast<WritableCatalog *>(
487 147 catalog->parent());
488 147 parent_entry.set_is_nested_catalog_mountpoint(true);
489 147 parent_entry.set_is_nested_catalog_root(false);
490
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 parent_catalog->UpdateEntry(parent_entry, parent_path);
491 }
492 4470 SyncUnlock();
493 4470 }
494
495 /**
496 * Add a new file to the catalogs.
497 * @param entry a DirectoryEntry structure describing the new file
498 * @param parent_directory the absolute path of the directory containing the
499 * file to be created
500 * @return true on success, false otherwise
501 */
502 5813 void WritableCatalogManager::AddFile(const DirectoryEntry &entry,
503 const XattrList &xattrs,
504 const std::string &parent_directory) {
505
1/2
✓ Branch 1 taken 5813 times.
✗ Branch 2 not taken.
5813 const string parent_path = MakeRelativePath(parent_directory);
506
1/2
✓ Branch 1 taken 5813 times.
✗ Branch 2 not taken.
5813 const string file_path = entry.GetFullPath(parent_path);
507
508 5813 SyncLock();
509 WritableCatalog *catalog;
510
2/4
✓ Branch 1 taken 5813 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 5813 times.
5813 if (!FindCatalog(parent_path, &catalog)) {
511 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
512 file_path.c_str());
513 }
514
515
4/6
✓ Branch 1 taken 5762 times.
✓ Branch 2 taken 51 times.
✓ Branch 4 taken 5762 times.
✗ Branch 5 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 5762 times.
5813 assert(!entry.IsRegular() || entry.IsChunkedFile()
516 || !entry.checksum().IsNull());
517
3/4
✓ Branch 1 taken 51 times.
✓ Branch 2 taken 5762 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 51 times.
5813 assert(entry.IsRegular() || !entry.IsExternalFile());
518
519 // check if file is too big
520
1/2
✓ Branch 1 taken 5813 times.
✗ Branch 2 not taken.
5813 const unsigned mbytes = entry.size() / (1024 * 1024);
521
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 5813 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
5813 if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
522 LogCvmfs(kLogCatalog, kLogStderr,
523 "%s: file at %s is larger than %u megabytes (%u). "
524 "CernVM-FS works best with small files. "
525 "Please remove the file or increase the limit.",
526 enforce_limits_ ? "FATAL" : "WARNING", file_path.c_str(),
527 file_mbyte_limit_, mbytes);
528 if (enforce_limits_)
529 PANIC(kLogStderr, "file at %s is larger than %u megabytes (%u).",
530 file_path.c_str(), file_mbyte_limit_, mbytes);
531 }
532
533
1/2
✓ Branch 1 taken 5813 times.
✗ Branch 2 not taken.
5813 catalog->AddEntry(entry, xattrs, file_path, parent_path);
534 5813 SyncUnlock();
535 5813 }
536
537
538 void WritableCatalogManager::AddChunkedFile(const DirectoryEntryBase &entry,
539 const XattrList &xattrs,
540 const std::string &parent_directory,
541 const FileChunkList &file_chunks) {
542 assert(file_chunks.size() > 0);
543
544 DirectoryEntry full_entry(entry);
545 full_entry.set_is_chunked_file(true);
546
547 AddFile(full_entry, xattrs, parent_directory);
548
549 const string parent_path = MakeRelativePath(parent_directory);
550 const string file_path = entry.GetFullPath(parent_path);
551
552 SyncLock();
553 WritableCatalog *catalog;
554 if (!FindCatalog(parent_path, &catalog)) {
555 PANIC(kLogStderr, "catalog for file '%s' cannot be found",
556 file_path.c_str());
557 }
558
559 for (unsigned i = 0; i < file_chunks.size(); ++i) {
560 catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
561 }
562 SyncUnlock();
563 }
564
565
566 /**
567 * Add a hardlink group to the catalogs.
568 * @param entries a list of DirectoryEntries describing the new files
569 * @param parent_directory the absolute path of the directory containing the
570 * files to be created
571 * @return true on success, false otherwise
572 */
573 void WritableCatalogManager::AddHardlinkGroup(
574 const DirectoryEntryBaseList &entries,
575 const XattrList &xattrs,
576 const std::string &parent_directory,
577 const FileChunkList &file_chunks) {
578 assert(entries.size() >= 1);
579 assert(file_chunks.IsEmpty() || entries[0].IsRegular());
580 if (entries.size() == 1) {
581 DirectoryEntry fix_linkcount(entries[0]);
582 fix_linkcount.set_linkcount(1);
583 if (file_chunks.IsEmpty())
584 return AddFile(fix_linkcount, xattrs, parent_directory);
585 return AddChunkedFile(fix_linkcount, xattrs, parent_directory, file_chunks);
586 }
587
588 LogCvmfs(kLogCatalog, kLogVerboseMsg, "adding hardlink group %s/%s",
589 parent_directory.c_str(), entries[0].name().c_str());
590
591 // Hardlink groups have to reside in the same directory.
592 // Therefore we only have one parent directory here
593 const string parent_path = MakeRelativePath(parent_directory);
594
595 // check if hard link is too big
596 const unsigned mbytes = entries[0].size() / (1024 * 1024);
597 if ((file_mbyte_limit_ > 0) && (mbytes > file_mbyte_limit_)) {
598 LogCvmfs(kLogCatalog, kLogStderr,
599 "%s: hard link at %s is larger than %u megabytes (%u). "
600 "CernVM-FS works best with small files. "
601 "Please remove the file or increase the limit.",
602 enforce_limits_ ? "FATAL" : "WARNING",
603 (parent_path + entries[0].name().ToString()).c_str(),
604 file_mbyte_limit_, mbytes);
605 if (enforce_limits_)
606 PANIC(kLogStderr, "hard link at %s is larger than %u megabytes (%u)",
607 (parent_path + entries[0].name().ToString()).c_str(),
608 file_mbyte_limit_, mbytes);
609 }
610
611 SyncLock();
612 WritableCatalog *catalog;
613 if (!FindCatalog(parent_path, &catalog)) {
614 PANIC(kLogStderr,
615 "catalog for hardlink group containing '%s' cannot be found",
616 parent_path.c_str());
617 }
618
619 // Get a valid hardlink group id for the catalog the group will end up in
620 // TODO(unknown): Compaction
621 const uint32_t new_group_id = catalog->GetMaxLinkId() + 1;
622 LogCvmfs(kLogCatalog, kLogVerboseMsg, "hardlink group id %u issued",
623 new_group_id);
624 assert(new_group_id > 0);
625
626 // Add the file entries to the catalog
627 for (DirectoryEntryBaseList::const_iterator i = entries.begin(),
628 iEnd = entries.end();
629 i != iEnd;
630 ++i) {
631 string file_path = parent_path + "/";
632 file_path.append(i->name().GetChars(), i->name().GetLength());
633
634 // create a fully fledged DirectoryEntry to add the hardlink group to it
635 // which is CVMFS specific meta data.
636 DirectoryEntry hardlink(*i);
637 hardlink.set_hardlink_group(new_group_id);
638 hardlink.set_linkcount(entries.size());
639 hardlink.set_is_chunked_file(!file_chunks.IsEmpty());
640
641 catalog->AddEntry(hardlink, xattrs, file_path, parent_path);
642 if (hardlink.IsChunkedFile()) {
643 for (unsigned i = 0; i < file_chunks.size(); ++i) {
644 catalog->AddFileChunk(file_path, *file_chunks.AtPtr(i));
645 }
646 }
647 }
648 SyncUnlock();
649 }
650
651
652 void WritableCatalogManager::ShrinkHardlinkGroup(const string &remove_path) {
653 const string relative_path = MakeRelativePath(remove_path);
654
655 SyncLock();
656 WritableCatalog *catalog;
657 if (!FindCatalog(relative_path, &catalog)) {
658 PANIC(kLogStderr,
659 "catalog for hardlink group containing '%s' cannot be found",
660 remove_path.c_str());
661 }
662
663 catalog->IncLinkcount(relative_path, -1);
664 SyncUnlock();
665 }
666
667
668 /**
669 * Update entry meta data (mode, owner, ...).
670 * CVMFS specific meta data (i.e. nested catalog transition points) are NOT
671 * changed by this method, although transition points intrinsics are taken into
672 * account, to keep nested catalogs consistent.
673 * @param entry the directory entry to be touched
674 * @param path the path of the directory entry to be touched
675 */
676 7 void WritableCatalogManager::TouchDirectory(const DirectoryEntryBase &entry,
677 const XattrList &xattrs,
678 const std::string &directory_path) {
679
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 assert(entry.IsDirectory());
680
681
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 const string entry_path = MakeRelativePath(directory_path);
682
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 const string parent_path = GetParentPath(entry_path);
683
684 7 SyncLock();
685 // find the catalog to be updated
686 WritableCatalog *catalog;
687
2/4
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 7 times.
7 if (!FindCatalog(parent_path, &catalog)) {
688 PANIC(kLogStderr, "catalog for entry '%s' cannot be found",
689 entry_path.c_str());
690 }
691
692
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 catalog->TouchEntry(entry, xattrs, entry_path);
693
694 // since we deal with a directory here, we might just touch a
695 // nested catalog transition point. If this is the case we would need to
696 // update two catalog entries:
697 // * the nested catalog MOUNTPOINT in the parent catalog
698 // * the nested catalog ROOT in the nested catalog
699
700 // first check if we really have a nested catalog transition point
701
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 catalog::DirectoryEntry potential_transition_point;
702
1/2
✓ Branch 3 taken 7 times.
✗ Branch 4 not taken.
7 const PathString transition_path(entry_path.data(), entry_path.length());
703
1/2
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
7 bool retval = catalog->LookupPath(transition_path,
704 &potential_transition_point);
705
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
7 assert(retval);
706
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 7 times.
7 if (potential_transition_point.IsNestedCatalogMountpoint()) {
707 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating transition point at %s",
708 entry_path.c_str());
709
710 // find and mount nested catalog associated to this transition point
711 shash::Any nested_hash;
712 uint64_t nested_size;
713 retval = catalog->FindNested(transition_path, &nested_hash, &nested_size);
714 assert(retval);
715 Catalog *nested_catalog;
716 nested_catalog = MountCatalog(transition_path, nested_hash, catalog);
717 assert(nested_catalog != NULL);
718
719 // update nested catalog root in the child catalog
720 reinterpret_cast<WritableCatalog *>(nested_catalog)
721 ->TouchEntry(entry, xattrs, entry_path);
722 }
723
724 7 SyncUnlock();
725 7 }
726
727
728 /**
729 * Create a new nested catalog. Includes moving all entries belonging there
730 * from it's parent catalog.
731 * @param mountpoint the path of the directory to become a nested root
732 * @return true on success, false otherwise
733 */
734 1325 void WritableCatalogManager::CreateNestedCatalog(
735 const std::string &mountpoint) {
736
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 const string nested_root_path = MakeRelativePath(mountpoint);
737
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 const PathString ps_nested_root_path(nested_root_path);
738
739 1325 SyncLock();
740 // Find the catalog currently containing the directory structure, which
741 // will be represented as a new nested catalog and its root-entry/mountpoint
742 // along the way
743 1325 WritableCatalog *old_catalog = NULL;
744
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 DirectoryEntry new_root_entry;
745
2/4
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1325 times.
1325 if (!FindCatalog(nested_root_path, &old_catalog, &new_root_entry)) {
746 PANIC(kLogStderr,
747 "failed to create nested catalog '%s': "
748 "mountpoint was not found in current catalog structure",
749 nested_root_path.c_str());
750 }
751
752 // Create the database schema and the initial root entry
753 // for the new nested catalog
754
1/2
✓ Branch 2 taken 1325 times.
✗ Branch 3 not taken.
1325 const string database_file_path = CreateTempPath(dir_temp() + "/catalog",
755
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 0666);
756 1325 const bool volatile_content = false;
757
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 CatalogDatabase *new_catalog_db = CatalogDatabase::Create(database_file_path);
758
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1325 times.
1325 assert(NULL != new_catalog_db);
759 // Note we do not set the external_data bit for nested catalogs
760
2/4
✓ Branch 2 taken 1325 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 1325 times.
✗ Branch 6 not taken.
1325 bool retval = new_catalog_db->InsertInitialValues(
761 nested_root_path,
762 volatile_content,
763 "", // At this point, only root
764 // catalog gets VOMS authz
765 new_root_entry);
766
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1325 times.
1325 assert(retval);
767 // TODO(rmeusel): we need a way to attach a catalog directly from an open
768 // database to remove this indirection
769
1/2
✓ Branch 0 taken 1325 times.
✗ Branch 1 not taken.
1325 delete new_catalog_db;
770 1325 new_catalog_db = NULL;
771
772 // Attach the just created nested catalog
773
2/4
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1325 times.
✗ Branch 5 not taken.
1325 Catalog *new_catalog = CreateCatalog(ps_nested_root_path, shash::Any(),
774 old_catalog);
775
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 retval = AttachCatalog(database_file_path, new_catalog);
776
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1325 times.
1325 assert(retval);
777
778
2/4
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 1325 times.
1325 assert(new_catalog->IsWritable());
779 1325 WritableCatalog *wr_new_catalog = static_cast<WritableCatalog *>(new_catalog);
780
781
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1325 times.
1325 if (new_root_entry.HasXattrs()) {
782 XattrList xattrs;
783 retval = old_catalog->LookupXattrsPath(ps_nested_root_path, &xattrs);
784 assert(retval);
785 wr_new_catalog->TouchEntry(new_root_entry, xattrs, nested_root_path);
786 }
787
788 // From now on, there are two catalogs, spanning the same directory structure
789 // we have to split the overlapping directory entries from the old catalog
790 // to the new catalog to re-gain a valid catalog structure
791
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 old_catalog->Partition(wr_new_catalog);
792
793 // Add the newly created nested catalog to the references of the containing
794 // catalog
795
4/8
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1325 times.
✗ Branch 5 not taken.
✓ Branch 7 taken 1325 times.
✗ Branch 8 not taken.
✓ Branch 10 taken 1325 times.
✗ Branch 11 not taken.
1325 old_catalog->InsertNestedCatalog(new_catalog->mountpoint().ToString(), NULL,
796 1325 shash::Any(spooler_->GetHashAlgorithm()), 0);
797
798 // Fix subtree counters in new nested catalogs: subtree is the sum of all
799 // entries of all "grand-nested" catalogs
800 // Note: taking a copy of the nested catalog list here
801 const Catalog::NestedCatalogList
802
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 &grand_nested = wr_new_catalog->ListOwnNestedCatalogs();
803
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 DeltaCounters fix_subtree_counters;
804 2650 for (Catalog::NestedCatalogList::const_iterator i = grand_nested.begin(),
805 1325 iEnd = grand_nested.end();
806
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1325 times.
1325 i != iEnd;
807 ++i) {
808 WritableCatalog *grand_catalog;
809 retval = FindCatalog(i->mountpoint.ToString(), &grand_catalog);
810 assert(retval);
811 const Counters &grand_counters = grand_catalog->GetCounters();
812 grand_counters.AddAsSubtree(&fix_subtree_counters);
813 }
814 1325 const DeltaCounters save_counters = wr_new_catalog->delta_counters_;
815 1325 wr_new_catalog->delta_counters_ = fix_subtree_counters;
816
1/2
✓ Branch 1 taken 1325 times.
✗ Branch 2 not taken.
1325 wr_new_catalog->UpdateCounters();
817 1325 wr_new_catalog->delta_counters_ = save_counters;
818
819 1325 SyncUnlock();
820 1325 }
821
822
823 /**
824 * Remove a nested catalog
825 *
826 * If the merged parameter is true, when you remove a nested catalog
827 * all entries currently held by it will be merged into its parent
828 * catalog.
829 * @param mountpoint - the path of the nested catalog to be removed
830 * @param merge - merge the subtree associated with the nested catalog
831 * into its parent catalog
832 * @return - true on success, false otherwise
833 */
834 105 void WritableCatalogManager::RemoveNestedCatalog(const string &mountpoint,
835 const bool merge) {
836
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 const string nested_root_path = MakeRelativePath(mountpoint);
837
838 105 SyncLock();
839 // Find the catalog which should be removed
840 105 WritableCatalog *nested_catalog = NULL;
841
2/4
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 105 times.
105 if (!FindCatalog(nested_root_path, &nested_catalog)) {
842 PANIC(kLogStderr,
843 "failed to remove nested catalog '%s': "
844 "mountpoint was not found in current catalog structure",
845 nested_root_path.c_str());
846 }
847
848 // Check if the found catalog is really the nested catalog to be deleted
849
6/19
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 105 times.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 105 times.
✗ Branch 8 not taken.
✗ Branch 9 not taken.
✓ Branch 10 taken 105 times.
✗ Branch 11 not taken.
✗ Branch 12 not taken.
✓ Branch 13 taken 105 times.
✗ Branch 14 not taken.
✓ Branch 16 taken 105 times.
✗ Branch 17 not taken.
✗ Branch 19 not taken.
✗ Branch 20 not taken.
✗ Branch 22 not taken.
✗ Branch 23 not taken.
210 assert(!nested_catalog->IsRoot()
850 && (nested_catalog->mountpoint().ToString() == nested_root_path));
851
852
1/2
✓ Branch 0 taken 105 times.
✗ Branch 1 not taken.
105 if (merge) {
853 // Merge all data from the nested catalog into it's parent
854
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 nested_catalog->MergeIntoParent();
855 } else {
856 nested_catalog->RemoveFromParent();
857 }
858
859 // Delete the catalog database file from the working copy
860
2/6
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 105 times.
105 if (unlink(nested_catalog->database_path().c_str()) != 0) {
861 PANIC(kLogStderr,
862 "unable to delete the removed nested catalog database file '%s'",
863 nested_catalog->database_path().c_str());
864 }
865
866 // Remove the catalog from internal data structures
867
1/2
✓ Branch 1 taken 105 times.
✗ Branch 2 not taken.
105 DetachCatalog(nested_catalog);
868 105 SyncUnlock();
869 105 }
870
871
872 /**
873 * Swap in a new nested catalog
874 *
875 * The old nested catalog must not have been already attached to the
876 * catalog tree. This method will not attach the new nested catalog
877 * to the catalog tree.
878 *
879 * @param mountpoint - the path of the nested catalog to be removed
880 * @param new_hash - the hash of the new nested catalog
881 * @param new_size - the size of the new nested catalog
882 */
883 343 void WritableCatalogManager::SwapNestedCatalog(const string &mountpoint,
884 const shash::Any &new_hash,
885 const uint64_t new_size) {
886
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 const string nested_root_path = MakeRelativePath(mountpoint);
887
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 const string parent_path = GetParentPath(nested_root_path);
888
1/2
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
343 const PathString nested_root_ps = PathString(nested_root_path);
889
890 343 SyncLock();
891
892 // Find the immediate parent catalog
893 343 WritableCatalog *parent = NULL;
894
3/4
✓ Branch 1 taken 343 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✓ Branch 4 taken 294 times.
343 if (!FindCatalog(parent_path, &parent)) {
895 49 SyncUnlock(); // this is needed for the unittest. otherwise they get stuck
896 49 PANIC(kLogStderr,
897 "failed to swap nested catalog '%s': could not find parent '%s'",
898 nested_root_path.c_str(), parent_path.c_str());
899 }
900
901 // Get old nested catalog counters
902
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 Catalog *old_attached_catalog = parent->FindChild(nested_root_ps);
903
1/2
✓ Branch 1 taken 294 times.
✗ Branch 2 not taken.
294 Counters old_counters;
904
2/2
✓ Branch 0 taken 49 times.
✓ Branch 1 taken 245 times.
294 if (old_attached_catalog) {
905 // Old catalog was already attached (e.g. as a child catalog
906 // attached by a prior call to CreateNestedCatalog()). Ensure
907 // that it has not been modified, get counters, and detach it.
908 49 WritableCatalogList list;
909
2/4
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
49 if (GetModifiedCatalogLeafsRecursively(old_attached_catalog, &list)) {
910 49 SyncUnlock();
911 49 PANIC(kLogStderr,
912 "failed to swap nested catalog '%s': already modified",
913 nested_root_path.c_str());
914 }
915 old_counters = old_attached_catalog->GetCounters();
916 DetachSubtree(old_attached_catalog);
917
918 49 } else {
919 // Old catalog was not attached. Download a freely attached
920 // version and get counters.
921
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 shash::Any old_hash;
922 uint64_t old_size;
923
1/2
✓ Branch 1 taken 245 times.
✗ Branch 2 not taken.
245 const bool old_found = parent->FindNested(nested_root_ps, &old_hash,
924 &old_size);
925
2/2
✓ Branch 0 taken 49 times.
✓ Branch 1 taken 196 times.
245 if (!old_found) {
926 49 SyncUnlock();
927 49 PANIC(kLogStderr,
928 "failed to swap nested catalog '%s': not found in parent",
929 nested_root_path.c_str());
930 }
931 const UniquePtr<Catalog> old_free_catalog(
932
2/4
✓ Branch 1 taken 196 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 196 times.
✗ Branch 5 not taken.
196 LoadFreeCatalog(nested_root_ps, old_hash));
933
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 196 times.
196 if (!old_free_catalog.IsValid()) {
934 SyncUnlock();
935 PANIC(kLogStderr,
936 "failed to swap nested catalog '%s': failed to load old catalog",
937 nested_root_path.c_str());
938 }
939 196 old_counters = old_free_catalog->GetCounters();
940 196 }
941
942 // Load freely attached new catalog
943 const UniquePtr<Catalog> new_catalog(
944
3/4
✓ Branch 1 taken 147 times.
✓ Branch 2 taken 49 times.
✓ Branch 4 taken 147 times.
✗ Branch 5 not taken.
196 LoadFreeCatalog(nested_root_ps, new_hash));
945
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 147 times.
147 if (!new_catalog.IsValid()) {
946 SyncUnlock();
947 PANIC(kLogStderr,
948 "failed to swap nested catalog '%s': failed to load new catalog",
949 nested_root_path.c_str());
950 }
951
952 // Get new catalog root directory entry
953
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 DirectoryEntry dirent;
954 147 XattrList xattrs;
955
1/2
✓ Branch 2 taken 147 times.
✗ Branch 3 not taken.
147 const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
956
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 147 times.
147 if (!dirent_found) {
957 SyncUnlock();
958 PANIC(kLogStderr,
959 "failed to swap nested catalog '%s': missing dirent in new catalog",
960 nested_root_path.c_str());
961 }
962
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 147 times.
147 if (dirent.HasXattrs()) {
963 const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
964 &xattrs);
965 if (!xattrs_found) {
966 SyncUnlock();
967 PANIC(kLogStderr,
968 "failed to swap nested catalog '%s': missing xattrs in new catalog",
969 nested_root_path.c_str());
970 }
971 }
972
973 // Swap catalogs
974
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 parent->RemoveNestedCatalog(nested_root_path, NULL);
975
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 parent->InsertNestedCatalog(nested_root_path, NULL, new_hash, new_size);
976
977 // Update parent directory entry
978 147 dirent.set_is_nested_catalog_mountpoint(true);
979 147 dirent.set_is_nested_catalog_root(false);
980
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 parent->UpdateEntry(dirent, nested_root_path);
981
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 parent->TouchEntry(dirent, xattrs, nested_root_path);
982
983 // Update counters
984
1/2
✓ Branch 3 taken 147 times.
✗ Branch 4 not taken.
147 const DeltaCounters delta = Counters::Diff(old_counters,
985 new_catalog->GetCounters());
986
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 delta.PopulateToParent(&parent->delta_counters_);
987
988 147 SyncUnlock();
989 735 }
990
991 /**
992 * Install a nested catalog (catalog hierarchy) at a new, empty directory
993 *
994 * The mountpoint directory must not yet exist. Its parent directory, however
995 * must exist. This method combines functionality from AddDirectory(),
996 * CreateNestedCatalog() and SwapNestedCatalog().
997 * The new nested catalog won't get attached.
998 *
999 * @param mountpoint - the path where the nested catalog should be installed
1000 * @param new_hash - the hash of the new nested catalog
1001 * @param new_size - the size of the new nested catalog
1002 */
1003 147 void WritableCatalogManager::GraftNestedCatalog(const string &mountpoint,
1004 const shash::Any &new_hash,
1005 const uint64_t new_size) {
1006
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 const string nested_root_path = MakeRelativePath(mountpoint);
1007
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 const string parent_path = GetParentPath(nested_root_path);
1008
1/2
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
147 const PathString nested_root_ps = PathString(nested_root_path);
1009
1010
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 147 times.
147 assert(!nested_root_path.empty());
1011
1012 // Load freely attached new catalog
1013 const UniquePtr<Catalog> new_catalog(
1014
2/4
✓ Branch 1 taken 147 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 147 times.
✗ Branch 5 not taken.
147 LoadFreeCatalog(nested_root_ps, new_hash));
1015
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 147 times.
147 if (!new_catalog.IsValid()) {
1016 PANIC(kLogStderr,
1017 "failed to graft nested catalog '%s': failed to load new catalog",
1018 nested_root_path.c_str());
1019 }
1020
4/7
✓ Branch 2 taken 147 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 147 times.
✗ Branch 6 not taken.
✓ Branch 8 taken 49 times.
✓ Branch 9 taken 98 times.
147 if (new_catalog->root_prefix() != nested_root_ps) {
1021
2/4
✓ Branch 2 taken 49 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 49 times.
✗ Branch 6 not taken.
147 PANIC(kLogStderr,
1022 "invalid nested catalog for grafting at '%s': catalog rooted at '%s'",
1023 nested_root_path.c_str(),
1024 new_catalog->root_prefix().ToString().c_str());
1025 }
1026
1027 // Get new catalog root directory entry
1028
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 DirectoryEntry dirent;
1029 98 XattrList xattrs;
1030
1/2
✓ Branch 2 taken 98 times.
✗ Branch 3 not taken.
98 const bool dirent_found = new_catalog->LookupPath(nested_root_ps, &dirent);
1031
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (!dirent_found) {
1032 PANIC(kLogStderr,
1033 "failed to swap nested catalog '%s': missing dirent in new catalog",
1034 nested_root_path.c_str());
1035 }
1036
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 98 times.
98 if (dirent.HasXattrs()) {
1037 const bool xattrs_found = new_catalog->LookupXattrsPath(nested_root_ps,
1038 &xattrs);
1039 if (!xattrs_found) {
1040 PANIC(kLogStderr,
1041 "failed to swap nested catalog '%s': missing xattrs in new catalog",
1042 nested_root_path.c_str());
1043 }
1044 }
1045 // Transform the nested catalog root into a transition point to be inserted
1046 // in the parent catalog
1047 98 dirent.set_is_nested_catalog_root(false);
1048 98 dirent.set_is_nested_catalog_mountpoint(true);
1049
1050 // Add directory and nested catalog
1051
1052 98 SyncLock();
1053 WritableCatalog *parent_catalog;
1054
1/2
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
98 DirectoryEntry parent_entry;
1055
2/4
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 98 times.
98 if (!FindCatalog(parent_path, &parent_catalog, &parent_entry)) {
1056 SyncUnlock();
1057 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1058 parent_path.c_str());
1059 }
1060
3/4
✓ Branch 1 taken 98 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 49 times.
✓ Branch 4 taken 49 times.
98 if (parent_catalog->LookupPath(nested_root_ps, NULL)) {
1061 49 SyncUnlock();
1062 49 PANIC(kLogStderr,
1063 "invalid attempt to graft nested catalog into existing "
1064 "directory '%s'",
1065 nested_root_path.c_str());
1066 }
1067
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 parent_catalog->AddEntry(dirent, xattrs, nested_root_path, parent_path);
1068 49 parent_entry.set_linkcount(parent_entry.linkcount() + 1);
1069
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 parent_catalog->UpdateEntry(parent_entry, parent_path);
1070
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 if (parent_entry.IsNestedCatalogRoot()) {
1071 WritableCatalog *grand_parent_catalog = reinterpret_cast<WritableCatalog *>(
1072 49 parent_catalog->parent());
1073 49 parent_entry.set_is_nested_catalog_root(false);
1074 49 parent_entry.set_is_nested_catalog_mountpoint(true);
1075
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 grand_parent_catalog->UpdateEntry(parent_entry, parent_path);
1076 }
1077
1078
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 parent_catalog->InsertNestedCatalog(nested_root_path, NULL, new_hash,
1079 new_size);
1080
1081 // Fix-up counters
1082
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 const Counters counters;
1083
1/2
✓ Branch 3 taken 49 times.
✗ Branch 4 not taken.
49 const DeltaCounters delta = Counters::Diff(counters,
1084 new_catalog->GetCounters());
1085
1/2
✓ Branch 1 taken 49 times.
✗ Branch 2 not taken.
49 delta.PopulateToParent(&parent_catalog->delta_counters_);
1086
1087 49 SyncUnlock();
1088 588 }
1089
1090 /**
1091 * Checks if a nested catalog starts at this path. The path must be valid.
1092 */
1093 bool WritableCatalogManager::IsTransitionPoint(const string &mountpoint) {
1094 const string path = MakeRelativePath(mountpoint);
1095
1096 SyncLock();
1097 WritableCatalog *catalog;
1098 DirectoryEntry entry;
1099 if (!FindCatalog(path, &catalog, &entry)) {
1100 PANIC(kLogStderr, "catalog for directory '%s' cannot be found",
1101 path.c_str());
1102 }
1103 const bool result = entry.IsNestedCatalogRoot();
1104 SyncUnlock();
1105 return result;
1106 }
1107
1108
1109 void WritableCatalogManager::PrecalculateListings() {
1110 // TODO(jblomer): meant for micro catalogs
1111 }
1112
1113
1114 void WritableCatalogManager::SetTTL(const uint64_t new_ttl) {
1115 SyncLock();
1116 reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetTTL(new_ttl);
1117 SyncUnlock();
1118 }
1119
1120
1121 bool WritableCatalogManager::SetVOMSAuthz(const std::string &voms_authz) {
1122 bool result;
1123 SyncLock();
1124 result = reinterpret_cast<WritableCatalog *>(GetRootCatalog())
1125 ->SetVOMSAuthz(voms_authz);
1126 SyncUnlock();
1127 return result;
1128 }
1129
1130
1131 612 bool WritableCatalogManager::Commit(const bool stop_for_tweaks,
1132 const uint64_t manual_revision,
1133 manifest::Manifest *manifest) {
1134 WritableCatalog *root_catalog = reinterpret_cast<WritableCatalog *>(
1135 612 GetRootCatalog());
1136
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 root_catalog->SetDirty();
1137
1138 // set root catalog revision to manually provided number if available
1139
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 612 times.
612 if (manual_revision > 0) {
1140 const uint64_t revision = root_catalog->GetRevision();
1141 if (revision >= manual_revision) {
1142 LogCvmfs(kLogCatalog, kLogStderr,
1143 "Manual revision (%" PRIu64 ") must not be "
1144 "smaller than the current root catalog's (%" PRIu64
1145 "). Skipped!",
1146 manual_revision, revision);
1147 } else {
1148 // Gets incremented by FinalizeCatalog() afterwards!
1149 root_catalog->SetRevision(manual_revision - 1);
1150 }
1151 }
1152
1153 // do the actual catalog snapshotting and upload
1154
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 CatalogInfo root_catalog_info;
1155
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 if (getenv("_CVMFS_SERIALIZED_CATALOG_PROCESSING_") == NULL)
1156
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 root_catalog_info = SnapshotCatalogs(stop_for_tweaks);
1157 else
1158 root_catalog_info = SnapshotCatalogsSerialized(stop_for_tweaks);
1159
2/4
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
✓ Branch 4 taken 612 times.
612 if (spooler_->GetNumberOfErrors() > 0) {
1160 LogCvmfs(kLogCatalog, kLogStderr, "failed to commit catalogs");
1161 return false;
1162 }
1163
1164 // .cvmfspublished export
1165
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 LogCvmfs(kLogCatalog, kLogVerboseMsg, "Committing repository manifest");
1166 612 set_base_hash(root_catalog_info.content_hash);
1167
1168 612 manifest->set_catalog_hash(root_catalog_info.content_hash);
1169 612 manifest->set_catalog_size(root_catalog_info.size);
1170
2/4
✓ Branch 2 taken 612 times.
✗ Branch 3 not taken.
✓ Branch 5 taken 612 times.
✗ Branch 6 not taken.
612 manifest->set_root_path("");
1171 612 manifest->set_ttl(root_catalog_info.ttl);
1172 612 manifest->set_revision(root_catalog_info.revision);
1173
1174 612 return true;
1175 }
1176
1177
1178 /**
1179 * Handles the snapshotting of dirty (i.e. modified) catalogs while trying to
1180 * parallelize the compression and upload as much as possible. We use a parallel
1181 * depth first post order tree traversal based on 'continuations'.
1182 *
1183 * The idea is as follows:
1184 * 1. find all leaf-catalogs (i.e. dirty catalogs with no dirty children)
1185 * --> these can be processed and uploaded immediately and independently
1186 * see WritableCatalogManager::GetModifiedCatalogLeafs()
1187 * 2. annotate non-leaf catalogs with their number of dirty children
1188 * --> a finished child will notify it's parent and decrement this number
1189 * see WritableCatalogManager::CatalogUploadCallback()
1190 * 3. if a non-leaf catalog's dirty children number reaches 0, it is scheduled
1191 * for processing as well (continuation)
1192 * --> the parallel processing walks bottom-up through the catalog tree
1193 * see WritableCatalogManager::CatalogUploadCallback()
1194 * 4. when the root catalog is reached, we notify the main thread and return
1195 * --> done through a Future<> in WritableCatalogManager::SnapshotCatalogs
1196 *
1197 * Note: The catalog finalisation (see WritableCatalogManager::FinalizeCatalog)
1198 * happens in a worker thread (i.e. the callback method) for non-leaf
1199 * catalogs.
1200 *
1201 * TODO(rmeusel): since all leaf catalogs are finalized in the main thread, we
1202 * sacrifice some potential concurrency for simplicity.
1203 */
1204 612 WritableCatalogManager::CatalogInfo WritableCatalogManager::SnapshotCatalogs(
1205 const bool stop_for_tweaks) {
1206 // prepare environment for parallel processing
1207
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 Future<CatalogInfo> root_catalog_info_future;
1208 CatalogUploadContext upload_context;
1209 612 upload_context.root_catalog_info = &root_catalog_info_future;
1210 612 upload_context.stop_for_tweaks = stop_for_tweaks;
1211
1212
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 spooler_->RegisterListener(&WritableCatalogManager::CatalogUploadCallback,
1213 this, upload_context);
1214
1215 // find dirty leaf catalogs and annotate non-leaf catalogs (dirty child count)
1216 // post-condition: the entire catalog tree is ready for concurrent processing
1217 612 WritableCatalogList leafs_to_snapshot;
1218
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 GetModifiedCatalogLeafs(&leafs_to_snapshot);
1219
1220 // finalize and schedule the catalog processing
1221 612 WritableCatalogList::const_iterator i = leafs_to_snapshot.begin();
1222 612 const WritableCatalogList::const_iterator iend = leafs_to_snapshot.end();
1223
2/2
✓ Branch 2 taken 995 times.
✓ Branch 3 taken 612 times.
1607 for (; i != iend; ++i) {
1224
1/2
✓ Branch 2 taken 995 times.
✗ Branch 3 not taken.
995 FinalizeCatalog(*i, stop_for_tweaks);
1225
1/2
✓ Branch 2 taken 995 times.
✗ Branch 3 not taken.
995 ScheduleCatalogProcessing(*i);
1226 }
1227
1228
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 LogCvmfs(kLogCatalog, kLogVerboseMsg, "waiting for upload of catalogs");
1229
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 const CatalogInfo &root_catalog_info = root_catalog_info_future.Get();
1230
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 spooler_->WaitForUpload();
1231
1232
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 spooler_->UnregisterListeners();
1233 612 return root_catalog_info;
1234 612 }
1235
1236
1237 1790 void WritableCatalogManager::FinalizeCatalog(WritableCatalog *catalog,
1238 const bool stop_for_tweaks) {
1239 // update meta information of this catalog
1240
1/3
✓ Branch 2 taken 1790 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1790 LogCvmfs(kLogCatalog, kLogVerboseMsg, "creating snapshot of catalog '%s'",
1241 3580 catalog->mountpoint().c_str());
1242
1243 1790 catalog->UpdateCounters();
1244 1790 catalog->UpdateLastModified();
1245 1790 catalog->IncrementRevision();
1246
1247 // update the previous catalog revision pointer
1248
2/2
✓ Branch 1 taken 612 times.
✓ Branch 2 taken 1178 times.
1790 if (catalog->IsRoot()) {
1249
1/4
✓ Branch 2 taken 612 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
612 LogCvmfs(kLogCatalog, kLogVerboseMsg,
1250 "setting '%s' as previous revision "
1251 "for root catalog",
1252 1224 base_hash().ToStringWithSuffix().c_str());
1253 612 catalog->SetPreviousRevision(base_hash());
1254 } else {
1255 // Multiple catalogs might query the parent concurrently
1256 1178 SyncLock();
1257
1/2
✓ Branch 1 taken 1178 times.
✗ Branch 2 not taken.
1178 shash::Any hash_previous;
1258 uint64_t size_previous;
1259
1/2
✓ Branch 2 taken 1178 times.
✗ Branch 3 not taken.
2356 const bool retval = catalog->parent()->FindNested(
1260
1/2
✓ Branch 1 taken 1178 times.
✗ Branch 2 not taken.
2356 catalog->mountpoint(), &hash_previous, &size_previous);
1261
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1178 times.
1178 assert(retval);
1262 1178 SyncUnlock();
1263
1264
1/8
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1178 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✗ Branch 9 not taken.
✗ Branch 10 not taken.
2356 LogCvmfs(kLogCatalog, kLogVerboseMsg,
1265 "found '%s' as previous revision "
1266 "for nested catalog '%s'",
1267
1/2
✓ Branch 1 taken 1178 times.
✗ Branch 2 not taken.
2356 hash_previous.ToStringWithSuffix().c_str(),
1268
1/2
✓ Branch 1 taken 1178 times.
✗ Branch 2 not taken.
2356 catalog->mountpoint().c_str());
1269
1/2
✓ Branch 1 taken 1178 times.
✗ Branch 2 not taken.
1178 catalog->SetPreviousRevision(hash_previous);
1270 }
1271 1790 catalog->Commit();
1272
1273 // check if catalog has too many entries
1274 const uint64_t catalog_limit = uint64_t(1000)
1275
2/2
✓ Branch 1 taken 612 times.
✓ Branch 2 taken 1178 times.
1790 * uint64_t((catalog->IsRoot()
1276 612 ? root_kcatalog_limit_
1277 1178 : nested_kcatalog_limit_));
1278 1790 if ((catalog_limit > 0)
1279
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 1790 times.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✗ Branch 6 not taken.
✓ Branch 7 taken 1790 times.
1790 && (catalog->GetCounters().GetSelfEntries() > catalog_limit)) {
1280 LogCvmfs(kLogCatalog, kLogStderr,
1281 "%s: catalog at %s has more than %lu entries (%lu). "
1282 "Large catalogs stress the CernVM-FS transport infrastructure. "
1283 "Please split it into nested catalogs or increase the limit.",
1284 enforce_limits_ ? "FATAL" : "WARNING",
1285 (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1286 catalog_limit, catalog->GetCounters().GetSelfEntries());
1287 if (enforce_limits_)
1288 PANIC(kLogStderr, "catalog at %s has more than %u entries (%u). ",
1289 (catalog->IsRoot() ? "/" : catalog->mountpoint().c_str()),
1290 catalog_limit, catalog->GetCounters().GetSelfEntries());
1291 }
1292
1293 // allow for manual adjustments in the catalog
1294
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1790 times.
1790 if (stop_for_tweaks) {
1295 LogCvmfs(kLogCatalog, kLogStdout,
1296 "Allowing for tweaks in %s at %s "
1297 "(hit return to continue)",
1298 catalog->database_path().c_str(), catalog->mountpoint().c_str());
1299 const int read_char = getchar();
1300 assert(read_char != EOF);
1301 }
1302
1303 // compaction of bloated catalogs (usually after high database churn)
1304 1790 catalog->VacuumDatabaseIfNecessary();
1305 1790 }
1306
1307
1308 1790 void WritableCatalogManager::ScheduleCatalogProcessing(
1309 WritableCatalog *catalog) {
1310 {
1311 1790 const MutexLockGuard guard(catalog_processing_lock_);
1312 // register catalog object for WritableCatalogManager::CatalogUploadCallback
1313
2/4
✓ Branch 1 taken 1790 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1790 times.
✗ Branch 5 not taken.
1790 catalog_processing_map_[catalog->database_path()] = catalog;
1314 1790 }
1315
1/2
✓ Branch 2 taken 1790 times.
✗ Branch 3 not taken.
1790 spooler_->ProcessCatalog(catalog->database_path());
1316 1790 }
1317
1318 /**
1319 * Copy catalog to local cache.server
1320 * Must be an atomic write into the cache_dir
1321 * As such: create a temporary copy in cache_dir/txn and then do a
1322 * `rename` (which is atomic) to the actual cache path
1323 *
1324 * @returns true on success, otherwise false
1325 */
1326 bool WritableCatalogManager::CopyCatalogToLocalCache(
1327 const upload::SpoolerResult &result) {
1328 std::string tmp_catalog_path;
1329 const std::string cache_catalog_path = dir_cache_ + "/"
1330 + result.content_hash
1331 .MakePathWithoutSuffix();
1332 FILE *fcatalog = CreateTempFile(dir_cache_ + "/txn/catalog", 0666, "w",
1333 &tmp_catalog_path);
1334 if (!fcatalog) {
1335 PANIC(kLogDebug | kLogStderr,
1336 "Creating file for temporary catalog failed: %s",
1337 tmp_catalog_path.c_str());
1338 }
1339 CopyPath2File(result.local_path.c_str(), fcatalog);
1340 (void)fclose(fcatalog);
1341
1342 if (rename(tmp_catalog_path.c_str(), cache_catalog_path.c_str()) != 0) {
1343 PANIC(kLogDebug | kLogStderr, "Failed to copy catalog from %s to cache %s",
1344 result.local_path.c_str(), cache_catalog_path.c_str());
1345 }
1346 return true;
1347 }
1348
1349 1790 void WritableCatalogManager::CatalogUploadCallback(
1350 const upload::SpoolerResult &result,
1351 const CatalogUploadContext catalog_upload_context) {
1352
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1790 times.
1790 if (result.return_code != 0) {
1353 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1354 result.local_path.c_str(), result.return_code);
1355 }
1356
1357 // retrieve the catalog object based on the callback information
1358 // see WritableCatalogManager::ScheduleCatalogProcessing()
1359 1790 WritableCatalog *catalog = NULL;
1360 {
1361 1790 const MutexLockGuard guard(catalog_processing_lock_);
1362 const std::map<std::string, WritableCatalog *>::iterator
1363
1/2
✓ Branch 1 taken 1790 times.
✗ Branch 2 not taken.
1790 c = catalog_processing_map_.find(result.local_path);
1364
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 1790 times.
1790 assert(c != catalog_processing_map_.end());
1365 1790 catalog = c->second;
1366 1790 }
1367
1368 1790 const uint64_t catalog_size = GetFileSize(result.local_path);
1369
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1790 times.
1790 assert(catalog_size > 0);
1370
1371
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1790 times.
1790 if (UseLocalCache()) {
1372 CopyCatalogToLocalCache(result);
1373 }
1374
1375 1790 SyncLock();
1376
2/2
✓ Branch 1 taken 1178 times.
✓ Branch 2 taken 612 times.
1790 if (catalog->HasParent()) {
1377 // finalized nested catalogs will update their parent's pointer and schedule
1378 // them for processing (continuation) if the 'dirty children count' == 0
1379 1178 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1380 1178 WritableCatalog *parent = catalog->GetWritableParent();
1381
1382
2/4
✓ Branch 1 taken 1178 times.
✗ Branch 2 not taken.
✓ Branch 4 taken 1178 times.
✗ Branch 5 not taken.
1178 parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1383 1178 result.content_hash,
1384 catalog_size,
1385 1178 catalog->delta_counters_);
1386 1178 catalog->delta_counters_.SetZero();
1387
1388 const int remaining_dirty_children = catalog->GetWritableParent()
1389 1178 ->DecrementDirtyChildren();
1390
1391 1178 SyncUnlock();
1392
1393 // continuation of the dirty catalog tree traversal
1394 // see WritableCatalogManager::SnapshotCatalogs()
1395
2/2
✓ Branch 0 taken 795 times.
✓ Branch 1 taken 383 times.
1178 if (remaining_dirty_children == 0) {
1396 795 FinalizeCatalog(parent, catalog_upload_context.stop_for_tweaks);
1397 795 ScheduleCatalogProcessing(parent);
1398 }
1399
1400
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 } else if (catalog->IsRoot()) {
1401 // once the root catalog is reached, we are done with processing and report
1402 // back to the main via a Future<> and provide the necessary information
1403
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 CatalogInfo root_catalog_info;
1404 612 root_catalog_info.size = catalog_size;
1405
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 root_catalog_info.ttl = catalog->GetTTL();
1406 612 root_catalog_info.content_hash = result.content_hash;
1407
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 root_catalog_info.revision = catalog->GetRevision();
1408
1/2
✓ Branch 1 taken 612 times.
✗ Branch 2 not taken.
612 catalog_upload_context.root_catalog_info->Set(root_catalog_info);
1409 612 SyncUnlock();
1410 } else {
1411 PANIC(kLogStderr, "inconsistent state detected");
1412 }
1413 1790 }
1414
1415
1416 /**
1417 * Finds dirty catalogs that can be snapshot right away and annotates all the
1418 * other catalogs with their number of dirty descendants.
1419 * Note that there is a convenience wrapper to start the recursion:
1420 * WritableCatalogManager::GetModifiedCatalogLeafs()
1421 *
1422 * @param catalog the catalog for this recursion step
1423 * @param result the result list to be appended to
1424 * @return true if 'catalog' is dirty
1425 */
1426 1839 bool WritableCatalogManager::GetModifiedCatalogLeafsRecursively(
1427 Catalog *catalog, WritableCatalogList *result) const {
1428 1839 WritableCatalog *wr_catalog = static_cast<WritableCatalog *>(catalog);
1429
1430 // Look for dirty catalogs in the descendants of *catalog
1431 1839 int dirty_children = 0;
1432
1/2
✓ Branch 1 taken 1839 times.
✗ Branch 2 not taken.
1839 CatalogList children = wr_catalog->GetChildren();
1433 1839 CatalogList::const_iterator i = children.begin();
1434 1839 const CatalogList::const_iterator iend = children.end();
1435
2/2
✓ Branch 2 taken 1178 times.
✓ Branch 3 taken 1839 times.
3017 for (; i != iend; ++i) {
1436
2/4
✓ Branch 2 taken 1178 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 1178 times.
✗ Branch 5 not taken.
1178 if (GetModifiedCatalogLeafsRecursively(*i, result)) {
1437 1178 ++dirty_children;
1438 }
1439 }
1440
1441 // a catalog is dirty if itself or one of its children has changed
1442 // a leaf catalog doesn't have any dirty children
1443 1839 wr_catalog->set_dirty_children(dirty_children);
1444
1/4
✗ Branch 1 not taken.
✓ Branch 2 taken 1839 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
1839 const bool is_dirty = wr_catalog->IsDirty() || dirty_children > 0;
1445 1839 const bool is_leaf = dirty_children == 0;
1446
3/4
✓ Branch 0 taken 1839 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 1044 times.
✓ Branch 3 taken 795 times.
1839 if (is_dirty && is_leaf) {
1447
1/2
✓ Branch 1 taken 1044 times.
✗ Branch 2 not taken.
1044 result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1448 }
1449
1450 1839 return is_dirty;
1451 1839 }
1452
1453
1454 void WritableCatalogManager::DoBalance() {
1455 CatalogList catalog_list = GetCatalogs();
1456 reverse(catalog_list.begin(), catalog_list.end());
1457 for (unsigned i = 0; i < catalog_list.size(); ++i) {
1458 FixWeight(static_cast<WritableCatalog *>(catalog_list[i]));
1459 }
1460 }
1461
1462 void WritableCatalogManager::FixWeight(WritableCatalog *catalog) {
1463 // firstly check underflow because they can provoke overflows
1464 if (catalog->GetNumEntries() < min_weight_ && !catalog->IsRoot()
1465 && catalog->IsAutogenerated()) {
1466 LogCvmfs(kLogCatalog, kLogStdout,
1467 "Deleting an autogenerated catalog in '%s'",
1468 catalog->mountpoint().c_str());
1469 // Remove the .cvmfscatalog and .cvmfsautocatalog files first
1470 const string path = catalog->mountpoint().ToString();
1471 catalog->RemoveEntry(path + "/.cvmfscatalog");
1472 catalog->RemoveEntry(path + "/.cvmfsautocatalog");
1473 // Remove the actual catalog
1474 const string catalog_path = catalog->mountpoint().ToString().substr(1);
1475 RemoveNestedCatalog(catalog_path);
1476 } else if (catalog->GetNumEntries() > max_weight_) {
1477 CatalogBalancer<WritableCatalogManager> catalog_balancer(this);
1478 catalog_balancer.Balance(catalog);
1479 }
1480 }
1481
1482
1483 //****************************************************************************
1484 // Workaround -- Serialized Catalog Committing
1485
1486 int WritableCatalogManager::GetModifiedCatalogsRecursively(
1487 const Catalog *catalog, WritableCatalogList *result) const {
1488 // A catalog must be snapshot, if itself or one of it's descendants is dirty.
1489 // So we traverse the catalog tree recursively and look for dirty catalogs
1490 // on the way.
1491 const WritableCatalog *wr_catalog = static_cast<const WritableCatalog *>(
1492 catalog);
1493 // This variable will contain the number of dirty catalogs in the sub tree
1494 // with *catalog as it's root.
1495 int dirty_catalogs = (wr_catalog->IsDirty()) ? 1 : 0;
1496
1497 // Look for dirty catalogs in the descendants of *catalog
1498 CatalogList children = wr_catalog->GetChildren();
1499 for (CatalogList::const_iterator i = children.begin(), iEnd = children.end();
1500 i != iEnd; ++i) {
1501 dirty_catalogs += GetModifiedCatalogsRecursively(*i, result);
1502 }
1503
1504 // If we found a dirty catalog in the checked sub tree, the root (*catalog)
1505 // must be snapshot and ends up in the result list
1506 if (dirty_catalogs > 0)
1507 result->push_back(const_cast<WritableCatalog *>(wr_catalog));
1508
1509 // tell the upper layer about number of catalogs
1510 return dirty_catalogs;
1511 }
1512
1513
1514 void WritableCatalogManager::CatalogUploadSerializedCallback(
1515 const upload::SpoolerResult &result, const CatalogUploadContext unused) {
1516 if (result.return_code != 0) {
1517 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1518 result.local_path.c_str(), result.return_code);
1519 }
1520
1521 if (UseLocalCache()) {
1522 CopyCatalogToLocalCache(result);
1523 }
1524
1525 unlink(result.local_path.c_str());
1526 }
1527
1528
1529 WritableCatalogManager::CatalogInfo
1530 WritableCatalogManager::SnapshotCatalogsSerialized(const bool stop_for_tweaks) {
1531 LogCvmfs(kLogCvmfs, kLogStdout, "Serialized committing of file catalogs...");
1532 reinterpret_cast<WritableCatalog *>(GetRootCatalog())->SetDirty();
1533 WritableCatalogList catalogs_to_snapshot;
1534 GetModifiedCatalogs(&catalogs_to_snapshot);
1535 CatalogUploadContext unused;
1536 unused.root_catalog_info = NULL;
1537 unused.stop_for_tweaks = false;
1538 spooler_->RegisterListener(
1539 &WritableCatalogManager::CatalogUploadSerializedCallback, this, unused);
1540
1541 CatalogInfo root_catalog_info;
1542 WritableCatalogList::const_iterator i = catalogs_to_snapshot.begin();
1543 const WritableCatalogList::const_iterator iend = catalogs_to_snapshot.end();
1544 for (; i != iend; ++i) {
1545 FinalizeCatalog(*i, stop_for_tweaks);
1546
1547 // Compress and upload catalog
1548 shash::Any hash_catalog(spooler_->GetHashAlgorithm(),
1549 shash::kSuffixCatalog);
1550 if (!zlib::CompressPath2Null((*i)->database_path(), &hash_catalog)) {
1551 PANIC(kLogStderr, "could not compress catalog %s",
1552 (*i)->mountpoint().ToString().c_str());
1553 }
1554
1555 const int64_t catalog_size = GetFileSize((*i)->database_path());
1556 assert(catalog_size > 0);
1557
1558 if ((*i)->HasParent()) {
1559 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1560 WritableCatalog *parent = (*i)->GetWritableParent();
1561 parent->UpdateNestedCatalog((*i)->mountpoint().ToString(), hash_catalog,
1562 catalog_size, (*i)->delta_counters_);
1563 (*i)->delta_counters_.SetZero();
1564 } else if ((*i)->IsRoot()) {
1565 root_catalog_info.size = catalog_size;
1566 root_catalog_info.ttl = (*i)->GetTTL();
1567 root_catalog_info.content_hash = hash_catalog;
1568 root_catalog_info.revision = (*i)->GetRevision();
1569 } else {
1570 PANIC(kLogStderr, "inconsistent state detected");
1571 }
1572
1573 spooler_->ProcessCatalog((*i)->database_path());
1574 }
1575 spooler_->WaitForUpload();
1576
1577 spooler_->UnregisterListeners();
1578 return root_catalog_info;
1579 }
1580
1581 void WritableCatalogManager::SetupSingleCatalogUploadCallback() {
1582 spooler_->RegisterListener(
1583 &WritableCatalogManager::SingleCatalogUploadCallback, this);
1584 }
1585
1586 void WritableCatalogManager::RemoveSingleCatalogUploadCallback() {
1587 spooler_->WaitForUpload(); // wait for all outstanding jobs to finish before
1588 // tearing it down
1589 spooler_->UnregisterListeners();
1590 pending_catalogs_ =
1591 {}; // whatever we couldn't process, leave it to the Commit
1592 }
1593
1594 void WritableCatalogManager::AddCatalogToQueue(const std::string &path) {
1595 SyncLock();
1596 WritableCatalog *catalog = NULL;
1597 bool const retval = FindCatalog(MakeRelativePath(path), &catalog, NULL);
1598 assert(retval);
1599 assert(catalog);
1600 catalog->SetDirty(); // ensure it's dirty so its parent will wait for it
1601 SyncUnlock();
1602 pending_catalogs_.push_back(catalog);
1603 }
1604
1605 void WritableCatalogManager::ScheduleReadyCatalogs() {
1606 // best effort to schedule as many catalogs for upload as possible
1607 for (auto it = pending_catalogs_.begin(); it != pending_catalogs_.end();) {
1608 if ((*it)->dirty_children() == 0) {
1609 FinalizeCatalog(*it, false /* stop_for_tweaks */);
1610 ScheduleCatalogProcessing(*it);
1611 LogCvmfs(kLogCatalog, kLogVerboseMsg, "scheduled %s for processing",
1612 (*it)->mountpoint().c_str());
1613 it = pending_catalogs_.erase(it);
1614 } else {
1615 ++it;
1616 }
1617 }
1618 }
1619
1620 // Callback for uploading a single catalog, similar to CatalogUploadCallback.
1621 // The main difference is that this callback would not trigger processing of the
1622 // parent
1623 void WritableCatalogManager::SingleCatalogUploadCallback(
1624 const upload::SpoolerResult &result) {
1625 if (result.return_code != 0) {
1626 PANIC(kLogStderr, "failed to upload '%s' (retval: %d)",
1627 result.local_path.c_str(), result.return_code);
1628 }
1629
1630 // retrieve the catalog object based on the callback information
1631 // see WritableCatalogManager::ScheduleCatalogProcessing()
1632 WritableCatalog *catalog = NULL;
1633 {
1634 MutexLockGuard const guard(catalog_processing_lock_);
1635 std::map<std::string, WritableCatalog *>::iterator const
1636 c = catalog_processing_map_.find(result.local_path);
1637 assert(c != catalog_processing_map_.end());
1638 catalog = c->second;
1639 }
1640
1641 uint64_t const catalog_size = GetFileSize(result.local_path);
1642 assert(catalog_size > 0);
1643
1644 SyncLock();
1645 if (catalog->HasParent()) {
1646 // finalized nested catalogs will update their parent's pointer
1647 LogCvmfs(kLogCatalog, kLogVerboseMsg, "updating nested catalog link");
1648 WritableCatalog *parent = catalog->GetWritableParent();
1649
1650 parent->UpdateNestedCatalog(catalog->mountpoint().ToString(),
1651 result.content_hash,
1652 catalog_size,
1653 catalog->delta_counters_);
1654 parent->DecrementDirtyChildren();
1655 catalog->delta_counters_.SetZero();
1656 }
1657 // JUMP: detach the catalog after uploading to free sqlite related resources
1658 DetachCatalog(catalog);
1659 SyncUnlock();
1660 }
1661 // using the given list of dirs, fetch all relevant catalogs
1662 void WritableCatalogManager::LoadCatalogs(
1663 const std::string &base_path, const std::unordered_set<std::string> &dirs) {
1664 // mount everything up to "base_path" first (this would be our lease_path
1665 // typically)
1666 Catalog *base_catalog;
1667 MountSubtree(PathString(base_path), NULL /* entry_point */,
1668 true /* is_listable */, &base_catalog);
1669
1670 // start up the downloader
1671 CatalogDownloadContext context;
1672 context.dirs = &dirs;
1673 catalog_download_pipeline_ = new CatalogDownloadPipeline(
1674 static_cast<SimpleCatalogManager *>(this));
1675 catalog_download_pipeline_->RegisterListener(
1676 &WritableCatalogManager::CatalogDownloadCallback, this, context);
1677 catalog_download_pipeline_->Spawn();
1678
1679 Catalog::NestedCatalogList nested_catalogs = base_catalog
1680 ->ListNestedCatalogs();
1681 for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1682 // schedule relevant child nested catalogs for download
1683 std::string const mountpoint = it->mountpoint.ToString();
1684 if (dirs.find(mountpoint) != dirs.end()) {
1685 Catalog *catalog = CreateCatalog(it->mountpoint, it->hash,
1686 NULL /* parent */);
1687 {
1688 MutexLockGuard const guard(catalog_download_lock_);
1689 catalog_download_map_.insert(
1690 std::make_pair(it->hash.ToString(), catalog));
1691 }
1692 catalog_download_pipeline_->Process(it->hash);
1693 }
1694 }
1695
1696 catalog_download_pipeline_->WaitFor();
1697 delete catalog_download_pipeline_; // terminate all the threads
1698 }
1699
1700 bool WritableCatalogManager::LookupDirEntry(const string &path,
1701 const LookupOptions options,
1702 DirectoryEntry *dirent) {
1703 SyncLock();
1704 bool const exists = LookupPath(path, options, dirent);
1705 SyncUnlock();
1706 return exists;
1707 }
1708
1709 void WritableCatalogManager::CatalogHashSerializedCallback(
1710 const CompressHashResult &result) {
1711 MutexLockGuard const guard(catalog_hash_lock_);
1712 catalog_hash_map_[result.path] = result.hash;
1713 }
1714
1715 void WritableCatalogManager::CatalogDownloadCallback(
1716 const CatalogDownloadResult &result, CatalogDownloadContext context) {
1717 Catalog *downloaded_catalog;
1718 {
1719 MutexLockGuard const guard(catalog_download_lock_);
1720 auto it = catalog_download_map_.find(result.hash);
1721 assert(it != catalog_download_map_.end());
1722 downloaded_catalog = it->second;
1723 }
1724
1725 if (!downloaded_catalog->OpenDatabase(result.db_path)) {
1726 LogCvmfs(kLogCvmfs, kLogDebug, "failed to initialize catalog");
1727 delete downloaded_catalog;
1728 return;
1729 }
1730
1731 Catalog::NestedCatalogList nested_catalogs = downloaded_catalog
1732 ->ListNestedCatalogs();
1733 for (auto it = nested_catalogs.begin(); it != nested_catalogs.end(); ++it) {
1734 // schedule relevant child nested catalogs for download
1735 if (context.dirs->find(it->mountpoint.ToString()) != context.dirs->end()) {
1736 Catalog *child_catalog = CreateCatalog(it->mountpoint, it->hash,
1737 NULL /* parent */);
1738 {
1739 MutexLockGuard const guard(catalog_download_lock_);
1740 catalog_download_map_.insert(
1741 std::make_pair(it->hash.ToString(), child_catalog));
1742 }
1743 catalog_download_pipeline_->Process(it->hash);
1744 }
1745 }
1746 delete downloaded_catalog;
1747 }
1748
1749
1750 } // namespace catalog
1751