CernVM-FS  2.13.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cvmfs_fsck.cc
Go to the documentation of this file.
1 
9 #define _FILE_OFFSET_BITS 64
10 
11 
12 #include <dirent.h>
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <pthread.h>
16 #include <stdint.h>
17 #include <sys/stat.h>
18 #include <unistd.h>
19 
20 #include <cstdio>
21 #include <cstdlib>
22 #include <cstring>
23 #include <string>
24 
26 #include "crypto/hash.h"
27 #include "util/atomic.h"
28 #include "util/concurrency.h"
29 #include "util/logging.h"
30 #include "util/platform.h"
31 #include "util/posix.h"
32 #include "util/smalloc.h"
33 
34 using namespace std; // NOLINT
35 
// Exit/status codes of the tool. Only kErrorOk is visible here: listing lines
// 38-42 are absent from this listing. Other code in this file also uses
// kErrorFixed, kErrorUnfixed, kErrorOperational and kErrorUsage.
// NOTE(review): main() combines these with |= into its return value, so they
// are presumably distinct bit flags declared in the missing lines -- confirm.
36 enum Errors {
37  kErrorOk = 0,
43 };
44 
// Global state shared by main() and the worker threads.
// NOTE(review): listing lines 46-53, 58, 62 and 63 are absent from this
// listing; the atomic counters and flags used elsewhere in this file
// (g_num_files, g_num_err_fixed, g_num_err_unfixed, g_num_err_operational,
// g_num_tmp_catalog, g_force_rebuild, g_modified_cache) are not visible here.

