GCC Code Coverage Report
Directory: cvmfs/ Exec Total Coverage
File: cvmfs/sync_mediator.cc Lines: 1 433 0.2 %
Date: 2019-02-03 02:48:13 Branches: 1 306 0.3 %

Line Branch Exec Source
1
/**
2
 * This file is part of the CernVM File System.
3
 */
4
5
#define __STDC_FORMAT_MACROS
6
7
#include "sync_mediator.h"
8
9
#include <fcntl.h>
10
#include <inttypes.h>
11
#include <unistd.h>
12
13
#include <cassert>
14
#include <cstdio>
15
#include <cstdlib>
16
17
#include "catalog_virtual.h"
18
#include "compression.h"
19
#include "fs_traversal.h"
20
#include "hash.h"
21
#include "smalloc.h"
22
#include "sync_union.h"
23
#include "upload.h"
24
#include "util/posix.h"
25
#include "util/string.h"
26
#include "util_concurrency.h"
27
28
using namespace std;  // NOLINT
29
30
struct Counters {
31
  perf::Counter *n_files_added;
32
  perf::Counter *n_files_removed;
33
  perf::Counter *n_files_changed;
34
  perf::Counter *n_directories_added;
35
  perf::Counter *n_directories_removed;
36
  perf::Counter *n_directories_changed;
37
  perf::Counter *sz_added_bytes;
38
  perf::Counter *sz_removed_bytes;
39
40
  explicit Counters(perf::StatisticsTemplate statistics) {
41
    n_files_added = statistics.RegisterTemplated("n_files_added",
42
        "Number of files added");
43
    n_files_removed = statistics.RegisterTemplated("n_files_removed",
44
        "Number of files removed");
45
    n_files_changed = statistics.RegisterTemplated("n_files_changed",
46
        "Number of files changed");
47
    n_directories_added = statistics.RegisterTemplated("n_directories_added",
48
        "Number of directories added");
49
    n_directories_removed =
50
                  statistics.RegisterTemplated("n_directories_removed",
51
                                            "Number of directories removed");
52
    n_directories_changed =
53
                  statistics.RegisterTemplated("n_directories_changed",
54
                                            "Number of directories changed");
55
    sz_added_bytes = statistics.RegisterTemplated("sz_added_bytes",
56
                                            "Number of bytes added");
57
    sz_removed_bytes = statistics.RegisterTemplated("sz_removed_bytes",
58
                                            "Number of bytes removed");
59
  }
60
};  // Counters
61
62
namespace publish {
63
64
1
// Out-of-line destructor definition; there is nothing to release here.
AbstractSyncMediator::~AbstractSyncMediator() {
}
65
66
/**
 * Stores the catalog manager and the sync parameters, initializes the mutex
 * protecting the file queue, subscribes PublishFilesCallback to the spooler
 * and creates the statistics counters.  The union engine is attached later
 * through RegisterUnionEngine().
 */
SyncMediator::SyncMediator(catalog::WritableCatalogManager *catalog_manager,
                           const SyncParameters *params,
                           perf::StatisticsTemplate statistics)
  : catalog_manager_(catalog_manager)
  , union_engine_(NULL)
  , handle_hardlinks_(false)
  , params_(params)
  , changed_items_(0)
{
  const int mutex_status = pthread_mutex_init(&lock_file_queue_, NULL);
  assert(mutex_status == 0);

  // Every file finished by the spooler is reported to PublishFilesCallback
  params->spooler->RegisterListener(&SyncMediator::PublishFilesCallback, this);

  LogCvmfs(kLogPublish, kLogStdout, "Processing changes...");
  counters_ = new Counters(statistics);
}
83
84
85
// Tears down the mutex that was initialized in the constructor.
SyncMediator::~SyncMediator()
{
  pthread_mutex_destroy(&lock_file_queue_);
}
88
89
90
/**
 * Attaches the union file system engine and adopts its hardlink capability:
 * hardlinks are only handled if the engine reports support for them.
 */
void SyncMediator::RegisterUnionEngine(SyncUnion *engine)
{
  union_engine_ = engine;
  handle_hardlinks_ = union_engine_->SupportsHardlinks();
}
94
95
96
/**
97
 * The entry /.cvmfs or entries in /.cvmfs/ must not be added, removed or
98
 * modified manually.  The directory /.cvmfs is generated by the VirtualCatalog
99
 * class if requested.
100
 */
101
void SyncMediator::EnsureAllowed(SharedPtr<SyncItem> entry) {
102
  const bool ignore_case_setting = false;
103
  string relative_path = entry->GetRelativePath();
104
  if ( (relative_path == string(catalog::VirtualCatalog::kVirtualPath)) ||
105
       (HasPrefix(relative_path,
106
                  string(catalog::VirtualCatalog::kVirtualPath) + "/",
107
                  ignore_case_setting)) )
108
  {
109
    PrintError("invalid attempt to modify '" + relative_path + "'");
110
    abort();
111
  }
112
}
113
114
115
/**
116
 * Add an entry to the repository.
117
 * Added directories will be traversed in order to add the complete subtree.
118
 */
119
void SyncMediator::Add(SharedPtr<SyncItem> entry) {
120
  EnsureAllowed(entry);
121
122
  if (entry->IsDirectory()) {
123
    AddDirectoryRecursively(entry);
124
    return;
125
  }
126
127
  if (entry->IsRegularFile() || entry->IsSymlink()) {
128
    // A file is a hard link if the link count is greater than 1
129
    if (entry->HasHardlinks() && handle_hardlinks_)
130
      InsertHardlink(entry);
131
    else
132
      AddFile(entry);
133
    return;
134
  } else if (entry->IsGraftMarker()) {
135
    LogCvmfs(kLogPublish, kLogDebug, "Ignoring graft marker file.");
136
    return;  // Ignore markers.
137
  }
138
139
  // In OverlayFS whiteouts can be represented as character devices with major
140
  // and minor numbers equal to 0. Special files will be ignored except if they
141
  // are whiteout files.
142
  if (entry->IsSpecialFile() && !entry->IsWhiteout()) {
143
    if (params_->ignore_special_files) {
144
      PrintWarning("'" + entry->GetRelativePath() + "' "
145
                  "is a special file, ignoring.");
146
    } else {
147
      if (entry->HasHardlinks() && handle_hardlinks_)
148
        InsertHardlink(entry);
149
      else
150
        AddFile(entry);
151
    }
152
    return;
153
  }
154
155
  PrintWarning("'" + entry->GetRelativePath() +
156
               "' cannot be added. Unrecognized file type.");
157
}
158
159
160
/**
161
 * Touch an entry in the repository.
162
 */
163
void SyncMediator::Touch(SharedPtr<SyncItem> entry) {
164
  EnsureAllowed(entry);
165
166
  if (entry->IsGraftMarker()) {return;}
167
  if (entry->IsDirectory()) {
168
    TouchDirectory(entry);
169
    perf::Inc(counters_->n_directories_changed);
170
    return;
171
  }
172
173
  if (entry->IsRegularFile() || entry->IsSymlink() || entry->IsSpecialFile()) {
174
    Replace(entry);  // This way, hardlink processing is correct
175
    // Replace calls Remove; cancel Remove's actions:
176
    perf::Dec(counters_->n_files_removed);
177
    perf::Xadd(counters_->sz_removed_bytes, -entry->GetRdOnlySize());
178
179
    perf::Inc(counters_->n_files_changed);
180
    // Count only the diference between the old and new file
181
    int64_t dif = entry->GetScratchSize() - entry->GetRdOnlySize();
182
    if (dif > 0) {                            // added bytes
183
      perf::Xadd(counters_->sz_added_bytes, dif);
184
    } else {                                  // removed bytes
185
      perf::Xadd(counters_->sz_removed_bytes, -dif);
186
    }
187
    return;
188
  }
189
190
  PrintWarning("'" + entry->GetRelativePath() +
191
               "' cannot be touched. Unrecognized file type.");
192
}
193
194
195
/**
196
 * Remove an entry from the repository. Directories will be recursively removed.
197
 */
198
void SyncMediator::Remove(SharedPtr<SyncItem> entry) {
199
  EnsureAllowed(entry);
200
201
  if (entry->WasDirectory()) {
202
    RemoveDirectoryRecursively(entry);
203
    return;
204
  }
205
206
  if (entry->WasRegularFile() || entry->WasSymlink() ||
207
      entry->WasSpecialFile()) {
208
    RemoveFile(entry);
209
    return;
210
  }
211
212
  PrintWarning("'" + entry->GetRelativePath() +
213
               "' cannot be deleted. Unrecognized file type.");
214
}
215
216
217
/**
218
 * Remove the old entry and add the new one.
219
 */
220
void SyncMediator::Replace(SharedPtr<SyncItem> entry) {
221
  // EnsureAllowed(entry); <-- Done by Remove() and Add()
222
  Remove(entry);
223
  Add(entry);
224
}
225
226
void SyncMediator::Clone(const std::string from, const std::string to) {
227
  catalog_manager_->Clone(from, to);
228
}
229
230
/**
 * Called when the traversal enters a directory.  With hardlink handling
 * enabled, an empty hardlink group map is pushed onto the hardlink stack.
 */
void SyncMediator::EnterDirectory(SharedPtr<SyncItem> entry) {
  if (handle_hardlinks_) {
    hardlink_stack_.push(HardlinkGroupMap());
  }
}
238
239
240
/**
 * Called when the traversal leaves a directory.  With hardlink handling
 * enabled, the hardlink groups collected for this directory are completed,
 * registered and the corresponding map is popped from the hardlink stack.
 */
void SyncMediator::LeaveDirectory(SharedPtr<SyncItem> entry)
{
  if (handle_hardlinks_) {
    CompleteHardlinks(entry);
    AddLocalHardlinkGroups(GetHardlinkMap());
    hardlink_stack_.pop();
  }
}
250
251
252
/**
253
 * Do any pending processing and commit all changes to the catalogs.
254
 * To be called after change set traversal is finished.
255
 */
256
bool SyncMediator::Commit(manifest::Manifest *manifest) {
257
  if (!params_->print_changeset && changed_items_ >= processing_dot_interval) {
258
    // line break the 'progress bar', see SyncMediator::PrintChangesetNotice()
259
    LogCvmfs(kLogPublish, kLogStdout, "");
260
  }
261
262
  LogCvmfs(kLogPublish, kLogStdout,
263
           "Waiting for upload of files before committing...");
264
  params_->spooler->WaitForUpload();
265
266
  if (!hardlink_queue_.empty()) {
267
    assert(handle_hardlinks_);
268
269
    LogCvmfs(kLogPublish, kLogStdout, "Processing hardlinks...");
270
    params_->spooler->UnregisterListeners();
271
    params_->spooler->RegisterListener(&SyncMediator::PublishHardlinksCallback,
272
                                       this);
273
274
    // TODO(rmeusel): Revise that for Thread Safety!
275
    //       This loop will spool hardlinks into the spooler, which will then
276
    //       process them.
277
    //       On completion of every hardlink the spooler will asynchronously
278
    //       emit callbacks (SyncMediator::PublishHardlinksCallback) which
279
    //       might happen while this for-loop goes through the hardlink_queue_
280
    //
281
    //       For the moment this seems not to be a problem, but it's an accident
282
    //       just waiting to happen.
283
    //
284
    //       Note: Just wrapping this loop in a mutex might produce a dead lock
285
    //             since the spooler does not fill it's processing queue to an
286
    //             unlimited size. Meaning that it might be flooded with hard-
287
    //             links and waiting for the queue to be processed while proces-
288
    //             sing is stalled because the callback is waiting for this
289
    //             mutex.
290
    for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
291
         iEnd = hardlink_queue_.end(); i != iEnd; ++i)
292
    {
293
      LogCvmfs(kLogPublish, kLogVerboseMsg, "Spooling hardlink group %s",
294
               i->master->GetUnionPath().c_str());
295
      IngestionSource *source =
296
          new FileIngestionSource(i->master->GetUnionPath());
297
      params_->spooler->Process(source);
298
    }
299
300
    params_->spooler->WaitForUpload();
301
302
    for (HardlinkGroupList::const_iterator i = hardlink_queue_.begin(),
303
         iEnd = hardlink_queue_.end(); i != iEnd; ++i)
304
    {
305
      LogCvmfs(kLogPublish, kLogVerboseMsg, "Processing hardlink group %s",
306
               i->master->GetUnionPath().c_str());
307
      AddHardlinkGroup(*i);
308
    }
309
  }
310
311
  if (union_engine_) union_engine_->PostUpload();
312
313
  params_->spooler->UnregisterListeners();
314
315
  LogCvmfs(kLogPublish, kLogStdout, "Committing file catalogs...");
316
  if (params_->spooler->GetNumberOfErrors() > 0) {
317
    LogCvmfs(kLogPublish, kLogStderr, "failed to commit files");
318
    return false;
319
  }
320
321
  if (catalog_manager_->IsBalanceable() ||
322
      (params_->virtual_dir_actions != catalog::VirtualCatalog::kActionNone))
323
  {
324
    if (catalog_manager_->IsBalanceable())
325
      catalog_manager_->Balance();
326
    // Commit empty string to ensure that the "content" of the auto catalog
327
    // markers is present in the repository.
328
    string empty_file = CreateTempPath(params_->dir_temp + "/empty", 0600);
329
    IngestionSource* source = new FileIngestionSource(empty_file);
330
    params_->spooler->Process(source);
331
    params_->spooler->WaitForUpload();
332
    unlink(empty_file.c_str());
333
    if (params_->spooler->GetNumberOfErrors() > 0) {
334
      LogCvmfs(kLogPublish, kLogStderr, "failed to commit auto catalog marker");
335
      return false;
336
    }
337
  }
338
  catalog_manager_->PrecalculateListings();
339
  return catalog_manager_->Commit(params_->stop_for_catalog_tweaks,
340
                                  params_->manual_revision,
341
                                  manifest);
342
}
343
344
345
/**
 * Records an entry in the hardlink group map of the current directory, keyed
 * by the entry's union inode.  The first entry seen for an inode opens a new
 * group; further entries are appended to the existing group.
 */
