Improve memtable earliest seqno assignment for secondary instance (#5413)

Summary:
In regular RocksDB instance, `MemTable::earliest_seqno_` is "db sequence number at the time of creation". However, we cannot use the db sequence number to set the value of `MemTable::earliest_seqno_` for secondary instance, i.e. `DBImplSecondary` due to the logic of MANIFEST and WAL replay.
When replaying the log files of the primary, the secondary instance first replays MANIFEST and updates the db sequence number if necessary. Next, the secondary replays WAL files, creates new memtables if necessary and inserts key-value pairs into memtables. The following can occur when the db has two or more column families.
Assume the db has column family "default" and "cf1". At a certain in time, both "default" and "cf1" have data in memtables.
1. Primary triggers a flush and flushes "cf1". "default" is **not** flushed.
2. Secondary replays the MANIFEST updates its db sequence number to the latest value learned from the MANIFEST.
3. Secondary starts to replay WAL that contains the writes to "default". It is possible that the write batches' sequence numbers are smaller than the db sequence number. In this case, these write batches will be skipped, and these updates will not be visible to reader until "default" is later flushed.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5413

Differential Revision: D15637407

Pulled By: riversand963

fbshipit-source-id: 3de3fe35cfc6f1b9f844f3f926f0df29717b6580
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent c292dc8540
commit 6ce5580882
  1. 1
      HISTORY.md
  2. 36
      db/db_impl/db_impl_secondary.cc
  3. 7
      db/db_impl/db_secondary_test.cc

@ -27,6 +27,7 @@
### Bug Fixes
* Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number.
* Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level.
* Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family.
## 6.2.0 (4/30/2019)
### New Features

@ -199,16 +199,8 @@ Status DBImplSecondary::RecoverLogFiles(
record.size(), Status::Corruption("log record too small"));
continue;
}
SequenceNumber seq = versions_->LastSequence();
WriteBatchInternal::SetContents(&batch, record);
SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
// If the write batch's sequence number is smaller than the last sequence
// number of the db, then we should skip this write batch because its
// data must reside in an SST that has already been added in the prior
// MANIFEST replay.
if (seq_of_batch < seq) {
continue;
}
std::vector<uint32_t> column_family_ids;
status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
if (status.ok()) {
@ -221,6 +213,17 @@ Status DBImplSecondary::RecoverLogFiles(
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
}
const std::vector<FileMetaData*>& l0_files =
cfd->current()->storage_info()->LevelFiles(0);
SequenceNumber seq =
l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
// If the write batch's sequence number is smaller than the last
// sequence number of the largest sequence persisted for this column
// family, then its data must reside in an SST that has already been
// added in the prior MANIFEST replay.
if (seq_of_batch <= seq) {
continue;
}
auto curr_log_num = port::kMaxUint64;
if (cfd_to_current_log_.count(cfd) > 0) {
curr_log_num = cfd_to_current_log_[cfd];
@ -233,7 +236,7 @@ Status DBImplSecondary::RecoverLogFiles(
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
MemTable* new_mem =
cfd->ConstructNewMemtable(mutable_cf_options, seq);
cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
cfd->mem()->SetNextLogNumber(log_number);
cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
new_mem->Ref();
@ -452,6 +455,21 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast<ReactiveVersionSet*>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
for (ColumnFamilyData* cfd : cfds_changed) {
if (cfd->IsDropped()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
cfd->GetName().c_str());
continue;
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Level summary: %s\n",
cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
}
// list wal_dir to discover new WALs and apply new changes to the secondary
// instance
if (s.ok()) {

@ -576,6 +576,11 @@ TEST_F(DBSecondaryTest, SwitchWAL) {
TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
const int kNumKeysPerMemtable = 1;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::BackgroundCallFlush:ContextCleanedUp",
"DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}});
SyncPoint::GetInstance()->EnableProcessing();
const std::string kCFName1 = "pikachu";
Options options;
options.env = env_;
@ -629,8 +634,10 @@ TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
ASSERT_OK(
Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k)));
TEST_SYNC_POINT("DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp");
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db(dbfull(), handles_, db_secondary_, handles_secondary_);
SyncPoint::GetInstance()->ClearTrace();
}
}

Loading…
Cancel
Save