// Canonical path of the cache directory; allocated in main() from the first
// non-option argument. Used in this file for log messages and for chdir().
45 string *g_cache_dir;
// NOTE(review): no lock/unlock of this mutex is visible in this listing;
// presumably it serializes GetNextFile() across workers -- confirm.
54 pthread_mutex_t g_lock_traverse = PTHREAD_MUTEX_INITIALIZER;
// Handle of the subdirectory GetNextFile() is currently reading, or NULL.
55 DIR *g_DIRP_current = NULL;
// Index of the current subdirectory. Starts at -1 so that the first increment
// in GetNextFile() selects directory 0 ("00").
56 int g_num_dirs = -1;
// Two-character hex name of the current subdirectory; allocated in main().
57 string *g_current_dir;
// Number of worker threads (-j option); main() rejects values below 1.
59 int g_num_threads = 1;
// Set by -p: repair problems (unlink or quarantine files) instead of only
// reporting them.
60 bool g_fix_errors = false;
// Set by -v: log every directory and file instead of the compact progress
// output.
61 bool g_verbose = false;
64 
65 
// Help text of the tool: purpose, command-line synopsis and the -v, -p, -f
// and -j options. CVMFS_VERSION is the argument for the %s in the first line.
// NOTE(review): listing line 67, which should open the call that receives
// this format string, is absent from this listing.
// NOTE(review): main() also accepts -h (it calls Usage() and returns
// kErrorOk), but -h is not mentioned in the text below.
66 static void Usage() {
68  "CernVM File System consistency checker, version %s\n\n"
69  "This tool checks a cvmfs cache directory for consistency.\n"
70  "If necessary, the managed cache db is removed so that\n"
71  "it will be rebuilt on next mount.\n\n"
72  "Usage: cvmfs_fsck [-v] [-p] [-f] [-j #threads] <cache directory>\n"
73  "Options:\n"
74  " -v verbose output\n"
75  " -p try to fix automatically\n"
76  " -f force rebuild of managed cache db on next mount\n"
77  " -j number of concurrent integrity check worker threads\n",
78  CVMFS_VERSION);
79 }
80 
81 
// Produces the next regular file found in the subdirectories "00" .. "ff" of
// the current working directory (main() has chdir()'ed into the cache
// directory), moving on to the next subdirectory when one is exhausted.
//
// On success:
//   *relative_path = "<subdir>/<entry name>"  (usable relative to the cwd)
//   *hash_name     = "<subdir><entry name>"   (no separator)
// Both output strings are overwritten for every directory entry examined,
// including entries that are then skipped.
//
// Returns true if a file was produced, false once all 256 subdirectories have
// been read. Terminates the process with kErrorUnfixed if a subdirectory
// cannot be opened.
//
// NOTE(review): this function reads and writes the globals g_DIRP_current,
// g_num_dirs and g_current_dir and is called by every worker thread. Listing
// line 85 is absent from this listing; presumably it acquires
// g_lock_traverse -- confirm against the full source.
82 static bool GetNextFile(string *relative_path, string *hash_name) {
83  platform_dirent64 *d = NULL;
84 
86 get_next_file_again:
    // Scan the currently open subdirectory (if any) for the next regular file.
    // The loop is left either by break (d != NULL, file found) or because
    // no directory is open / the directory is exhausted (d == NULL).
87  while (g_DIRP_current && ((d = platform_readdir(g_DIRP_current)) != NULL)) {
88  const string name = d->d_name;
89  if ((name == ".") || (name == ".."))
90  continue;
91 
92  platform_stat64 info;
93  *relative_path = *g_current_dir + "/" + name;
94  *hash_name = *g_current_dir + name;
    // Absolute path; used only in the log messages below. The file system
    // calls operate on the path relative to the cwd.
95  const string path = *g_cache_dir + "/" + *relative_path;
96  if (platform_lstat(relative_path->c_str(), &info) != 0) {
97  LogCvmfs(kLogCvmfs, kLogStdout, "Warning: failed to stat() %s (%d)",
98  path.c_str(), errno);
99  continue;
100  }
101 
    // Anything that is not a regular file is reported and skipped.
102  if (!S_ISREG(info.st_mode)) {
103  LogCvmfs(kLogCvmfs, kLogStdout, "Warning: %s is not a regular file",
104  path.c_str());
105  continue;
106  }
107 
108  break;
109  }
110 
    // No file found: close the finished directory (if one was open) and
    // advance to the next one.
111  if (!d) {
112  if (g_DIRP_current) {
113  closedir(g_DIRP_current);
114  g_DIRP_current = NULL;
115  }
116  g_num_dirs++;
    // Subdirectories are named by the two-digit lowercase hex value of the
    // index, 0..255. Past 255 nothing is opened and false is returned.
117  if (g_num_dirs < 256) {
118  char hex[3];
119  snprintf(hex, sizeof(hex), "%02x", g_num_dirs);
120  *g_current_dir = string(hex, 2);
121 
122  if (g_verbose)
123  LogCvmfs(kLogCvmfs, kLogStdout, "Entering %s", g_current_dir->c_str());
    // NOTE(review): listing line 125, which should open the call taking the
    // message below, is absent from this listing.
124  if ((g_DIRP_current = opendir(hex)) == NULL) {
126  "Invalid cache directory, %s/%s does not exist",
127  g_cache_dir->c_str(), g_current_dir->c_str());
128  exit(kErrorUnfixed);
129  }
    // Restart the scan in the directory that was just opened.
130  goto get_next_file_again;
131  }
132  }
133 
134  return d != NULL;
135 }
136 
137 
// Worker thread entry point (passed to pthread_create() in main()).
// Repeatedly fetches a file via GetNextFile() and checks that its content
// matches the hash derived from its path (hash_name). The data argument is
// unused; the function always returns NULL.
//
// Per file:
//   * names ending in 'T' are reported as temporary file catalogs and, with
//     g_fix_errors set, unlinked;
//   * otherwise the file is streamed through zlib::CompressFd2Null() and the
//     resulting hash compared with the expected one; on mismatch the file is
//     hashed again with shash::HashFile() and compared once more;
//   * a file that still mismatches is, with g_fix_errors set, moved to
//     ./quarantaine/ or, failing that, unlinked; otherwise it is only
//     reported.
// Results are recorded in the global atomic counters and flags.
//
// NOTE(review): listing lines 149, 152, 197, 202, 206, 210 and 226 are absent
// from this listing; the string arguments that appear without a call in front
// of them belong to statements whose opening line is not visible.
138 static void *MainCheck(void *data __attribute__((unused))) {
139  string relative_path;
140  string hash_name;
141 
142  while (GetNextFile(&relative_path, &hash_name)) {
    // Absolute path, used for log messages only; file operations use
    // relative_path.
143  const string path = *g_cache_dir + "/" + relative_path;
144 
    // Count the file; n is the value returned by atomic_xadd32 and drives
    // the every-1000-files check in non-verbose mode.
145  int n = atomic_xadd32(&g_num_files, 1);
146  if (g_verbose)
147  LogCvmfs(kLogCvmfs, kLogStdout, "Checking file %s", path.c_str());
    // NOTE(review): the statement governed by this if (listing line 149) is
    // absent from this listing.
148  if (!g_verbose && ((n % 1000) == 0))
150 
    // relative_path always contains the "/" inserted by GetNextFile(), so
    // it is never empty and indexing its last character is valid.
151  if (relative_path[relative_path.length() - 1] == 'T') {
153  "Warning: temporary file catalog %s found", path.c_str());
154  atomic_inc32(&g_num_tmp_catalog);
155  if (g_fix_errors) {
156  if (unlink(relative_path.c_str()) == 0) {
157  LogCvmfs(kLogCvmfs, kLogStdout, "Fix: %s unlinked", path.c_str());
158  atomic_inc32(&g_num_err_fixed);
159  } else {
160  LogCvmfs(kLogCvmfs, kLogStdout, "Error: failed to unlink %s",
161  path.c_str());
162  atomic_inc32(&g_num_err_unfixed);
163  }
164  }
    // Temporary catalogs are not hash-checked.
165  continue;
166  }
167 
168  int fd_src = open(relative_path.c_str(), O_RDONLY);
169  if (fd_src < 0) {
170  LogCvmfs(kLogCvmfs, kLogStdout, "Error: cannot open %s", path.c_str());
171  atomic_inc32(&g_num_err_operational);
172  continue;
173  }
174  // Don't thrash kernel buffers
175  platform_disable_kcache(fd_src);
176 
177  // Compress every file and calculate SHA-1 of stream
    // The hash object is created with the algorithm of the expected hash
    // that MkFromHexPtr derives from hash_name.
178  shash::Any expected_hash = shash::MkFromHexPtr(shash::HexPtr(hash_name));
179  shash::Any hash(expected_hash.algorithm);
180  if (!zlib::CompressFd2Null(fd_src, &hash)) {
181  LogCvmfs(kLogCvmfs, kLogStdout, "Error: could not compress %s",
182  path.c_str());
183  atomic_inc32(&g_num_err_operational);
184  } else {
185  if (hash != expected_hash) {
186  // If the hashes don't match, try hashing the uncompressed file
    // NOTE(review): when HashFile() fails, only the operational error is
    // counted and control still reaches the comparison below, so with
    // g_fix_errors set such a file can be quarantined or unlinked --
    // confirm this is intended.
187  if (!shash::HashFile(relative_path, &hash)) {
188  LogCvmfs(kLogCvmfs, kLogStdout, "Error: could not hash %s",
189  path.c_str());
190  atomic_inc32(&g_num_err_operational);
191  }
192  if (hash != expected_hash) {
193  if (g_fix_errors) {
    // First choice: move the file into ./quarantaine/ under its hash name.
    // Fallback if the rename fails: unlink it.
194  const string quarantaine_path = "./quarantaine/" + hash_name;
195  bool fixed = false;
196  if (rename(relative_path.c_str(), quarantaine_path.c_str()) == 0) {
198  "Fix: %s is corrupted, moved to quarantaine folder",
199  path.c_str());
200  fixed = true;
201  } else {
203  "Warning: failed to move %s into quarantaine folder",
204  path.c_str());
205  if (unlink(relative_path.c_str()) == 0) {
207  "Fix: %s is corrupted, file unlinked", path.c_str());
208  fixed = true;
209  } else {
211  "Error: %s is corrupted, could not unlink",
212  path.c_str());
213  }
214  }
215 
216  if (fixed) {
217  atomic_inc32(&g_num_err_fixed);
218 
219  // Changes made, we have to rebuild the managed cache db
    // g_modified_cache additionally makes main() print the kernel buffer
    // warning at the end of the run.
220  atomic_cas32(&g_force_rebuild, 0, 1);
221  atomic_cas32(&g_modified_cache, 0, 1);
222  } else {
223  atomic_inc32(&g_num_err_unfixed);
224  }
225  } else {
    // Report-only mode: the file is left in place and counted as unfixed.
227  "Error: %s has compressed checksum %s, "
228  "delete this file from cache directory!",
229  path.c_str(), hash.ToString().c_str());
230  atomic_inc32(&g_num_err_unfixed);
231  }
232  }
233  }
234  }
    // Reached on every path after a successful open().
