Disallow commit-time-batch for write-prepared/write-unprepared txn conditionally (#9794)

Summary:
For write-prepared/write-unprepared transactions,
GetCommitTimeWriteBatch() can be used only if the transaction is started
with `TransactionOptions::use_only_the_last_commit_time_batch_for_recovery` set
to true. Otherwise, two (or more) uncommitted versions of the
same key can exist in the database. During bottommost compaction, RocksDB may
set the sequence numbers of these versions to zero once they become committed,
causing the output SST file to contain identical internal keys.
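
The invariant at stake can be illustrated with a minimal sketch that does not use RocksDB at all. `ToyInternalKey`, `ZeroSequenceNumbers`, and `AllUnique` are hypothetical names invented for this illustration. The model only shows why zeroing the sequence numbers of two versions of the same user key yields identical internal keys. It does not model why both versions survive compaction.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified internal key: user key plus sequence number.
// Real RocksDB internal keys also carry a value type, which is omitted here.
using ToyInternalKey = std::pair<std::string, std::uint64_t>;

// Toy model of the bottommost-compaction step described above: the sequence
// number of every entry is rewritten to zero.
std::vector<ToyInternalKey> ZeroSequenceNumbers(
    std::vector<ToyInternalKey> keys) {
  for (auto& key : keys) {
    key.second = 0;
  }
  return keys;
}

// Returns true if every internal key in `keys` is distinct.
bool AllUnique(const std::vector<ToyInternalKey>& keys) {
  std::set<ToyInternalKey> seen(keys.begin(), keys.end());
  return seen.size() == keys.size();
}
```

In this model, two versions of the same user key with different sequence numbers are distinct before zeroing and collide afterwards, which is the situation the new restriction is meant to rule out.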

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9794

Test Plan:
make check
Pay special attention to the following:
```
transaction_test --gtest_filter=MySQLStyleTransactionTest/MySQLStyleTransactionTest.TransactionStressTest/*
```

Reviewed By: lth

Differential Revision: D35327214

Pulled By: riversand963

fbshipit-source-id: 3bae00a28359c10e96e4c6f676d20de5610d8a0f
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 6534c6dea4
commit 1a1c5bda23
 HISTORY.md                                      |  3
 db/db_impl/db_impl.h                            |  2
 include/rocksdb/utilities/transaction.h         | 12
 include/rocksdb/utilities/transaction_db.h      |  2
 utilities/transactions/transaction_test.cc      | 30
 utilities/transactions/write_prepared_txn.cc    | 23
 utilities/transactions/write_unprepared_txn.cc  |  8

--- a/HISTORY.md
+++ b/HISTORY.md
@@ -13,6 +13,9 @@
 * For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000.
 * For db_bench when --benchmark lists multiple tests and each test uses a seed for a RNG then the seeds across tests will no longer be repeated.
+### Behavior changes
+* Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794).
 ## 7.1.0 (03/23/2022)
 ### New Features
 * Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamp.

--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -2144,7 +2144,7 @@ class DBImpl : public DB {
   // only during recovery. Using this feature enables not writing the state to
   // memtable on normal writes and hence improving the throughput. Each new
   // write of the state will replace the previous state entirely even if the
-  // keys in the two consecuitive states do not overlap.
+  // keys in the two consecutive states do not overlap.
   // It is protected by log_write_mutex_ when two_write_queues_ is enabled.
   // Otherwise only the heaad of write_thread_ can access it.
   WriteBatch cached_recoverable_state_;

--- a/include/rocksdb/utilities/transaction.h
+++ b/include/rocksdb/utilities/transaction.h
@@ -557,6 +557,18 @@ class Transaction {
   virtual Status RebuildFromWriteBatch(WriteBatch* src_batch) = 0;
+  // Note: data in the commit-time-write-batch bypasses concurrency control,
+  // thus should be used with great caution.
+  // For write-prepared/write-unprepared transactions,
+  // GetCommitTimeWriteBatch() can be used only if the transaction is started
+  // with
+  // `TransactionOptions::use_only_the_last_commit_time_batch_for_recovery` set
+  // to true. Otherwise, it is possible that two uncommitted versions of the
+  // same key exist in the database due to the current implementation (see the
+  // explanation in WritePreparedTxn::CommitInternal).
+  // During bottommost compaction, RocksDB may
+  // set the sequence numbers of both to zero once becoming committed, causing
+  // output SST file to have two identical internal keys.
   virtual WriteBatch* GetCommitTimeWriteBatch() = 0;
   virtual void SetLogNumber(uint64_t log) { log_number_ = log; }

--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -259,6 +259,8 @@ struct TransactionOptions {
   // meant to be used later during recovery. It enables an optimization to
   // postpone updating the memtable with CommitTimeWriteBatch to only
   // SwitchMemtable or recovery.
+  // This option does not affect write-committed. Only
+  // write-prepared/write-unprepared transactions will be affected.
   bool use_only_the_last_commit_time_batch_for_recovery = false;
   // TODO(agiardullo): TransactionDB does not yet support comparators that allow

--- a/utilities/transactions/transaction_test.cc
+++ b/utilities/transactions/transaction_test.cc
@@ -1018,10 +1018,12 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
   ASSERT_EQ(value, "bar2");
   // commit time put
+  if (cwb4recovery) {
     ASSERT_OK(
         txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")));
     ASSERT_OK(
         txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")));
+  }
   // nothing has been prepped yet
   ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
@@ -1054,16 +1056,6 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
   ASSERT_OK(s);
   ASSERT_EQ(value, "bar");
-  if (!cwb4recovery) {
-    s = db->Get(read_options, "gtid", &value);
-    ASSERT_OK(s);
-    ASSERT_EQ(value, "dogs");
-    s = db->Get(read_options, "gtid2", &value);
-    ASSERT_OK(s);
-    ASSERT_EQ(value, "cats");
-  }
   // we already committed
   s = txn->Commit();
   ASSERT_EQ(s, Status::InvalidArgument());
@@ -1219,8 +1211,10 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
   delete txn1;
+  if (cwb4recovery) {
     ASSERT_OK(
         txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")));
+  }
   s = txn2->Prepare();
   ASSERT_OK(s);
@@ -1229,11 +1223,7 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) {
   ASSERT_OK(s);
   delete txn2;
-  if (!cwb4recovery) {
-    s = db->Get(read_options, "foo", &value);
-    ASSERT_OK(s);
-    ASSERT_EQ(value, "bar");
-  } else {
+  if (cwb4recovery) {
     if (test_with_empty_wal) {
       DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
       ASSERT_OK(db_impl->TEST_FlushMemTable(true));
@@ -5442,9 +5432,8 @@ Status TransactionStressTestInserter(
   WriteOptions write_options;
   ReadOptions read_options;
   TransactionOptions txn_options;
-  if (rand->OneIn(2)) {
   txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
-  }
   // Inside the inserter we might also retake the snapshot. We do both since two
   // separte functions are engaged for each.
   txn_options.set_snapshot = rand->OneIn(2);
@@ -5887,7 +5876,7 @@ TEST_P(TransactionTest, DuplicateKeys) {
   ASSERT_OK(ReOpen());
   ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
   TransactionOptions txn_options;
-  txn_options.use_only_the_last_commit_time_batch_for_recovery = false;
+  txn_options.use_only_the_last_commit_time_batch_for_recovery = true;
   WriteOptions write_options;
   Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
   auto s = txn0->SetName("xid");
@@ -5991,8 +5980,13 @@ TEST_P(TransactionTest, DuplicateKeys) {
   ASSERT_TRUE(s.IsNotFound());
   if (with_commit_batch) {
     s = db->Get(ropt, db->DefaultColumnFamily(), "foo6", &pinnable_val);
+    if (txn_db_options.write_policy ==
+        TxnDBWritePolicy::WRITE_COMMITTED) {
       ASSERT_OK(s);
       ASSERT_TRUE(pinnable_val == ("bar6b"));
+    } else {
+      ASSERT_TRUE(s.IsNotFound());
+    }
     s = db->Get(ropt, db->DefaultColumnFamily(), "foo7", &pinnable_val);
     ASSERT_TRUE(s.IsNotFound());
   }

--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -154,11 +154,17 @@ Status WritePreparedTxn::CommitInternal() {
   assert(s.ok());
   const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
-  if (!empty && for_recovery) {
+  if (!empty) {
     // When not writing to memtable, we can still cache the latest write batch.
     // The cached batch will be written to memtable in WriteRecoverableState
     // during FlushMemTable
+    if (for_recovery) {
       WriteBatchInternal::SetAsLatestPersistentState(working_batch);
+    } else {
+      return Status::InvalidArgument(
+          "Commit-time-batch can only be used if "
+          "use_only_the_last_commit_time_batch_for_recovery is true");
+    }
   }
   auto prepare_seq = GetId();
@@ -195,6 +201,21 @@ Status WritePreparedTxn::CommitInternal() {
   // redundantly reference the log that contains the prepared data.
   const uint64_t zero_log_number = 0ull;
   size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
+  // If `two_write_queues && includes_data`, then `do_one_write` is false. The
+  // following `WriteImpl` will insert the data of the commit-time-batch into
+  // the database before updating the commit cache. Therefore, the data of the
+  // commmit-time-batch is considered uncommitted. Furthermore, since data of
+  // the commit-time-batch are not locked, it is possible for two uncommitted
+  // versions of the same key to co-exist for a (short) period of time until
+  // the commit cache is updated by the second write. If the two uncommitted
+  // keys are compacted to the bottommost level in the meantime, it is possible
+  // that compaction iterator will zero out the sequence numbers of both, thus
+  // violating the invariant that an SST does not have two identical internal
+  // keys. To prevent this situation, we should allow the usage of
+  // commit-time-batch only if the user sets
+  // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery to
+  // true. See the comments about GetCommitTimeWriteBatch() in
+  // include/rocksdb/utilities/transaction.h.
   s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                           zero_log_number, disable_memtable, &seq_used,
                           batch_cnt, pre_release_callback);

--- a/utilities/transactions/write_unprepared_txn.cc
+++ b/utilities/transactions/write_unprepared_txn.cc
@@ -550,11 +550,17 @@ Status WriteUnpreparedTxn::CommitInternal() {
   assert(s.ok());
   const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
-  if (!empty && for_recovery) {
+  if (!empty) {
     // When not writing to memtable, we can still cache the latest write batch.
     // The cached batch will be written to memtable in WriteRecoverableState
     // during FlushMemTable
+    if (for_recovery) {
       WriteBatchInternal::SetAsLatestPersistentState(working_batch);
+    } else {
+      return Status::InvalidArgument(
+          "Commit-time-batch can only be used if "
+          "use_only_the_last_commit_time_batch_for_recovery is true");
+    }
   }
   const bool includes_data = !empty && !for_recovery;
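
The decision that both `CommitInternal()` implementations now make can be modeled in isolation. The following is a minimal sketch under stated assumptions, not the RocksDB implementation: `ToyStatus` and `ToyTxn` are hypothetical stand-ins for `Status` and the transaction classes, and the commit-time batch is reduced to a list of key/value pairs.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for rocksdb::Status with only the outcomes needed here.
enum class ToyStatus { kOk, kInvalidArgument };

// Hypothetical, simplified model of a write-prepared/write-unprepared
// transaction. It models only the commit-time-batch check.
struct ToyTxn {
  // Mirrors TransactionOptions::use_only_the_last_commit_time_batch_for_recovery.
  bool for_recovery = false;
  // Stand-in for the commit-time write batch: (key, value) pairs.
  std::vector<std::pair<std::string, std::string>> commit_time_batch;

  void PutAtCommitTime(std::string key, std::string value) {
    commit_time_batch.emplace_back(std::move(key), std::move(value));
  }

  // Same decision as the new branch in the diff: a non-empty commit-time
  // batch is accepted only when for_recovery is true.
  ToyStatus Commit() const {
    const bool empty = commit_time_batch.empty();
    if (!empty && !for_recovery) {
      return ToyStatus::kInvalidArgument;
    }
    return ToyStatus::kOk;
  }
};
```

In this model a transaction with an empty commit-time batch commits regardless of the option, while a non-empty batch commits only when `for_recovery` is set, which matches the way the updated tests guard their `GetCommitTimeWriteBatch()` calls with `cwb4recovery`.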
