diff --git a/HISTORY.md b/HISTORY.md index f3ffe4cdb..bf6bf618a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,7 +5,8 @@ * Added BlobDB options to `ldb` ### Bug Fixes -* * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496) +* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496) +* Fixed a bug caused by race among flush, incoming writes and taking snapshots. Queries to snapshots created with these race condition can return incorrect result, e.g. resurfacing deleted data. ### Public API changes * Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect. diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 2fe3c559d..c7fb112b7 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -676,6 +676,7 @@ class TestFlushListener : public EventListener { ~TestFlushListener() override { prev_fc_info_.status.PermitUncheckedError(); // Ignore the status } + void OnTableFileCreated(const TableFileCreationInfo& info) override { // remember the info for later checking the FlushJobInfo. prev_fc_info_ = info; @@ -2002,6 +2003,61 @@ TEST_P(DBFlushTestBlobError, FlushError) { } #ifndef ROCKSDB_LITE +TEST_F(DBFlushTest, TombstoneVisibleInSnapshot) { + class SimpleTestFlushListener : public EventListener { + public: + explicit SimpleTestFlushListener(DBFlushTest* _test) : test_(_test) {} + ~SimpleTestFlushListener() override {} + + void OnFlushBegin(DB* db, const FlushJobInfo& info) override { + ASSERT_EQ(static_cast(0), info.cf_id); + + ASSERT_OK(db->Delete(WriteOptions(), "foo")); + snapshot_ = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "foo", "value")); + + auto* dbimpl = static_cast_with_check(db); + assert(dbimpl); + + ColumnFamilyHandle* cfh = db->DefaultColumnFamily(); + auto* cfhi = static_cast_with_check(cfh); + assert(cfhi); + ASSERT_OK(dbimpl->TEST_SwitchMemtable(cfhi->cfd())); + } + + DBFlushTest* test_ = nullptr; + const Snapshot* snapshot_ = nullptr; + }; + + Options options = CurrentOptions(); + options.create_if_missing = true; + auto* listener = new SimpleTestFlushListener(this); + options.listeners.emplace_back(listener); + DestroyAndReopen(options); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "value0")); + + ManagedSnapshot snapshot_guard(db_); + + ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily(); + ASSERT_OK(db_->Flush(FlushOptions(), default_cf)); + + const Snapshot* snapshot = listener->snapshot_; + assert(snapshot); + + ReadOptions read_opts; + read_opts.snapshot = snapshot; + + // Using snapshot should not see "foo". + { + std::string value; + Status s = db_->Get(read_opts, "foo", &value); + ASSERT_TRUE(s.IsNotFound()); + } + + db_->ReleaseSnapshot(snapshot); +} + TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 05520d426..e716bac9b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -170,6 +170,7 @@ Status DBImpl::FlushMemTableToOutputFile( const bool needs_to_sync_closed_wals = logfile_number_ > 0 && versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1; + // If needs_to_sync_closed_wals is true, we need to record the current // maximum memtable ID of this column family so that a later PickMemtables() // call will not pick memtables whose IDs are higher. This is due to the fact @@ -177,9 +178,33 @@ Status DBImpl::FlushMemTableToOutputFile( // happen for this column family in the meantime. The newly created memtables // have their data backed by unsynced WALs, thus they cannot be included in // this flush job. + // Another reason why we must record the current maximum memtable ID of this + // column family: SyncClosedLogs() may release db mutex, thus it's possible + // for application to continue to insert into memtables increasing db's + // sequence number. The application may take a snapshot, but this snapshot is + // not included in `snapshot_seqs` which will be passed to flush job because + // `snapshot_seqs` has already been computed before this function starts. + // Recording the max memtable ID ensures that the flush job does not flush + // a memtable without knowing such snapshot(s). uint64_t max_memtable_id = needs_to_sync_closed_wals ? cfd->imm()->GetLatestMemTableID() : port::kMaxUint64; + + // If needs_to_sync_closed_wals is false, then the flush job will pick ALL + // existing memtables of the column family when PickMemTable() is called + // later. Although we won't call SyncClosedLogs() in this case, we may still + // call the callbacks of the listeners, i.e. NotifyOnFlushBegin() which also + // releases and re-acquires the db mutex. In the meantime, the application + // can still insert into the memtables and increase the db's sequence number. + // The application can take a snapshot, hoping that the latest visible state + // to this snapshto is preserved. This is hard to guarantee since db mutex + // not held. This newly-created snapshot is not included in `snapshot_seqs` + // and the flush job is unaware of its presence. Consequently, the flush job + // may drop certain keys when generating the L0, causing incorrect data to be + // returned for snapshot read using this snapshot. + // To address this, we make sure NotifyOnFlushBegin() executes after memtable + // picking so that no new snapshot can be taken between the two functions. + FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, @@ -192,11 +217,6 @@ Status DBImpl::FlushMemTableToOutputFile( &blob_callback_); FileMetaData file_meta; -#ifndef ROCKSDB_LITE - // may temporarily unlock and lock the mutex. - NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id); -#endif // ROCKSDB_LITE - Status s; bool need_cancel = false; IOStatus log_io_s = IOStatus::OK(); @@ -221,6 +241,12 @@ Status DBImpl::FlushMemTableToOutputFile( } TEST_SYNC_POINT_CALLBACK( "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", &flush_job); + +#ifndef ROCKSDB_LITE + // may temporarily unlock and lock the mutex. + NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id); +#endif // ROCKSDB_LITE + bool switched_to_mempurge = false; // Within flush_job.Run, rocksdb may call event listener to notify // file creation and deletion.