void SyncMediator::InsertHardlink(SharedPtr<SyncItem> entry) {
  assert(handle_hardlinks_);

  const uint64_t inode = entry->GetUnionInode();
  LogCvmfs(kLogPublish, kLogVerboseMsg, "found hardlink %" PRIu64 " at %s",
           inode, entry->GetUnionPath().c_str());

  // Is there already a group for this inode?
  HardlinkGroupMap::iterator group_pos = GetHardlinkMap().find(inode);
  if (group_pos != GetHardlinkMap().end()) {
    // Known inode: extend the existing group
    group_pos->second.AddHardlink(entry);
    return;
  }

  // Unknown inode: open a new group for this entry
  GetHardlinkMap().insert(
    HardlinkGroupMap::value_type(inode, HardlinkGroup(entry)));
}
364
365
366
/**
 * Picks up an untouched ("legacy") hardlink that belongs to a hardlink group
 * which was modified in the current directory.
 *
 * Entries with a link count below 2 are no hardlinks and are skipped, as are
 * entries whose inode has no touched group in the current hardlink map and
 * entries that are already part of their group.  Everything else is removed
 * and appended to its group for re-adding later: if one element of a hardlink
 * group is edited, all elements must be replaced.
 */
void SyncMediator::InsertLegacyHardlink(SharedPtr<SyncItem> entry) {
  assert(handle_hardlinks_);

  if (entry->GetUnionLinkcount() < 2)
    return;

  const uint64_t inode = entry->GetUnionInode();
  HardlinkGroupMap::iterator group_pos = GetHardlinkMap().find(inode);
  if (group_pos == GetHardlinkMap().end())
    return;  // no hardlink of this group was touched

  HardlinkGroupMap::mapped_type &group = group_pos->second;

  // Nothing to do if the entry is already a member of the group
  for (SyncItemList::const_iterator member = group.hardlinks.begin(),
       member_end = group.hardlinks.end(); member != member_end; ++member)
  {
    if (*(member->second) == *entry)
      return;
  }

  LogCvmfs(kLogPublish, kLogVerboseMsg, "Picked up legacy hardlink %s",
           entry->GetUnionPath().c_str());
  Remove(entry);
  group.AddHardlink(entry);
}
406
407
408
/**
409
 * Create a recursion engine which DOES NOT recurse into directories.
410
 * It basically goes through the current directory (in the union volume) and
411
 * searches for legacy hardlinks which has to be connected to the new
412
 * or edited ones.
413
 */