235  close(fd_src);
236  }
237 
238  return NULL;
239 }
240 
241 
// Entry point of cvmfs_fsck.
//   1. Parses the options -h, -v, -p, -f and -j <n>.
//   2. Changes the working directory to the cache directory given as the
//      first non-option argument.
//   3. Warns if the txn subdirectory contains any entry.
//   4. Starts g_num_threads workers running MainCheck() and joins them.
//   5. If a rebuild was requested (-f) or became necessary, unlinks the
//      "cachedb" file.
//   6. Returns the bitwise OR of kErrorFixed, kErrorUnfixed and
//      kErrorOperational according to the error counters (0 if all are
//      zero). Usage errors return kErrorUsage; a failing chdir(), a missing
//      txn directory or a failing pthread_create() return kErrorOperational.
//
// NOTE(review): listing lines 265, 292, 297, 303, 337, 346, 351 and 359 are
// absent from this listing; string arguments that appear without a call in
// front of them belong to statements whose opening line is not visible.
242 int main(int argc, char **argv) {
    // These two flags are initialized before option parsing because -f sets
    // g_force_rebuild right away.
243  atomic_init32(&g_force_rebuild);
244  atomic_init32(&g_modified_cache);
245  g_current_dir = new string();
246 
247  int c;
248  while ((c = getopt(argc, argv, "hvpfj:")) != -1) {
249  switch (c) {
250  case 'h':
251  Usage();
252  return kErrorOk;
253  case 'v':
254  g_verbose = true;
255  break;
256  case 'p':
257  g_fix_errors = true;
258  break;
259  case 'f':
260  atomic_cas32(&g_force_rebuild, 0, 1);
261  break;
262  case 'j':
    // NOTE(review): atoi() reports no conversion errors; values that
    // convert to less than 1 are rejected by the check below.
263  g_num_threads = atoi(optarg);
264  if (g_num_threads < 1) {
266  "There is at least one worker thread required");
267  return kErrorUsage;
268  }
269  break;
270  case '?':
271  default:
272  Usage();
273  return kErrorUsage;
274  }
275  }
276 
277  // Switch to cache directory
    // The cache directory argument is mandatory.
