diff --git a/HISTORY.md b/HISTORY.md index c88b436e4..ad6c370b5 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -27,6 +27,7 @@ ### Bug Fixes * Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number. * Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level. +* Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family. ## 6.2.0 (4/30/2019) ### New Features diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 827d99929..eb8c4c987 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -199,16 +199,8 @@ Status DBImplSecondary::RecoverLogFiles( record.size(), Status::Corruption("log record too small")); continue; } - SequenceNumber seq = versions_->LastSequence(); WriteBatchInternal::SetContents(&batch, record); SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch); - // If the write batch's sequence number is smaller than the last sequence - // number of the db, then we should skip this write batch because its - // data must reside in an SST that has already been added in the prior - // MANIFEST replay. - if (seq_of_batch < seq) { - continue; - } std::vector column_family_ids; status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids); if (status.ok()) { @@ -221,6 +213,17 @@ Status DBImplSecondary::RecoverLogFiles( if (cfds_changed->count(cfd) == 0) { cfds_changed->insert(cfd); } + const std::vector& l0_files = + cfd->current()->storage_info()->LevelFiles(0); + SequenceNumber seq = + l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno; + // If the write batch's sequence number is smaller than the last + // sequence number of the largest sequence persisted for this column + // family, then its data must reside in an SST that has already been + // added in the prior MANIFEST replay. + if (seq_of_batch <= seq) { + continue; + } auto curr_log_num = port::kMaxUint64; if (cfd_to_current_log_.count(cfd) > 0) { curr_log_num = cfd_to_current_log_[cfd]; @@ -233,7 +236,7 @@ Status DBImplSecondary::RecoverLogFiles( const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); MemTable* new_mem = - cfd->ConstructNewMemtable(mutable_cf_options, seq); + cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch); cfd->mem()->SetNextLogNumber(log_number); cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free); new_mem->Ref(); @@ -452,6 +455,21 @@ Status DBImplSecondary::TryCatchUpWithPrimary() { InstrumentedMutexLock lock_guard(&mutex_); s = static_cast(versions_.get()) ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, + static_cast(versions_->LastSequence())); + for (ColumnFamilyData* cfd : cfds_changed) { + if (cfd->IsDropped()) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", + cfd->GetName().c_str()); + continue; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Level summary: %s\n", + cfd->GetName().c_str(), + cfd->current()->storage_info()->LevelSummary(&tmp)); + } + // list wal_dir to discover new WALs and apply new changes to the secondary // instance if (s.ok()) { diff --git a/db/db_impl/db_secondary_test.cc b/db/db_impl/db_secondary_test.cc index 5b375422f..c9aaa3611 100644 --- a/db/db_impl/db_secondary_test.cc +++ b/db/db_impl/db_secondary_test.cc @@ -576,6 +576,11 @@ TEST_F(DBSecondaryTest, SwitchWAL) { TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { const int kNumKeysPerMemtable = 1; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::BackgroundCallFlush:ContextCleanedUp", + "DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"}}); + SyncPoint::GetInstance()->EnableProcessing(); const std::string kCFName1 = "pikachu"; Options options; options.env = env_; @@ -629,8 +634,10 @@ TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) { Put(0 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); ASSERT_OK( Put(1 /*cf*/, "key" + std::to_string(k), "value" + std::to_string(k))); + TEST_SYNC_POINT("DBSecondaryTest::SwitchWALMultipleColumnFamilies:BeforeCatchUp"); ASSERT_OK(db_secondary_->TryCatchUpWithPrimary()); verify_db(dbfull(), handles_, db_secondary_, handles_secondary_); + SyncPoint::GetInstance()->ClearTrace(); } }