414
void SyncMediator::CompleteHardlinks(SharedPtr<SyncItem> entry) {
415
  assert(handle_hardlinks_);
416
417
  // If no hardlink in this directory was changed, we can skip this
418
  if (GetHardlinkMap().empty())
419
    return;
420
421
  LogCvmfs(kLogPublish, kLogVerboseMsg, "Post-processing hard links in %s",
422
           entry->GetUnionPath().c_str());
423
424
  // Look for legacy hardlinks
425
  FileSystemTraversal<SyncMediator> traversal(this, union_engine_->union_path(),
426
                                              false);
427
  traversal.fn_new_file =
428
    &SyncMediator::LegacyRegularHardlinkCallback;
429
  traversal.fn_new_symlink = &SyncMediator::LegacySymlinkHardlinkCallback;
430
  traversal.fn_new_character_dev =
431
    &SyncMediator::LegacyCharacterDeviceHardlinkCallback;
432
  traversal.fn_new_block_dev = &SyncMediator::LegacyBlockDeviceHardlinkCallback;
433
  traversal.fn_new_fifo = &SyncMediator::LegacyFifoHardlinkCallback;
434
  traversal.fn_new_socket = &SyncMediator::LegacySocketHardlinkCallback;
435
  traversal.Recurse(entry->GetUnionPath());
436
}
437
438
439
void SyncMediator::LegacyRegularHardlinkCallback(const string &parent_dir,
440
                                                 const string &file_name)
