diff --git a/HISTORY.md b/HISTORY.md
index 55d922902..2ae0a3715 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -4,6 +4,7 @@
 * Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.)
 * Fixed more cases of EventListener::OnTableFileCreated called with OK status, file_size==0, and no SST file kept. Now the status is Aborted.
 * Fixed a read-after-free bug in `DB::GetMergeOperands()`.
+* Fix a data loss bug for 2PC write-committed transaction caused by concurrent transaction commit and memtable switch (#9571).
 
 ### Performance Improvements
 * Mitigated the overhead of building the file location hash table used by the online LSM tree consistency checks, which can improve performance for certain workloads (see #9351).
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 4e25121c7..9c07e81fc 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -2399,11 +2399,10 @@ extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
 // will not depend on any WAL file. nullptr means no memtable is being flushed.
 // The function is only applicable to 2pc mode.
 extern uint64_t FindMinPrepLogReferencedByMemTable(
-    VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
-    const autovector<MemTable*>& memtables_to_flush);
+    VersionSet* vset, const autovector<MemTable*>& memtables_to_flush);
 // For atomic flush.
 extern uint64_t FindMinPrepLogReferencedByMemTable(
-    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
+    VersionSet* vset,
     const autovector<const autovector<MemTable*>*>& memtables_to_flush);
 
 // Fix user-supplied options to be reasonable
diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc
index e05e82c2c..783037b4e 100644
--- a/db/db_impl/db_impl_debug.cc
+++ b/db/db_impl/db_impl_debug.cc
@@ -263,8 +263,7 @@ size_t DBImpl::TEST_LogsWithPrepSize() {
 
 uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
   autovector<MemTable*> empty_list;
-  return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr,
-                                            empty_list);
+  return FindMinPrepLogReferencedByMemTable(versions_.get(), empty_list);
 }
 
 Status DBImpl::TEST_GetLatestMutableCFOptions(
diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc
index 7ce1a0ae4..b50380d92 100644
--- a/db/db_impl/db_impl_files.cc
+++ b/db/db_impl/db_impl_files.cc
@@ -670,8 +670,7 @@ void DBImpl::DeleteObsoleteFiles() {
 }
 
 uint64_t FindMinPrepLogReferencedByMemTable(
-    VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
-    const autovector<MemTable*>& memtables_to_flush) {
+    VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
   uint64_t min_log = 0;
 
   // we must look through the memtables for two phase transactions
@@ -679,7 +678,7 @@ uint64_t FindMinPrepLogReferencedByMemTable(
   std::unordered_set<MemTable*> memtables_to_flush_set(
       memtables_to_flush.begin(), memtables_to_flush.end());
   for (auto loop_cfd : *vset->GetColumnFamilySet()) {
-    if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
+    if (loop_cfd->IsDropped()) {
       continue;
     }
 
@@ -701,18 +700,16 @@ uint64_t FindMinPrepLogReferencedByMemTable(
 }
 
 uint64_t FindMinPrepLogReferencedByMemTable(
-    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
+    VersionSet* vset,
     const autovector<const autovector<MemTable*>*>& memtables_to_flush) {
   uint64_t min_log = 0;
 
-  std::unordered_set<ColumnFamilyData*> cfds_to_flush_set(cfds_to_flush.begin(),
-                                                          cfds_to_flush.end());
   std::unordered_set<MemTable*> memtables_to_flush_set;
   for (const autovector<MemTable*>* memtables :
        memtables_to_flush) {
     memtables_to_flush_set.insert(memtables->begin(), memtables->end());
   }
   for (auto loop_cfd : *vset->GetColumnFamilySet()) {
-    if (loop_cfd->IsDropped() || cfds_to_flush_set.count(loop_cfd)) {
+    if (loop_cfd->IsDropped()) {
       continue;
     }
 
@@ -828,8 +825,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
     min_log_number_to_keep = min_log_in_prep_heap;
   }
 
-  uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
-      vset, &cfd_to_flush, memtables_to_flush);
+  uint64_t min_log_refed_by_mem =
+      FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush);
 
   if (min_log_refed_by_mem != 0 &&
       min_log_refed_by_mem < min_log_number_to_keep) {
@@ -859,8 +856,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
     min_log_number_to_keep = min_log_in_prep_heap;
   }
 
-  uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
-      vset, cfds_to_flush, memtables_to_flush);
+  uint64_t min_log_refed_by_mem =
+      FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush);
 
   if (min_log_refed_by_mem != 0 &&
       min_log_refed_by_mem < min_log_number_to_keep) {
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index 4d14e0300..f15e6b62e 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1374,6 +1374,12 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
 
 Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                            MemTable* mem, VersionEdit* edit) {
   mutex_.AssertHeld();
+  assert(cfd);
+  assert(cfd->imm());
+  // The immutable memtable list must be empty.
+  assert(std::numeric_limits<uint64_t>::max() ==
+         cfd->imm()->GetEarliestMemTableID());
+
   const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
 
   FileMetaData meta;
diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc
index 286c05163..1970cf6b2 100644
--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -148,6 +148,78 @@ TEST_P(TransactionTest, SuccessTest) {
   delete txn;
 }
 
+TEST_P(TransactionTest, SwitchMemtableDuringPrepareAndCommit_WC) {
+  const TxnDBWritePolicy write_policy = std::get<2>(GetParam());
+
+  if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) {
+    ROCKSDB_GTEST_BYPASS("Test applies to write-committed only");
+    return;
+  }
+
+  ASSERT_OK(db->Put(WriteOptions(), "key0", "value"));
+
+  TransactionOptions txn_opts;
+  txn_opts.use_only_the_last_commit_time_batch_for_recovery = true;
+  Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opts);
+  assert(txn);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "FlushJob::WriteLevel0Table", [&](void* arg) {
+        // db mutex not held.
+        auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
+        assert(mems);
+        ASSERT_EQ(1, mems->size());
+        auto* ctwb = txn->GetCommitTimeWriteBatch();
+        ASSERT_OK(ctwb->Put("gtid", "123"));
+        ASSERT_OK(txn->Commit());
+        delete txn;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(txn->Put("key1", "value"));
+  ASSERT_OK(txn->SetName("txn1"));
+
+  ASSERT_OK(txn->Prepare());
+
+  auto dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
+  ASSERT_OK(dbimpl->TEST_SwitchMemtable(nullptr));
+  ASSERT_OK(dbimpl->TEST_FlushMemTable(
+      /*wait=*/false, /*allow_write_stall=*/true, /*cfh=*/nullptr));
+
+  ASSERT_OK(dbimpl->TEST_WaitForFlushMemTable());
+
+  {
+    std::string value;
+    ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
+    ASSERT_EQ("value", value);
+  }
+
+  delete db;
+  db = nullptr;
+  Status s;
+  if (use_stackable_db_ == false) {
+    s = TransactionDB::Open(options, txn_db_options, dbname, &db);
+  } else {
+    s = OpenWithStackableDB();
+  }
+  ASSERT_OK(s);
+  assert(db);
+
+  {
+    std::string value;
+    ASSERT_OK(db->Get(ReadOptions(), "gtid", &value));
+    ASSERT_EQ("123", value);
+
+    ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
+    ASSERT_EQ("value", value);
+  }
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 // The test clarifies the contract of do_validate and assume_tracked
 // in GetForUpdate and Put/Merge/Delete
 TEST_P(TransactionTest, AssumeExclusiveTracked) {