Fix a silent data loss for write-committed txn (#9571)

Summary:
The following sequence of events can cause silent data loss for write-committed
transactions.
```
Time    thread 1                                       bg flush
 |   db->Put("a")
 |   txn = NewTxn()
 |   txn->Put("b", "v")
 |   txn->Prepare()       // writes only to 5.log
 |   db->SwitchMemtable() // memtable 1 has "a"
 |                        // close 5.log,
 |                        // creates 8.log
 |   trigger flush
 |                                                  pick memtable 1
 |                                                  unlock db mutex
 |                                                  write new sst
 |   txn->ctwb->Put("gtid", "1") // writes 8.log
 |   txn->Commit() // writes to 8.log
 |                 // writes to memtable 2
 |                                               compute min_log_number_to_keep_2pc, this
 |                                               will be 8 (incorrect).
 |
 |                                             Purge obsolete wals, including 5.log
 |
 V
```

At this point, writes of txn exists only in memtable. Close db without flush because db thinks the data in
memtable are backed by log. Then reopen, the writes are lost except key-value pair {"gtid"->"1"},
only the commit marker of txn is in 8.log

The reason lies in `PrecomputeMinLogNumberToKeep2PC()` which calls `FindMinPrepLogReferencedByMemTable()`.
In the above example, when bg flush thread tries to find obsolete wals, it uses the information
computed by `PrecomputeMinLogNumberToKeep2PC()`. The return value of `PrecomputeMinLogNumberToKeep2PC()`
depends on three components
- `PrecomputeMinLogNumberToKeepNon2PC()`. This represents the WAL that has unflushed data. As the name of this method suggests, it does not account for 2PC. Although the keys reside in the prepare section of a previous WAL, the column family references the current WAL when they are actually inserted into the memtable during txn commit.
- `prep_tracker->FindMinLogContainingOutstandingPrep()`. This represents the WAL with a prepare section but the txn hasn't committed.
- `FindMinPrepLogReferencedByMemTable()`. This represents the WAL on which some memtables (mutable and immutable) depend for their unflushed data.

The bug lies in `FindMinPrepLogReferencedByMemTable()`. Originally, this function skips checking the column families
that are being flushed, but the unit test added in this PR shows that they should not be. In this unit test, there is
only the default column family, and one of its memtables has unflushed data backed by a prepare section in 5.log.
We should return this information via `FindMinPrepLogReferencedByMemTable()`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9571

Test Plan:
```
./transaction_test --gtest_filter=*/TransactionTest.SwitchMemtableDuringPrepareAndCommit_WC/*
make check
```

Reviewed By: siying

Differential Revision: D34235236

Pulled By: riversand963

fbshipit-source-id: 120eb21a666728a38dda77b96276c6af72b008b1
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 1e403a0c6c
commit 1cda273dc3
  1. 1
      HISTORY.md
  2. 5
      db/db_impl/db_impl.h
  3. 3
      db/db_impl/db_impl_debug.cc
  4. 19
      db/db_impl/db_impl_files.cc
  5. 6
      db/db_impl/db_impl_open.cc
  6. 72
      utilities/transactions/transaction_test.cc

@ -4,6 +4,7 @@
* Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.) * Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.)
* Fixed more cases of EventListener::OnTableFileCreated called with OK status, file_size==0, and no SST file kept. Now the status is Aborted. * Fixed more cases of EventListener::OnTableFileCreated called with OK status, file_size==0, and no SST file kept. Now the status is Aborted.
* Fixed a read-after-free bug in `DB::GetMergeOperands()`. * Fixed a read-after-free bug in `DB::GetMergeOperands()`.
* Fix a data loss bug for 2PC write-committed transaction caused by concurrent transaction commit and memtable switch (#9571).
### Performance Improvements ### Performance Improvements
* Mitigated the overhead of building the file location hash table used by the online LSM tree consistency checks, which can improve performance for certain workloads (see #9351). * Mitigated the overhead of building the file location hash table used by the online LSM tree consistency checks, which can improve performance for certain workloads (see #9351).

@ -2399,11 +2399,10 @@ extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
// will not depend on any WAL file. nullptr means no memtable is being flushed. // will not depend on any WAL file. nullptr means no memtable is being flushed.
// The function is only applicable to 2pc mode. // The function is only applicable to 2pc mode.
extern uint64_t FindMinPrepLogReferencedByMemTable( extern uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const ColumnFamilyData* cfd_to_flush, VersionSet* vset, const autovector<MemTable*>& memtables_to_flush);
const autovector<MemTable*>& memtables_to_flush);
// For atomic flush. // For atomic flush.
extern uint64_t FindMinPrepLogReferencedByMemTable( extern uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush, VersionSet* vset,
const autovector<const autovector<MemTable*>*>& memtables_to_flush); const autovector<const autovector<MemTable*>*>& memtables_to_flush);
// Fix user-supplied options to be reasonable // Fix user-supplied options to be reasonable