441
{
442
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
443
  InsertLegacyHardlink(entry);
444
}
445
446
447
void SyncMediator::LegacySymlinkHardlinkCallback(const string &parent_dir,
448
                                                  const string &file_name)
449
{
450
  SharedPtr<SyncItem> entry =
451
      CreateSyncItem(parent_dir, file_name, kItemSymlink);
452
  InsertLegacyHardlink(entry);
453
}
454
455
void SyncMediator::LegacyCharacterDeviceHardlinkCallback(
456
  const string &parent_dir,
457
  const string &file_name)
458
{
459
  SharedPtr<SyncItem> entry =
460
      CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
461
  InsertLegacyHardlink(entry);
462
}
463
464
void SyncMediator::LegacyBlockDeviceHardlinkCallback(
465
  const string &parent_dir,
466
  const string &file_name)
467
{
468
  SharedPtr<SyncItem> entry =
469
      CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
470
  InsertLegacyHardlink(entry);
471
}
472
473
void SyncMediator::LegacyFifoHardlinkCallback(const string &parent_dir,
474
                                              const string &file_name)
475
{
476
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
477
  InsertLegacyHardlink(entry);
478
}
479
480
void SyncMediator::LegacySocketHardlinkCallback(const string &parent_dir,
481
                                                const string &file_name)
482
{
483
  SharedPtr<SyncItem> entry =
484
      CreateSyncItem(parent_dir, file_name, kItemSocket);
485
  InsertLegacyHardlink(entry);
486
}
487
488
489
/**
 * Adds a directory and afterwards walks its complete subtree in the scratch
 * area, adding every entry found through the Add*Callback hooks.
 */
void SyncMediator::AddDirectoryRecursively(SharedPtr<SyncItem> entry) {
  AddDirectory(entry);

  // Recursive walk over the newly created directory
  const bool recurse = true;
  FileSystemTraversal<SyncMediator> walker(
    this, union_engine_->scratch_path(), recurse);
  walker.fn_enter_dir         = &SyncMediator::EnterAddedDirectoryCallback;
  walker.fn_leave_dir         = &SyncMediator::LeaveAddedDirectoryCallback;
  walker.fn_new_file          = &SyncMediator::AddFileCallback;
  walker.fn_new_symlink       = &SyncMediator::AddSymlinkCallback;
  walker.fn_new_dir_prefix    = &SyncMediator::AddDirectoryCallback;
  walker.fn_ignore_file       = &SyncMediator::IgnoreFileCallback;
  walker.fn_new_character_dev = &SyncMediator::AddCharacterDeviceCallback;
  walker.fn_new_block_dev     = &SyncMediator::AddBlockDeviceCallback;
  walker.fn_new_fifo          = &SyncMediator::AddFifoCallback;
  walker.fn_new_socket        = &SyncMediator::AddSocketCallback;
  walker.Recurse(entry->GetScratchPath());
}
508
509
510
bool SyncMediator::AddDirectoryCallback(const std::string &parent_dir,
511
                                        const std::string &dir_name)
512
{
513
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
514
  AddDirectory(entry);
515
  return true;  // The recursion engine should recurse deeper here
516
}
517
518
519
void SyncMediator::AddFileCallback(const std::string &parent_dir,
520
                                   const std::string &file_name)
521
{
522
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
523
  Add(entry);
524
}
525
526
527
void SyncMediator::AddCharacterDeviceCallback(const std::string &parent_dir,
528
                                    const std::string &file_name)
529
{
530
  SharedPtr<SyncItem> entry =
531
      CreateSyncItem(parent_dir, file_name, kItemCharacterDevice);
532
  Add(entry);
533
}
534
535
void SyncMediator::AddBlockDeviceCallback(const std::string &parent_dir,
536
                                    const std::string &file_name)
