Fix a bug caused by secondary not skipping the beginning of new MANIFEST (#5472)

Summary:
While the secondary is replaying after the primary, the primary may switch to a new MANIFEST. The secondary is already able to detect and follow the primary to the new MANIFEST. However, the current implementation has a bug, described as follows.
The first records of the new MANIFEST are generated by VersionSet::WriteSnapshot and describe the state of the column families and the db as of the MANIFEST's creation. Since the secondary instance already finished recovering when it started, it does not need to process these records. In fact, if the secondary replayed them, it could end up adding the same SST files **again** to each column family, causing the consistency checks done by VersionBuilder to fail. Therefore, when switching to a new MANIFEST, we record the number of records to skip at its beginning and ignore them.

Test plan (on dev server)
```
$make clean && make -j32 all
$./db_secondary_test
```
All existing unit tests must pass as well.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5472

Differential Revision: D15866771

Pulled By: riversand963

fbshipit-source-id: a1eec4837fb2ad13059398efb0f437e74fd53bed
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent ddd088c8b9
commit f287f8dc93
  1. 1
      HISTORY.md
  2. 28
      db/db_impl/db_secondary_test.cc
  3. 46
      db/version_set.cc
  4. 3
      db/version_set.h

@ -28,6 +28,7 @@
* Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number.
* Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level.
* Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family.
* Fix a bug caused by secondary not skipping the beginning of new MANIFEST.
## 6.2.0 (4/30/2019)
### New Features

@ -525,6 +525,34 @@ TEST_F(DBSecondaryTest, SwitchManifest) {
range_scan_db(); range_scan_db();
} }
// Here, "Snapshot" refers to the version edits written by
// VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after
// switching from the old one.
TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) {
  // Primary instance: start with automatic compactions turned off.
  Options primary_opts;
  primary_opts.env = env_;
  primary_opts.disable_auto_compactions = true;
  Reopen(primary_opts);

  // Secondary instance: opened with max_open_files set to -1.
  Options secondary_opts;
  secondary_opts.env = env_;
  secondary_opts.max_open_files = -1;
  OpenSecondary(secondary_opts);

  // Write and flush a single key on the primary, then make sure the
  // secondary observes it once it has caught up.
  ASSERT_OK(Put("0", "value0"));
  ASSERT_OK(Flush());
  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());

  ReadOptions read_opts;
  read_opts.verify_checksums = true;
  std::string fetched;
  ASSERT_OK(db_secondary_->Get(read_opts, "0", &fetched));
  ASSERT_EQ("value0", fetched);

  // Reopen the primary with the same options. NOTE(review): judging by the
  // test name, this is what makes the primary switch to a new MANIFEST --
  // confirm against Reopen()/VersionSet behavior.
  Reopen(primary_opts);
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));

  // Catching up again must succeed after the primary has been reopened.
  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
}
TEST_F(DBSecondaryTest, SwitchWAL) { TEST_F(DBSecondaryTest, SwitchWAL) {
const int kNumKeysPerMemtable = 1; const int kNumKeysPerMemtable = 1;
Options options; Options options;

@ -5217,7 +5217,8 @@ ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
WriteController* write_controller) WriteController* write_controller)
: VersionSet(dbname, _db_options, _env_options, table_cache, : VersionSet(dbname, _db_options, _env_options, table_cache,
write_buffer_manager, write_controller, write_buffer_manager, write_controller,
/*block_cache_tracer=*/nullptr) {} /*block_cache_tracer=*/nullptr),
number_of_edits_to_skip_(0) {}
ReactiveVersionSet::~ReactiveVersionSet() {} ReactiveVersionSet::~ReactiveVersionSet() {}
@ -5415,6 +5416,17 @@ Status ReactiveVersionSet::ReadAndApply(
break; break;
} }
// Skip the first VersionEdits of each MANIFEST generated by
// VersionSet::WriteSnapshot.
if (number_of_edits_to_skip_ > 0) {
ColumnFamilyData* cfd =
column_family_set_->GetColumnFamily(edit.column_family_);
if (cfd != nullptr && !cfd->IsDropped()) {
--number_of_edits_to_skip_;
}
continue;
}
s = read_buffer_.AddEdit(&edit); s = read_buffer_.AddEdit(&edit);
if (!s.ok()) { if (!s.ok()) {
break; break;
@ -5463,8 +5475,33 @@ Status ReactiveVersionSet::ReadAndApply(
// find the next MANIFEST, we should exit the loop. // find the next MANIFEST, we should exit the loop.
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader); s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
reader = manifest_reader->get(); reader = manifest_reader->get();
if (s.ok() && reader->file()->file_name() == old_manifest_path) { if (s.ok()) {
if (reader->file()->file_name() == old_manifest_path) {
// Still processing the same MANIFEST, thus no need to continue this
// loop since no record is available if we have reached here.
break; break;
} else {
// We have switched to a new MANIFEST whose first records have been
// generated by VersionSet::WriteSnapshot. Since the secondary instance
// has already finished recovering upon start, there is no need for the
// secondary to process these records. Actually, if the secondary were
// to replay these records, the secondary may end up adding the same
// SST files AGAIN to each column family, causing consistency checks
// done by VersionBuilder to fail. Therefore, we record the number of
// records to skip at the beginning of the new MANIFEST and ignore
// them.
number_of_edits_to_skip_ = 0;
for (auto* cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
// Increase number_of_edits_to_skip_ by 2 because WriteSnapshot()
// writes 2 version edits for each column family at the beginning of
// the newly-generated MANIFEST.
// TODO(yanqin) remove hard-coded value.
number_of_edits_to_skip_ += 2;
}
}
} }
} }
@ -5504,7 +5541,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
return Status::OK(); return Status::OK();
} }
if (active_version_builders_.find(edit.column_family_) == if (active_version_builders_.find(edit.column_family_) ==
active_version_builders_.end()) { active_version_builders_.end() && !cfd->IsDropped()) {
std::unique_ptr<BaseReferencedVersionBuilder> builder_guard( std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
new BaseReferencedVersionBuilder(cfd)); new BaseReferencedVersionBuilder(cfd));
active_version_builders_.insert( active_version_builders_.insert(
@ -5532,6 +5569,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
delete cfd; delete cfd;
cfd = nullptr; cfd = nullptr;
} }
active_version_builders_.erase(builder_iter);
} else { } else {
builder->Apply(&edit); builder->Apply(&edit);
} }
@ -5543,7 +5581,7 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
return s; return s;
} }
if (cfd != nullptr) { if (cfd != nullptr && !cfd->IsDropped()) {
s = builder->LoadTableHandlers( s = builder->LoadTableHandlers(
cfd->internal_stats(), db_options_->max_file_opening_threads, cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */, false /* prefetch_index_and_filter_in_cache */,

@ -1195,6 +1195,9 @@ class ReactiveVersionSet : public VersionSet {
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
active_version_builders_; active_version_builders_;
AtomicGroupReadBuffer read_buffer_; AtomicGroupReadBuffer read_buffer_;
// Number of version edits to skip by ReadAndApply at the beginning of a new
// MANIFEST created by primary.
int number_of_edits_to_skip_;
using VersionSet::LogAndApply; using VersionSet::LogAndApply;
using VersionSet::Recover; using VersionSet::Recover;

Loading…
Cancel
Save