@ -263,8 +263,7 @@ size_t DBImpl::TEST_LogsWithPrepSize() {
uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
autovector<MemTable*> empty_list; autovector<MemTable*> empty_list;
return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr, return FindMinPrepLogReferencedByMemTable(versions_.get(), empty_list);
empty_list);
} }
Status DBImpl::TEST_GetLatestMutableCFOptions( Status DBImpl::TEST_GetLatestMutableCFOptions(

@ -670,8 +670,7 @@ void DBImpl::DeleteObsoleteFiles() {
} }
uint64_t FindMinPrepLogReferencedByMemTable( uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const ColumnFamilyData* cfd_to_flush, VersionSet* vset, const autovector<MemTable*>& memtables_to_flush) {
const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0; uint64_t min_log = 0;
// we must look through the memtables for two phase transactions // we must look through the memtables for two phase transactions
@ -679,7 +678,7 @@ uint64_t FindMinPrepLogReferencedByMemTable(
std::unordered_set<MemTable*> memtables_to_flush_set( std::unordered_set<MemTable*> memtables_to_flush_set(
memtables_to_flush.begin(), memtables_to_flush.end()); memtables_to_flush.begin(), memtables_to_flush.end());
for (auto loop_cfd : *vset->GetColumnFamilySet()) { for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) { if (loop_cfd->IsDropped()) {
continue; continue;
} }
@ -701,18 +700,16 @@ uint64_t FindMinPrepLogReferencedByMemTable(
} }
uint64_t FindMinPrepLogReferencedByMemTable( uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush, VersionSet* vset,
const autovector<const autovector<MemTable*>*>& memtables_to_flush) { const autovector<const autovector<MemTable*>*>& memtables_to_flush) {
uint64_t min_log = 0; uint64_t min_log = 0;
std::unordered_set<ColumnFamilyData*> cfds_to_flush_set(cfds_to_flush.begin(),
cfds_to_flush.end());
std::unordered_set<MemTable*> memtables_to_flush_set; std::unordered_set<MemTable*> memtables_to_flush_set;
for (const autovector<MemTable*>* memtables : memtables_to_flush) { for (const autovector<MemTable*>* memtables : memtables_to_flush) {
memtables_to_flush_set.insert(memtables->begin(), memtables->end()); memtables_to_flush_set.insert(memtables->begin(), memtables->end());
} }
for (auto loop_cfd : *vset->GetColumnFamilySet()) { for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped() || cfds_to_flush_set.count(loop_cfd)) { if (loop_cfd->IsDropped()) {
continue; continue;
} }
@ -828,8 +825,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
min_log_number_to_keep = min_log_in_prep_heap; min_log_number_to_keep = min_log_in_prep_heap;
} }
uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( uint64_t min_log_refed_by_mem =
vset, &cfd_to_flush, memtables_to_flush); FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush);
if (min_log_refed_by_mem != 0 && if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < min_log_number_to_keep) { min_log_refed_by_mem < min_log_number_to_keep) {
@ -859,8 +856,8 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
min_log_number_to_keep = min_log_in_prep_heap; min_log_number_to_keep = min_log_in_prep_heap;
} }
uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( uint64_t min_log_refed_by_mem =
vset, cfds_to_flush, memtables_to_flush); FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush);
if (min_log_refed_by_mem != 0 && if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < min_log_number_to_keep) { min_log_refed_by_mem < min_log_number_to_keep) {

@ -1374,6 +1374,12 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) { MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(cfd);
assert(cfd->imm());
// The immutable memtable list must be empty.
assert(std::numeric_limits<uint64_t>::max() ==
cfd->imm()->GetEarliestMemTableID());
const uint64_t start_micros = immutable_db_options_.clock->NowMicros(); const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
FileMetaData meta; FileMetaData meta;

@ -148,6 +148,78 @@ TEST_P(TransactionTest, SuccessTest) {
delete txn; delete txn;
} }
TEST_P(TransactionTest, SwitchMemtableDuringPrepareAndCommit_WC) {
const TxnDBWritePolicy write_policy = std::get<2>(GetParam());
if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) {
ROCKSDB_GTEST_BYPASS("Test applies to write-committed only");
return;
}
ASSERT_OK(db->Put(WriteOptions(), "key0", "value"));
TransactionOptions txn_opts;
txn_opts.use_only_the_last_commit_time_batch_for_recovery = true;
Transaction* txn = db->BeginTransaction(WriteOptions(), txn_opts);
assert(txn);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&](void* arg) {
// db mutex not held.
auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
assert(mems);
ASSERT_EQ(1, mems->size());
auto* ctwb = txn->GetCommitTimeWriteBatch();
ASSERT_OK(ctwb->Put("gtid", "123"));
ASSERT_OK(txn->Commit());
delete txn;
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(txn->Put("key1", "value"));
ASSERT_OK(txn->SetName("txn1"));
ASSERT_OK(txn->Prepare());
auto dbimpl = static_cast_with_check<DBImpl>(db->GetRootDB());
ASSERT_OK(dbimpl->TEST_SwitchMemtable(nullptr));
ASSERT_OK(dbimpl->TEST_FlushMemTable(
/*wait=*/false, /*allow_write_stall=*/true, /*cfh=*/nullptr));
ASSERT_OK(dbimpl->TEST_WaitForFlushMemTable());
{
std::string value;
ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
ASSERT_EQ("value", value);
}
delete db;
db = nullptr;
Status s;
if (use_stackable_db_ == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else {
s = OpenWithStackableDB();
}
ASSERT_OK(s);
assert(db);
{
std::string value;
ASSERT_OK(db->Get(ReadOptions(), "gtid", &value));
ASSERT_EQ("123", value);
ASSERT_OK(db->Get(ReadOptions(), "key1", &value));
ASSERT_EQ("value", value);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
// The test clarifies the contract of do_validate and assume_tracked // The test clarifies the contract of do_validate and assume_tracked
// in GetForUpdate and Put/Merge/Delete // in GetForUpdate and Put/Merge/Delete
TEST_P(TransactionTest, AssumeExclusiveTracked) { TEST_P(TransactionTest, AssumeExclusiveTracked) {

Loading…
Cancel
Save