537
{
538
  SharedPtr<SyncItem> entry =
539
      CreateSyncItem(parent_dir, file_name, kItemBlockDevice);
540
  Add(entry);
541
}
542
543
void SyncMediator::AddFifoCallback(const std::string &parent_dir,
544
                                   const std::string &file_name)
545
{
546
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFifo);
547
  Add(entry);
548
}
549
550
void SyncMediator::AddSocketCallback(const std::string &parent_dir,
551
                                     const std::string &file_name) {
552
  SharedPtr<SyncItem> entry =
553
      CreateSyncItem(parent_dir, file_name, kItemSocket);
554
  Add(entry);
555
}
556
557
void SyncMediator::AddSymlinkCallback(const std::string &parent_dir,
558
                                      const std::string &link_name)
559
{
560
  SharedPtr<SyncItem> entry =
561
      CreateSyncItem(parent_dir, link_name, kItemSymlink);
562
  Add(entry);
563
}
564
565
566
void SyncMediator::EnterAddedDirectoryCallback(const std::string &parent_dir,
567
                                               const std::string &dir_name)
568
{
569
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
570
  EnterDirectory(entry);
571
}
572
573
574
void SyncMediator::LeaveAddedDirectoryCallback(const std::string &parent_dir,
575
                                               const std::string &dir_name)
576
{
577
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
578
  LeaveDirectory(entry);
579
}
580
581
582
/**
 * Removes a directory together with its content.  The directory's entries in
 * the read-only area are removed first through the Remove*Callback hooks;
 * the directory itself is removed last.
 */
void SyncMediator::RemoveDirectoryRecursively(SharedPtr<SyncItem> entry) {
  // The traversal itself does not recurse: sub directories are handled by
  // RemoveDirectoryCallback, which would otherwise start another recursion.
  const bool recurse = false;
  FileSystemTraversal<SyncMediator> walker(
    this, union_engine_->rdonly_path(), recurse);
  walker.fn_new_file          = &SyncMediator::RemoveFileCallback;
  walker.fn_new_dir_postfix   = &SyncMediator::RemoveDirectoryCallback;
  walker.fn_new_symlink       = &SyncMediator::RemoveSymlinkCallback;
  walker.fn_new_character_dev = &SyncMediator::RemoveCharacterDeviceCallback;
  walker.fn_new_block_dev     = &SyncMediator::RemoveBlockDeviceCallback;
  walker.fn_new_fifo          = &SyncMediator::RemoveFifoCallback;
  walker.fn_new_socket        = &SyncMediator::RemoveSocketCallback;
  walker.Recurse(entry->GetRdOnlyPath());

  // Now that the directory has been emptied, it can be deleted itself
  RemoveDirectory(entry);
}
602
603
604
void SyncMediator::RemoveFileCallback(const std::string &parent_dir,
605
                                      const std::string &file_name)
606
{
607
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, file_name, kItemFile);
608
  Remove(entry);
609
}
610
611
612
void SyncMediator::RemoveSymlinkCallback(const std::string &parent_dir,
613
                                         const std::string &link_name)
614
{
615
  SharedPtr<SyncItem> entry =
616
      CreateSyncItem(parent_dir, link_name, kItemSymlink);
617
  Remove(entry);
618
}
619
620
void SyncMediator::RemoveCharacterDeviceCallback(
621
  const std::string &parent_dir,
622
  const std::string &link_name)
623
{
624
  SharedPtr<SyncItem> entry =
625
      CreateSyncItem(parent_dir, link_name, kItemCharacterDevice);
626
  Remove(entry);
627
}
628
629
void SyncMediator::RemoveBlockDeviceCallback(
630
  const std::string &parent_dir,
631
  const std::string &link_name)
632
{
633
  SharedPtr<SyncItem> entry =
634
      CreateSyncItem(parent_dir, link_name, kItemBlockDevice);
635
  Remove(entry);
636
}
637
638
void SyncMediator::RemoveFifoCallback(const std::string &parent_dir,
639
                                      const std::string &link_name)
640
{
641
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, link_name, kItemFifo);
642
  Remove(entry);
643
}
644
645
void SyncMediator::RemoveSocketCallback(const std::string &parent_dir,
646
                                        const std::string &link_name)
647
{
648
  SharedPtr<SyncItem> entry =
649
      CreateSyncItem(parent_dir, link_name, kItemSocket);
650
  Remove(entry);
651
}
652
653
void SyncMediator::RemoveDirectoryCallback(const std::string &parent_dir,
654
                                           const std::string &dir_name)
655
{
656
  SharedPtr<SyncItem> entry = CreateSyncItem(parent_dir, dir_name, kItemDir);
657
  RemoveDirectoryRecursively(entry);
658
}
659
660
661
bool SyncMediator::IgnoreFileCallback(const std::string &parent_dir,
662
                                      const std::string &file_name)
663
{
664
  if (union_engine_->IgnoreFilePredicate(parent_dir, file_name)) {
665
    return true;
666
  }
667
668
  SharedPtr<SyncItem> entry =
669
      CreateSyncItem(parent_dir, file_name, kItemUnknown);
670
  return entry->IsWhiteout();
671
}
672
673
// Creates a sync item for the given parent path, file name and entry type by
// delegating to the registered union engine.
SharedPtr<SyncItem> SyncMediator::CreateSyncItem(
    const std::string &relative_parent_path,
    const std::string &filename,
    const SyncItemType entry_type) const
{
  return union_engine_->CreateSyncItem(
    relative_parent_path, filename, entry_type);
}
679
680
/**
 * Spooler callback for a processed regular file.  Aborts if the spooler
 * reports a failure.  Otherwise the queued sync item registered under the
 * result's local path receives content hash and compression algorithm and is
 * added to the catalog, as a chunked file if the result is chunked.
 *
 * @param result  outcome of the spooler run for one file
 */
