From 1cda273dc3c6b7fcd4fc1ea41f9774e575181d7d Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 16 Feb 2022 23:07:48 -0800 Subject: [PATCH] Fix a silent data loss for write-committed txn (#9571) Summary: The following sequence of events can cause silent data loss for write-committed transactions. ``` Time thread 1 bg flush | db->Put("a") | txn = NewTxn() | txn->Put("b", "v") | txn->Prepare() // writes only to 5.log | db->SwitchMemtable() // memtable 1 has "a" | // close 5.log, | // creates 8.log | trigger flush | pick memtable 1 | unlock db mutex | write new sst | txn->ctwb->Put("gtid", "1") // writes 8.log | txn->Commit() // writes to 8.log | // writes to memtable 2 | compute min_log_number_to_keep_2pc, this | will be 8 (incorrect). | | Purge obsolete wals, including 5.log | V ``` At this point, the writes of txn exist only in the memtable. Close the db without a flush, because the db thinks the data in the memtable is backed by a log. Then reopen: the writes are lost except for the key-value pair {"gtid"->"1"}; only the commit marker of txn is in 8.log. The reason lies in `PrecomputeMinLogNumberToKeep2PC()`, which calls `FindMinPrepLogReferencedByMemTable()`. In the above example, when the bg flush thread tries to find obsolete wals, it uses the information computed by `PrecomputeMinLogNumberToKeep2PC()`. The return value of `PrecomputeMinLogNumberToKeep2PC()` depends on three components: - `PrecomputeMinLogNumberToKeepNon2PC()`. This represents the WAL that has unflushed data. As the name of this method suggests, it does not account for 2PC. Although the keys reside in the prepare section of a previous WAL, the column family references the current WAL when they are actually inserted into the memtable during txn commit. - `prep_tracker->FindMinLogContainingOutstandingPrep()`. This represents the WAL with a prepare section whose txn hasn't committed. - `FindMinPrepLogReferencedByMemTable()`. This represents the WAL on which some memtables (mutable and immutable) depend for their unflushed data. 
The bug lies in `FindMinPrepLogReferencedByMemTable()`. Originally, this function skipped checking the column families that are being flushed, but the unit test added in this PR shows that they should not be skipped. In this unit test, there is only the default column family, and one of its memtables has unflushed data backed by a prepare section in 5.log. We should return this information via `FindMinPrepLogReferencedByMemTable()`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9571 Test Plan: ``` ./transaction_test --gtest_filter=*/TransactionTest.SwitchMemtableDuringPrepareAndCommit_WC/* make check ``` Reviewed By: siying Differential Revision: D34235236 Pulled By: riversand963 fbshipit-source-id: 120eb21a666728a38dda77b96276c6af72b008b1 --- HISTORY.md | 1 + db/db_impl/db_impl.h | 5 +- db/db_impl/db_impl_debug.cc | 3 +- db/db_impl/db_impl_files.cc | 19 +++--- db/db_impl/db_impl_open.cc | 6 ++ utilities/transactions/transaction_test.cc | 72 ++++++++++++++++++++++ 6 files changed, 90 insertions(+), 16 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 55d922902..2ae0a3715 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ * Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.) * Fixed more cases of EventListener::OnTableFileCreated called with OK status, file_size==0, and no SST file kept. Now the status is Aborted. * Fixed a read-after-free bug in `DB::GetMergeOperands()`. +* Fix a data loss bug for 2PC write-committed transaction caused by concurrent transaction commit and memtable switch (#9571). 
### Performance Improvements * Mitigated the overhead of building the file location hash table used by the online LSM tree consistency checks, which can improve performance for certain workloads (see #9351). diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 4e25121c7..9c07e81fc 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2399,11 +2399,10 @@ extern uint64_t PrecomputeMinLogNumberToKeepNon2PC( // will not depend on any WAL file. nullptr means no memtable is being flushed. // The function is only applicable to 2pc mode. extern uint64_t FindMinPrepLogReferencedByMemTable( - VersionSet* vset, const ColumnFamilyData* cfd_to_flush, - const autovector& memtables_to_flush); + VersionSet* vset, const autovector& memtables_to_flush); // For atomic flush. extern uint64_t FindMinPrepLogReferencedByMemTable( - VersionSet* vset, const autovector& cfds_to_flush, + VersionSet* vset, const autovector*>& memtables_to_flush); // Fix user-supplied options to be reasonable diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index e05e82c2c..783037b4e 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -263,8 +263,7 @@ size_t DBImpl::TEST_LogsWithPrepSize() { uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { autovector empty_list; - return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr, - empty_list); + return FindMinPrepLogReferencedByMemTable(versions_.get(), empty_list); } Status DBImpl::TEST_GetLatestMutableCFOptions( diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 7ce1a0ae4..b50380d92 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -670,8 +670,7 @@ void DBImpl::DeleteObsoleteFiles() { } uint64_t FindMinPrepLogReferencedByMemTable( - VersionSet* vset, const ColumnFamilyData* cfd_to_flush, - const autovector& memtables_to_flush) { + VersionSet* vset, const autovector& memtables_to_flush) { uint64_t min_log = 0; // we must 
look through the memtables for two phase transactions @@ -679,7 +678,7 @@ uint64_t FindMinPrepLogReferencedByMemTable( std::unordered_set memtables_to_flush_set( memtables_to_flush.begin(), memtables_to_flush.end()); for (auto loop_cfd : *vset->GetColumnFamilySet()) { - if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) { + if (loop_cfd->IsDropped()) { continue; } @@ -701,18 +700,16 @@ uint64_t FindMinPrepLogReferencedByMemTable( } uint64_t FindMinPrepLogReferencedByMemTable( - VersionSet* vset, const autovector& cfds_to_flush, + VersionSet* vset, const autovector*>& memtables_to_flush) { uint64_t min_log = 0; - std::unordered_set cfds_to_flush_set(cfds_to_flush.begin(), - cfds_to_flush.end()); std::unordered_set memtables_to_flush_set; for (const autovector* memtables : memtables_to_flush) { memtables_to_flush_set.insert(memtables->begin(), memtables->end()); } for (auto loop_cfd : *vset->GetColumnFamilySet()) { - if (loop_cfd->IsDropped() || cfds_to_flush_set.count(loop_cfd)) { + if (loop_cfd->IsDropped()) { continue; } @@ -828,8 +825,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC( min_log_number_to_keep = min_log_in_prep_heap; } - uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( - vset, &cfd_to_flush, memtables_to_flush); + uint64_t min_log_refed_by_mem = + FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush); if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < min_log_number_to_keep) { @@ -859,8 +856,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC( min_log_number_to_keep = min_log_in_prep_heap; } - uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( - vset, cfds_to_flush, memtables_to_flush); + uint64_t min_log_refed_by_mem = + FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush); if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < min_log_number_to_keep) { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 4d14e0300..f15e6b62e 100644 --- a/db/db_impl/db_impl_open.cc +++ 
b/db/db_impl/db_impl_open.cc @@ -1374,6 +1374,12 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit) { mutex_.AssertHeld(); + assert(cfd); + assert(cfd->imm()); + // The immutable memtable list must be empty. + assert(std::numeric_limits::max() == + cfd->imm()->GetEarliestMemTableID()); + const uint64_t start_micros = immutable_db_options_.clock->NowMicros(); FileMetaData meta; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 286c05163..1970cf6b2 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -148,6 +148,78 @@ TEST_P(TransactionTest, SuccessTest) { delete txn; } +TEST_P(TransactionTest, SwitchMemtableDuringPrepareAndCommit_WC) { + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + + if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { + ROCKSDB_GTEST_BYPASS("Test applies to write-committed only"); + return; + } + + ASSERT_OK(db->Put(WriteOptions(), "key0", "value")); + + TransactionOptions txn_opts; + txn_opts.use_only_the_last_commit_time_batch_for_recovery = true; + Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opts); + assert(txn); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table", [&](void* arg) { + // db mutex not held. 
+ auto* mems = reinterpret_cast*>(arg); + assert(mems); + ASSERT_EQ(1, mems->size()); + auto* ctwb = txn->GetCommitTimeWriteBatch(); + ASSERT_OK(ctwb->Put("gtid", "123")); + ASSERT_OK(txn->Commit()); + delete txn; + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(txn->Put("key1", "value")); + ASSERT_OK(txn->SetName("txn1")); + + ASSERT_OK(txn->Prepare()); + + auto dbimpl = static_cast_with_check(db->GetRootDB()); + ASSERT_OK(dbimpl->TEST_SwitchMemtable(nullptr)); + ASSERT_OK(dbimpl->TEST_FlushMemTable( + /*wait=*/false, /*allow_write_stall=*/true, /*cfh=*/nullptr)); + + ASSERT_OK(dbimpl->TEST_WaitForFlushMemTable()); + + { + std::string value; + ASSERT_OK(db->Get(ReadOptions(), "key1", &value)); + ASSERT_EQ("value", value); + } + + delete db; + db = nullptr; + Status s; + if (use_stackable_db_ == false) { + s = TransactionDB::Open(options, txn_db_options, dbname, &db); + } else { + s = OpenWithStackableDB(); + } + ASSERT_OK(s); + assert(db); + + { + std::string value; + ASSERT_OK(db->Get(ReadOptions(), "gtid", &value)); + ASSERT_EQ("123", value); + + ASSERT_OK(db->Get(ReadOptions(), "key1", &value)); + ASSERT_EQ("value", value); + } + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + // The test clarifies the contract of do_validate and assume_tracked // in GetForUpdate and Put/Merge/Delete TEST_P(TransactionTest, AssumeExclusiveTracked) {