278  if (optind >= argc) {
279  Usage();
280  return kErrorUsage;
281  }
    // After the chdir() all relative paths used in this file ("txn",
    // "cachedb", the hex-named subdirectories) refer to the cache directory.
282  g_cache_dir = new string(MakeCanonicalPath(argv[optind]));
283  if (chdir(g_cache_dir->c_str()) != 0) {
284  LogCvmfs(kLogCvmfs, kLogStderr, "Could not chdir to %s",
285  g_cache_dir->c_str());
286  return kErrorOperational;
287  }
288 
289  // Check if txn directory is empty
290  DIR *dirp_txn;
291  if ((dirp_txn = opendir("txn")) == NULL) {
293  "Invalid cache directory, %s/txn does not exist",
294  g_cache_dir->c_str());
295  return kErrorOperational;
296  }
    // NOTE(review): the declaration of d (listing line 297) is absent from
    // this listing.
    // The first entry other than "." and ".." triggers the warning once;
    // the break then ends the scan.
298  while ((d = platform_readdir(dirp_txn)) != NULL) {
299  const string name = d->d_name;
300  if ((name == ".") || (name == ".."))
301  continue;
302 
304  "Warning: temporary directory %s/txn is not empty\n"
305  "If this repository is currently _not_ mounted, "
306  "you can remove its contents",
307  g_cache_dir->c_str());
308  break;
309  }
310  closedir(dirp_txn);
311 
312  // Run workers to recalculate checksums
313  atomic_init32(&g_num_files);
314  atomic_init32(&g_num_err_fixed);
315  atomic_init32(&g_num_err_unfixed);
316  atomic_init32(&g_num_err_operational);
317  atomic_init32(&g_num_tmp_catalog);
318  pthread_t *workers = reinterpret_cast<pthread_t *>(
319  smalloc(g_num_threads * sizeof(pthread_t)));
320  if (!g_verbose)
321  LogCvmfs(kLogCvmfs, kLogStdout | kLogNoLinebreak, "Verifying: ");
322  for (int i = 0; i < g_num_threads; ++i) {
323  if (g_verbose)
324  LogCvmfs(kLogCvmfs, kLogStdout, "Starting worker %d", i + 1);
    // NOTE(review): on failure main() returns at once; workers that were
    // already started are not joined and the workers array is not freed.
325  if (pthread_create(&workers[i], NULL, MainCheck, NULL) != 0) {
326  LogCvmfs(kLogCvmfs, kLogStdout, "Fatal: could not create worker thread");
327  return kErrorOperational;
328  }
329  }
    // Wait for all workers, joining them in reverse order of creation.
330  for (int i = g_num_threads - 1; i >= 0; --i) {
331  pthread_join(workers[i], NULL);
332  if (g_verbose)
333  LogCvmfs(kLogCvmfs, kLogStdout, "Stopping worker %d", i + 1);
334  }
335  free(workers);
    // NOTE(review): listing line 337 is absent from this listing, so the
    // statement governed by this if is not visible; as printed here the if
    // appears to govern the "Verified" message that follows.