void SyncMediator::PublishFilesCallback(const upload::SpoolerResult &result) {
  // The chunk count is converted explicitly so that the argument type matches
  // the conversion specifier.
  LogCvmfs(kLogPublish, kLogVerboseMsg,
           "Spooler callback for %s, digest %s, produced %u chunks, retval %d",
           result.local_path.c_str(),
           result.content_hash.ToString().c_str(),
           static_cast<unsigned>(result.file_chunks.size()),
           result.return_code);
  if (result.return_code != 0) {
    LogCvmfs(kLogPublish, kLogStderr, "Spool failure for %s (%d)",
             result.local_path.c_str(), result.return_code);
    abort();
  }

  // Only the lookup in the shared file queue is done under the lock
  SyncItemList::iterator itr;
  {
    MutexLockGuard guard(lock_file_queue_);
    itr = file_queue_.find(result.local_path);
  }

  assert(itr != file_queue_.end());

  SyncItem &item = *itr->second;
  item.SetContentHash(result.content_hash);
  item.SetCompressionAlgorithm(result.compression_alg);

  // Use the default xattrs unless the file's own xattrs are requested
  XattrList *xattrs = &default_xattrs;
  if (params_->include_xattrs) {
    xattrs = XattrList::CreateFromFile(result.local_path);
    assert(xattrs != NULL);
  }

  if (result.IsChunked()) {
    catalog_manager_->AddChunkedFile(
      item.CreateBasicCatalogDirent(),
      *xattrs,
      item.relative_parent_path(),
      result.file_chunks);
  } else {
    catalog_manager_->AddFile(
      item.CreateBasicCatalogDirent(),
      *xattrs,
      item.relative_parent_path());
  }

  // A list obtained from CreateFromFile() is owned here.  XattrList is a C++
  // object, so it is released with delete (running its destructor) rather
  // than with free().
  // NOTE(review): assumes CreateFromFile() allocates with new -- confirm.
  if (xattrs != &default_xattrs)
    delete xattrs;
}
727
728
729
void SyncMediator::PublishHardlinksCallback(
730
  const upload::SpoolerResult &result)
731
{
732
  LogCvmfs(kLogPublish, kLogVerboseMsg,
733
           "Spooler callback for hardlink %s, digest %s, retval %d",
734
           result.local_path.c_str(),
735
           result.content_hash.ToString().c_str(),
736
           result.return_code);
737
  if (result.return_code != 0) {
738
    LogCvmfs(kLogPublish, kLogStderr, "Spool failure for %s (%d)",
739
             result.local_path.c_str(), result.return_code);
740
    abort();
741
  }
742
743
  bool found = false;
744
  for (unsigned i = 0; i < hardlink_queue_.size(); ++i) {
745
    if (hardlink_queue_[i].master->GetUnionPath() == result.local_path) {
746
      found = true;
747
      hardlink_queue_[i].master->SetContentHash(result.content_hash);
748
      SyncItemList::iterator j, jend;
749
      for (j = hardlink_queue_[i].hardlinks.begin(),
750
           jend = hardlink_queue_[i].hardlinks.end();
751
           j != jend; ++j)
752
      {
753
        j->second->SetContentHash(result.content_hash);
754
        j->second->SetCompressionAlgorithm(result.compression_alg);
755
      }
756
      if (result.IsChunked())
757
        hardlink_queue_[i].file_chunks = result.file_chunks;
758
759
      break;
760
    }
761
  }
762
763
  assert(found);
764
}
765
766
767
/**
 * Announces a new nested catalog at the given directory and, unless this is
 * a dry run, creates it through the catalog manager.
 */
void SyncMediator::CreateNestedCatalog(SharedPtr<SyncItem> directory) {
  PrintChangesetNotice(kAddCatalog,
                       "Nested catalog at " + directory->GetUnionPath());

  if (params_->dry_run)
    return;

  catalog_manager_->CreateNestedCatalog(directory->GetRelativePath());
}
775
776
777
/**
 * Announces the removal of the nested catalog at the given directory and,
 * unless this is a dry run, removes it through the catalog manager.
 */
void SyncMediator::RemoveNestedCatalog(SharedPtr<SyncItem> directory) {
  PrintChangesetNotice(kRemoveCatalog,
                       "Nested catalog at " + directory->GetUnionPath());

  if (params_->dry_run)
    return;

  catalog_manager_->RemoveNestedCatalog(directory->GetRelativePath());
}
785
786
787
void SyncMediator::PrintChangesetNotice(const ChangesetAction  action,
788
                                        const std::string     &extra) const {
789
  if (!params_->print_changeset) {
790
    ++changed_items_;
791
    if (changed_items_ % processing_dot_interval == 0) {
792
      LogCvmfs(kLogPublish, kLogStdout | kLogNoLinebreak, ".");
793
    }
794
    return;
795
  }
796
797
  const char *action_label = NULL;
798
  switch (action) {
799
    case kAdd:
800
    case kAddCatalog:
801
    case kAddHardlinks:
802
      action_label = "[add]";
803
      break;
804
    case kRemove:
805
    case kRemoveCatalog:
806
      action_label = "[rem]";
807
      break;
808
    case kTouch:
809
      action_label = "[tou]";
810
      break;
811
    default:
812
      assert(false && "unknown sync mediator action");
813
  }
814
815
  LogCvmfs(kLogPublish, kLogStdout, "%s %s", action_label, extra.c_str());
816
}
817
818
/**
 * Adds a single non-directory entry.
 *  - Symlinks and special files are stored completely in the catalog.
 *  - Files with a graft marker are added to the catalog immediately; an
 *    invalid graft aborts the publish.
 *  - A nested catalog marker in the root directory is an error and aborts.
 *  - Everything else is remembered in the file queue and handed to the
 *    spooler; the catalog entry is then created in PublishFilesCallback().
 * The first two cases are only taken outside of dry runs.  New entries are
 * accounted for in the statistics counters.
 */
