From 659a16d52be1e5675046d3549d390488ac080781 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 2 Mar 2022 21:03:14 -0800 Subject: [PATCH] Fix bug causing incorrect data returned by snapshot read (#9648) Summary: This bug affects use cases that meet the following conditions - (has only the default column family or disables WAL) and - has at least one event listener - atomic flush is NOT affected. If the above conditions meet, then RocksDB can release the db mutex before picking all the existing memtables to flush. In the meantime, a snapshot can be created and db's sequence number can still be incremented. The upcoming flush will ignore this snapshot. A later read using this snapshot can return incorrect result. To fix this issue, we call the listeners callbacks after picking the memtables so that we avoid creating snapshots during this interval. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9648 Test Plan: make check Reviewed By: ajkr Differential Revision: D34555456 Pulled By: riversand963 fbshipit-source-id: 1438981e9f069a5916686b1a0ad7627f734cf0ee --- HISTORY.md | 3 +- db/db_flush_test.cc | 56 ++++++++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 36 ++++++++++++++--- 3 files changed, 89 insertions(+), 6 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f3ffe4cdb..bf6bf618a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,7 +5,8 @@ * Added BlobDB options to `ldb` ### Bug Fixes -* * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496) +* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496) +* Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data. ### Public API changes * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 2fe3c559d..c7fb112b7 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -676,6 +676,7 @@ class TestFlushListener : public EventListener { ~TestFlushListener() override { prev_fc_info_.status.PermitUncheckedError(); // Ignore the status } + void OnTableFileCreated(const TableFileCreationInfo& info) override { // remember the info for later checking the FlushJobInfo. prev_fc_info_ = info; @@ -2002,6 +2003,61 @@ TEST_P(DBFlushTestBlobError, FlushError) { } #ifndef ROCKSDB_LITE +TEST_F(DBFlushTest, TombstoneVisibleInSnapshot) { + class SimpleTestFlushListener : public EventListener { + public: + explicit SimpleTestFlushListener(DBFlushTest* _test) : test_(_test) {} + ~SimpleTestFlushListener() override {} + + void OnFlushBegin(DB* db, const FlushJobInfo& info) override { + ASSERT_EQ(static_cast(0), info.cf_id); + + ASSERT_OK(db->Delete(WriteOptions(), "foo")); + snapshot_ = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "foo", "value")); + + auto* dbimpl = static_cast_with_check(db); + assert(dbimpl); + + ColumnFamilyHandle* cfh = db->DefaultColumnFamily(); + auto* cfhi = static_cast_with_check(cfh); + assert(cfhi); + ASSERT_OK(dbimpl->TEST_SwitchMemtable(cfhi->cfd())); + } + + DBFlushTest* test_ = nullptr; + const Snapshot* snapshot_ = nullptr; + }; + + Options options = CurrentOptions(); + options.create_if_missing = true; + auto* listener = new SimpleTestFlushListener(this); + options.listeners.emplace_back(listener); + DestroyAndReopen(options); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value0")); + + ManagedSnapshot snapshot_guard(db_); + + ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily(); + ASSERT_OK(db_->Flush(FlushOptions(), default_cf)); + + const Snapshot* snapshot = listener->snapshot_; + assert(snapshot); + + ReadOptions read_opts; + read_opts.snapshot = snapshot; + + // Using snapshot should not see "foo". + { + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + } + + db_->ReleaseSnapshot(snapshot); +} + TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 05520d426..e716bac9b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -170,6 +170,7 @@ Status DBImpl::FlushMemTableToOutputFile( const bool needs_to_sync_closed_wals = logfile_number_ > 0 && versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1; + // If needs_to_sync_closed_wals is true, we need to record the current // maximum memtable ID of this column family so that a later PickMemtables() // call will not pick memtables whose IDs are higher. This is due to the fact @@ -177,9 +178,33 @@ Status DBImpl::FlushMemTableToOutputFile( // happen for this column family in the meantime. The newly created memtables // have their data backed by unsynced WALs, thus they cannot be included in // this flush job. + // Another reason why we must record the current maximum memtable ID of this + // column family: SyncClosedLogs() may release db mutex, thus it's possible + // for application to continue to insert into memtables increasing db's + // sequence number. The application may take a snapshot, but this snapshot is + // not included in `snapshot_seqs` which will be passed to flush job because + // `snapshot_seqs` has already been computed before this function starts. + // Recording the max memtable ID ensures that the flush job does not flush + // a memtable without knowing such snapshot(s). uint64_t max_memtable_id = needs_to_sync_closed_wals ? cfd->imm()->GetLatestMemTableID() : port::kMaxUint64; + + // If needs_to_sync_closed_wals is false, then the flush job will pick ALL + // existing memtables of the column family when PickMemTable() is called + // later. Although we won't call SyncClosedLogs() in this case, we may still + // call the callbacks of the listeners, i.e. NotifyOnFlushBegin() which also + // releases and re-acquires the db mutex. In the meantime, the application + // can still insert into the memtables and increase the db's sequence number. + // The application can take a snapshot, hoping that the latest visible state + // to this snapshto is preserved. This is hard to guarantee since db mutex + // not held. This newly-created snapshot is not included in `snapshot_seqs` + // and the flush job is unaware of its presence. Consequently, the flush job + // may drop certain keys when generating the L0, causing incorrect data to be + // returned for snapshot read using this snapshot. + // To address this, we make sure NotifyOnFlushBegin() executes after memtable + // picking so that no new snapshot can be taken between the two functions. + FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, @@ -192,11 +217,6 @@ Status DBImpl::FlushMemTableToOutputFile( &blob_callback_); FileMetaData file_meta; -#ifndef ROCKSDB_LITE - // may temporarily unlock and lock the mutex. - NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id); -#endif // ROCKSDB_LITE - Status s; bool need_cancel = false; IOStatus log_io_s = IOStatus::OK(); @@ -221,6 +241,12 @@ Status DBImpl::FlushMemTableToOutputFile( } TEST_SYNC_POINT_CALLBACK( "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job); + +#ifndef ROCKSDB_LITE + // may temporarily unlock and lock the mutex. + NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id); +#endif // ROCKSDB_LITE + bool switched_to_mempurge = false; // Within flush_job.Run, rocksdb may call event listener to notify // file creation and deletion.