336  if (!g_verbose)
338  LogCvmfs(kLogCvmfs, kLogStdout, "Verified %d files",
339  atomic_read32(&g_num_files));
340 
341  if (atomic_read32(&g_num_tmp_catalog) > 0)
342  LogCvmfs(kLogCvmfs, kLogStdout, "Temporary file catalogs were found.");
343 
    // g_force_rebuild is set by -f or by a worker that repaired a file.
    // A successful unlink of "cachedb" is counted as a fix; a missing
    // cachedb (ENOENT) is not counted as an error.
344  if (atomic_read32(&g_force_rebuild)) {
345  if (unlink("cachedb") == 0) {
347  "Fix: managed cache db unlinked, will be rebuilt on next mount");
348  atomic_inc32(&g_num_err_fixed);
349  } else {
350  if (errno != ENOENT) {
352  "Error: could not unlink managed cache database (%d)", errno);
353  atomic_inc32(&g_num_err_unfixed);
354  }
355  }
356  }
357 
    // g_modified_cache is set by workers that moved or removed a corrupted
    // file.
    // NOTE(review): "might by" in the message below looks like a typo for
    // "might be"; it is left unchanged here because it is runtime text.
358  if (atomic_read32(&g_modified_cache)) {
360  "\n"
361  "WARNING: There might by corrupted files in the kernel buffers.\n"
362  "Remount CernVM-FS or run 'echo 3 > /proc/sys/vm/drop_caches'"
363  "\n\n");
364  }
365 
    // Build the exit code from the error counters.
366  int retval = 0;
367  if (atomic_read32(&g_num_err_fixed) > 0)
368  retval |= kErrorFixed;
369  if (atomic_read32(&g_num_err_unfixed) > 0)
370  retval |= kErrorUnfixed;
371  if (atomic_read32(&g_num_err_operational) > 0)
372  retval |= kErrorOperational;
373 
374  return retval;
375 }
struct stat64 platform_stat64
int g_num_threads
Definition: cvmfs_fsck.cc:59
atomic_int32 g_num_files
Definition: cvmfs_fsck.cc:46
int g_num_dirs
Definition: cvmfs_fsck.cc:56
string * g_current_dir
Definition: cvmfs_fsck.cc:57
bool HashFile(const std::string &filename, Any *any_digest)
Definition: hash.cc:341
static void Usage(const char *progname)
std::string ToString(const bool with_suffix=false) const
Definition: hash.h:241
bool g_fix_errors
Definition: cvmfs_fsck.cc:60
string * g_cache_dir
Definition: cvmfs_fsck.cc:45
Algorithms algorithm
Definition: hash.h:122
struct cvmcache_object_info __attribute__
Definition: atomic.h:24
int main()
Definition: helper_allow.cc:16
int32_t atomic_int32
Definition: atomic.h:17
atomic_int32 g_num_err_fixed
Definition: cvmfs_fsck.cc:47
atomic_int32 g_num_tmp_catalog
Definition: cvmfs_fsck.cc:50
bool CompressFd2Null(int fd_src, shash::Any *compressed_hash, uint64_t *processed_bytes)
Definition: compression.cc:461
int platform_lstat(const char *path, platform_stat64 *buf)
static bool GetNextFile(string *relative_path, string *hash_name)
Definition: cvmfs_fsck.cc:82
static void * MainCheck(void *data __attribute__((unused)))
Definition: cvmfs_fsck.cc:138
void platform_disable_kcache(int filedes)
pthread_mutex_t g_lock_traverse
Definition: cvmfs_fsck.cc:54
bool g_verbose
Definition: cvmfs_fsck.cc:61
atomic_int32 g_force_rebuild
Definition: cvmfs_fsck.cc:62
atomic_int32 g_modified_cache
Definition: cvmfs_fsck.cc:63
Definition: mutex.h:42
Any MkFromHexPtr(const HexPtr hex, const char suffix)
Definition: hash.cc:82
platform_dirent64 * platform_readdir(DIR *dirp)
DIR * g_DIRP_current
Definition: cvmfs_fsck.cc:55
std::string MakeCanonicalPath(const std::string &path)
Definition: posix.cc:98
atomic_int32 g_num_err_operational
Definition: cvmfs_fsck.cc:49
Errors
Definition: cvmfs_fsck.cc:36
atomic_int32 g_num_err_unfixed
Definition: cvmfs_fsck.cc:48
struct dirent64 platform_dirent64
CVMFS_EXPORT void LogCvmfs(const LogSource source, const int mask, const char *format,...)
Definition: logging.cc:545