void SyncMediator::AddFile(SharedPtr<SyncItem> entry) {
  PrintChangesetNotice(kAdd, entry->GetUnionPath());

  if ((entry->IsSymlink() || entry->IsSpecialFile()) && !params_->dry_run) {
    assert(!entry->HasGraftMarker());
    // Symlinks and special files are completely stored in the catalog
    catalog_manager_->AddFile(entry->CreateBasicCatalogDirent(), default_xattrs,
                              entry->relative_parent_path());
  } else if (entry->HasGraftMarker() && !params_->dry_run) {
    if (entry->IsValidGraft()) {
      // Graft files are added to catalog immediately.
      if (entry->IsChunkedGraft()) {
        catalog_manager_->AddChunkedFile(
            entry->CreateBasicCatalogDirent(), default_xattrs,
            entry->relative_parent_path(), *(entry->GetGraftChunks()));
      } else {
        catalog_manager_->AddFile(
            entry->CreateBasicCatalogDirent(),
            default_xattrs,  // TODO(bbockelm): For now, use default xattrs
                             // on grafted files.
            entry->relative_parent_path());
      }
    } else {
      // Unlike with regular files, grafted files can be "unpublishable" - i.e.,
      // the graft file is missing information.  It's not clear that continuing
      // forward with the publish is the correct thing to do; abort for now.
      LogCvmfs(kLogPublish, kLogStderr,
               "Encountered a grafted file (%s) with "
               "invalid grafting information; check contents of .cvmfsgraft-*"
               " file.  Aborting publish.",
               entry->GetRelativePath().c_str());
      abort();
    }
  } else if (entry->relative_parent_path().empty() &&
             entry->IsCatalogMarker()) {
    LogCvmfs(kLogPublish, kLogStderr,
             "Error: nested catalog marker in root directory");
    abort();
  } else {
    // Remember the entry for its path before pushing the file to the spooler.
    // The scoped guard (as used in PublishFilesCallback) releases the mutex
    // on every exit path, including an exception from the map insertion.
    {
      MutexLockGuard guard(lock_file_queue_);
      file_queue_[entry->GetUnionPath()] = entry;
    }
    // Spool the file
    params_->spooler->Process(entry->CreateIngestionSource());
  }

  // publish statistics counting for new file
  if (entry->IsNew()) {
    perf::Inc(counters_->n_files_added);
    perf::Xadd(counters_->sz_added_bytes, entry->GetScratchSize());
  }
}
871
872
/**
 * Removes a single file from the catalogs.
 *
 * The removal is always announced through PrintChangesetNotice and always
 * accounted for in the removal counters; the catalogs themselves are only
 * touched when this is not a dry run.  If hardlink handling is enabled and
 * the read-only link count of the entry is larger than one, the entry is
 * taken out of its hardlink group before it is removed.
 *
 * @param entry  the sync item describing the file to remove
 */
void SyncMediator::RemoveFile(SharedPtr<SyncItem> entry) {
  PrintChangesetNotice(kRemove, entry->GetUnionPath());

  const bool apply_to_catalogs = !params_->dry_run;
  if (apply_to_catalogs) {
    // Only files that still share their inode with other entries need to be
    // detached from a hardlink group first.
    const bool is_hardlinked =
        handle_hardlinks_ && (entry->GetRdOnlyLinkcount() > 1);
    if (is_hardlinked) {
      LogCvmfs(kLogPublish, kLogVerboseMsg, "remove %s from hardlink group",
               entry->GetUnionPath().c_str());
      catalog_manager_->ShrinkHardlinkGroup(entry->GetRelativePath());
    }

    catalog_manager_->RemoveFile(entry->GetRelativePath());
  }

  // Publish statistics: number of removed files and removed bytes.  These
  // are updated in dry-run mode as well.
  perf::Inc(counters_->n_files_removed);
  perf::Xadd(counters_->sz_removed_bytes, entry->GetRdOnlySize());
}
888
889
/**
 * Adds a directory that is flagged as "unmaterialized" by the caller.
 *
 * In this implementation there is no special treatment: the call is
 * forwarded unchanged to AddDirectory.
 *
 * @param entry  the sync item describing the directory to add
 */
void SyncMediator::AddUnmaterializedDirectory(SharedPtr<SyncItem> entry) {
  this->AddDirectory(entry);
}
892
893
/**
 * Adds a single directory to the catalogs.
 *
 * The addition is announced through PrintChangesetNotice and counted in
 * n_directories_added regardless of the dry-run setting; the catalog entry
 * itself is only created when this is not a dry run.  Directories must not
 * carry a graft marker (asserted).
 *
 * If the directory carries a catalog marker and is not yet a transition
 * point, a nested catalog is created for it.  Note that this check is made
 * independently of the dry-run setting in this method.
 *
 * @param entry  the sync item describing the directory to add
 */
void SyncMediator::AddDirectory(SharedPtr<SyncItem> entry) {
  PrintChangesetNotice(kAdd, entry->GetUnionPath());

  perf::Inc(counters_->n_directories_added);
  assert(!entry->HasGraftMarker());

  const bool apply_to_catalogs = !params_->dry_run;
  if (apply_to_catalogs) {
    catalog_manager_->AddDirectory(entry->CreateBasicCatalogDirent(),
                                   entry->relative_parent_path());
  }

  // Without a catalog marker there is nothing left to do.
  if (!entry->HasCatalogMarker())
    return;

  // The marker asks for a nested catalog; create one unless the directory
  // already is a transition point.
  if (!catalog_manager_->IsTransitionPoint(entry->GetRelativePath()))
    CreateNestedCatalog(entry);
}
908
909
910
/**
911
 * this method deletes a single directory entry! Make sure to empty it
912
 * before you call this method or simply use
913
 * SyncMediator::RemoveDirectoryRecursively instead.
914
 */
915
void SyncMediator::RemoveDirectory(SharedPtr<SyncItem> entry) {
916
  const std::string directory_path = entry->GetRelativePath();
917
918
  if (catalog_manager_->IsTransitionPoint(directory_path)) {
919
    RemoveNestedCatalog(entry);
920
  }
921
922
  PrintChangesetNotice(kRemove, entry->GetUnionPath());
923
  if (!params_->dry_run) {
924
    catalog_manager_->RemoveDirectory(directory_path);
925
  }
926
927
  perf::Inc(counters_->n_directories_removed);
928
}
929
930
931
/**
 * Updates the catalog entry of an existing directory and reconciles its
 * nested catalog state with the presence of a catalog marker.
 *
 * The touch is announced through PrintChangesetNotice; the catalog entry is
 * only updated when this is not a dry run.  Afterwards:
 *   - marker present, directory is not a transition point:
 *     a nested catalog is created;
 *   - marker absent, directory is a transition point:
 *     the nested catalog is removed;
 *   - otherwise nothing changes.
 *
 * @param entry  the sync item describing the touched directory
 */
void SyncMediator::TouchDirectory(SharedPtr<SyncItem> entry) {
  PrintChangesetNotice(kTouch, entry->GetUnionPath());
  const std::string relative_path = entry->GetRelativePath();

  const bool apply_to_catalogs = !params_->dry_run;
  if (apply_to_catalogs) {
    catalog_manager_->TouchDirectory(entry->CreateBasicCatalogDirent(),
                                     relative_path);
  }

  const bool wants_nested_catalog = entry->HasCatalogMarker();
  const bool has_nested_catalog =
      catalog_manager_->IsTransitionPoint(relative_path);

  // Marker and catalog state already agree: nothing to reconcile.
  if (wants_nested_catalog == has_nested_catalog)
    return;

  if (wants_nested_catalog) {
    CreateNestedCatalog(entry);
  } else {
    RemoveNestedCatalog(entry);
  }
}
950
951
952
/**
953
 * All hardlinks in the current directory have been picked up.  Now they are
954
 * added to the catalogs.
955
 */
956
void SyncMediator::AddLocalHardlinkGroups(const HardlinkGroupMap &hardlinks) {
957
  assert(handle_hardlinks_);
958
959
  for (HardlinkGroupMap::const_iterator i = hardlinks.begin(),
960
       iEnd = hardlinks.end(); i != iEnd; ++i)
961
  {
962
    if (i->second.hardlinks.size() != i->second.master->GetUnionLinkcount()) {
963
      LogCvmfs(kLogPublish, kLogStdout, "Hardlinks across directories (%s)",
964
               i->second.master->GetUnionPath().c_str());
965
      if (!params_->ignore_xdir_hardlinks)
966
        abort();
967
    }
968
969
    if (params_->print_changeset) {
970
      std::string changeset_notice = "add hardlinks around ("
971
                                   + i->second.master->GetUnionPath() + ")";
972
      for (SyncItemList::const_iterator j = i->second.hardlinks.begin(),
973
           jEnd = i->second.hardlinks.end(); j != jEnd; ++j)
974
      {
975
        changeset_notice += " " + j->second->filename();
976
      }
977
      PrintChangesetNotice(kAddHardlinks, changeset_notice);
978
    }
979
980
    if (params_->dry_run)
981
      continue;
982
983
    if (i->second.master->IsSymlink() || i->second.master->IsSpecialFile())
984
      AddHardlinkGroup(i->second);
985
    else
986
      hardlink_queue_.push_back(i->second);
987
  }
988
}
989
990
991
void SyncMediator::AddHardlinkGroup(const HardlinkGroup &group) {
992
  assert(handle_hardlinks_);
993
994
  // Create a DirectoryEntry list out of the hardlinks
995
  catalog::DirectoryEntryBaseList hardlinks;
996
  for (SyncItemList::const_iterator i = group.hardlinks.begin(),
997
       iEnd = group.hardlinks.end(); i != iEnd; ++i)
998
  {
999
    hardlinks.push_back(i->second->CreateBasicCatalogDirent());
1000
  }
1001
  XattrList *xattrs = &default_xattrs;
1002
  if (params_->include_xattrs) {
1003
    xattrs = XattrList::CreateFromFile(group.master->GetUnionPath());
1004
    assert(xattrs);
1005
  }
1006
  catalog_manager_->AddHardlinkGroup(
1007
    hardlinks,
1008
    *xattrs,
1009
    group.master->relative_parent_path(),
1010
    group.file_chunks);
1011
  if (xattrs != &default_xattrs)
1012
    free(xattrs);
1013
}
1014
1015
}  // namespace publish