diff --git a/db/db_impl.h b/db/db_impl.h index 462b8c9b9..24d4f8c86 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -669,6 +669,9 @@ class DBImpl : public DB { uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinPrepLogReferencedByMemTable(); + // write cached_recoverable_state_ to memtable if it is not empty + // The writer must be the leader in write_thread_ and holding mutex_ + Status WriteRecoverableState(); private: friend class DB; @@ -800,7 +803,8 @@ class DBImpl : public DB { WriteContext* write_context); WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group, - WriteBatch* tmp_batch, size_t* write_with_wal); + WriteBatch* tmp_batch, size_t* write_with_wal, + WriteBatch** to_be_cached_state); Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size); @@ -990,6 +994,15 @@ class DBImpl : public DB { std::deque logs_; // Signaled when getting_synced becomes false for some of the logs_. InstrumentedCondVar log_sync_cv_; + // This is the app-level state that is written to the WAL but will be used + // only during recovery. Using this feature enables not writing the state to + // memtable on normal writes and hence improving the throughput. Each new + // write of the state will replace the previous state entirely even if the + // keys in the two consecuitive states do not overlap. + // It is protected by log_write_mutex_ when concurrent_prepare_ is enabled. + // Otherwise only the heaad of write_thread_ can access it. + WriteBatch cached_recoverable_state_; + std::atomic cached_recoverable_state_empty_ = {true}; std::atomic total_log_size_; // only used for dynamically adjusting max_total_wal_size. it is a sum of // [write_buffer_size * max_write_buffer_number] over all column families diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 4436d3acf..6dc28f969 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -947,7 +947,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, WriteContext context; InstrumentedMutexLock guard_lock(&mutex_); - if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty()) { + if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() && + cached_recoverable_state_empty_.load()) { // Nothing to flush return Status::OK(); } @@ -957,8 +958,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, write_thread_.EnterUnbatched(&w, &mutex_); } - // SwitchMemtable() will release and reacquire mutex - // during execution + // SwitchMemtable() will release and reacquire mutex during execution s = SwitchMemtable(cfd, &context); if (!writes_stopped) { diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 55801e827..67ebc08fa 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -75,6 +75,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::NotSupported( "pipelined_writes is not compatible with seq_per_batch"); } + // Otherwise IsLatestPersistentState optimization does not make sense + assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) || + disable_memtable); Status status; if (write_options.low_pri) { @@ -678,9 +681,11 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, } WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, - WriteBatch* tmp_batch, size_t* write_with_wal) { + WriteBatch* tmp_batch, size_t* write_with_wal, + WriteBatch** to_be_cached_state) { assert(write_with_wal != nullptr); assert(tmp_batch != nullptr); + assert(*to_be_cached_state == nullptr); WriteBatch* merged_batch = nullptr; *write_with_wal = 0; auto* leader = write_group.leader; @@ -690,6 +695,9 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, // contains one batch, that batch should be written to the WAL, // and the batch is not wanting to be truncated merged_batch = leader->batch; + if (WriteBatchInternal::IsLatestPersistentState(merged_batch)) { + *to_be_cached_state = merged_batch; + } *write_with_wal = 1; } else { // WAL needs all of the batches flattened into a single batch. @@ -700,6 +708,10 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, if (writer->ShouldWriteToWAL()) { WriteBatchInternal::Append(merged_batch, writer->batch, /*WAL_only*/ true); + if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) { + // We only need to cache the last of such write batch + *to_be_cached_state = writer->batch; + } (*write_with_wal)++; } } @@ -734,8 +746,9 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, Status status; size_t write_with_wal = 0; - WriteBatch* merged_batch = - MergeBatch(write_group, &tmp_batch_, &write_with_wal); + WriteBatch* to_be_cached_state = nullptr; + WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_, + &write_with_wal, &to_be_cached_state); if (merged_batch == write_group.leader->batch) { write_group.leader->log_used = logfile_number_; } else if (write_with_wal > 1) { @@ -748,6 +761,10 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t log_size; status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + if (to_be_cached_state) { + cached_recoverable_state_ = *to_be_cached_state; + cached_recoverable_state_empty_ = false; + } if (status.ok() && need_log_sync) { StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); @@ -797,8 +814,9 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, WriteBatch tmp_batch; size_t write_with_wal = 0; + WriteBatch* to_be_cached_state = nullptr; WriteBatch* merged_batch = - MergeBatch(write_group, &tmp_batch, &write_with_wal); + MergeBatch(write_group, &tmp_batch, &write_with_wal, &to_be_cached_state); // We need to lock log_write_mutex_ since logs_ and alive_log_files might be // pushed back concurrently @@ -817,6 +835,10 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer = logs_.back().writer; uint64_t log_size; status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + if (to_be_cached_state) { + cached_recoverable_state_ = *to_be_cached_state; + cached_recoverable_state_empty_ = false; + } log_write_mutex_.Unlock(); if (status.ok()) { @@ -831,6 +853,34 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, return status; } +Status DBImpl::WriteRecoverableState() { + mutex_.AssertHeld(); + if (!cached_recoverable_state_empty_) { + bool dont_care_bool; + SequenceNumber next_seq; + if (concurrent_prepare_) { + log_write_mutex_.Lock(); + } + SequenceNumber seq = versions_->LastSequence(); + WriteBatchInternal::SetSequence(&cached_recoverable_state_, ++seq); + auto status = WriteBatchInternal::InsertInto( + &cached_recoverable_state_, column_family_memtables_.get(), + &flush_scheduler_, true, 0 /*recovery_log_number*/, this, + false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, + seq_per_batch_); + versions_->SetLastSequence(--next_seq); + if (concurrent_prepare_) { + log_write_mutex_.Unlock(); + } + if (status.ok()) { + cached_recoverable_state_.Clear(); + cached_recoverable_state_empty_ = true; + } + return status; + } + return Status::OK(); +} + Status DBImpl::SwitchWAL(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); @@ -1069,6 +1119,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; + // Recoverable state is persisted in WAL. After memtable switch, WAL might + // be deleted, so we write the state to memtable to be persisted as well. + Status s = WriteRecoverableState(); + if (!s.ok()) { + return s; + } + // In case of pipelined write is enabled, wait for all pending memtable // writers. if (immutable_db_options_.enable_pipelined_write) { @@ -1112,7 +1169,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { const auto preallocate_block_size = GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); mutex_.Unlock(); - Status s; { if (creating_new_log) { EnvOptions opt_env_opt = diff --git a/db/write_batch.cc b/db/write_batch.cc index 18475e944..76203ea1d 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -494,6 +494,14 @@ Status WriteBatch::Iterate(Handler* handler) const { } } +bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) { + return b->is_latest_persistent_state_; +} + +void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) { + b->is_latest_persistent_state_ = true; +} + int WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 7c2f42d49..3cca5435d 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -189,6 +189,11 @@ class WriteBatchInternal { // Returns the byte size of appending a WriteBatch with ByteSize // leftByteSize and a WriteBatch with ByteSize rightByteSize static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); + + // This write batch includes the latest state that should be persisted. Such + // state meant to be used only during recovery. + static void SetAsLastestPersistentState(WriteBatch* b); + static bool IsLatestPersistentState(const WriteBatch* b); }; // LocalSavePoint is similar to a scope guard diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 77043897a..b5a33d18b 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -97,6 +97,12 @@ struct TransactionOptions { // Status::Busy. The user should retry their transaction. bool deadlock_detect = false; + // If set, it states that the CommitTimeWriteBatch represents the latest state + // of the application and meant to be used later during recovery. It enables + // an optimization to postpone updating the memtable with CommitTimeWriteBatch + // to only SwithcMamtable or recovery. + bool use_only_the_last_commit_time_batch_for_recovery = false; + // TODO(agiardullo): TransactionDB does not yet support comparators that allow // two non-equal keys to be equivalent. Ie, cmp->Compare(a,b) should only // return 0 if diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 41f491b72..b612dd442 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -347,6 +347,12 @@ class WriteBatch : public WriteBatchBase { // Maximum size of rep_. size_t max_bytes_; + // Is the content of the batch the application's latest state that meant only + // to be used for recovery? Refer to + // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery for + // more details. + bool is_latest_persistent_state_ = false; + protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ diff --git a/options/options_parser.h b/options/options_parser.h index 5545c0b0f..5aab3e7e9 100644 --- a/options/options_parser.h +++ b/options/options_parser.h @@ -9,7 +9,6 @@ #include #include -#include "options/options_helper.h" #include "options/options_sanity_check.h" #include "rocksdb/env.h" #include "rocksdb/options.h" diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 33e9d4c18..58d0d8a4e 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -13,7 +13,7 @@ #include -#include "options/options_parser.h" +#include "options/options_helper.h" #include "rocksdb/convenience.h" #include "util/testharness.h" @@ -427,7 +427,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "hard_pending_compaction_bytes_limit=0;" "disable_auto_compactions=false;" "report_bg_io_stats=true;" - "compaction_options_fifo={max_table_files_size=3;ttl=100;allow_compaction=false;};", + "compaction_options_fifo={max_table_files_size=3;ttl=100;allow_" + "compaction=false;};", new_options)); ASSERT_EQ(unset_bytes_base, diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index c2bd61379..4a2d4d84b 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -82,6 +82,8 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) { if (expiration_time_ > 0) { txn_db_impl_->InsertExpirableTransaction(txn_id_, this); } + use_only_the_last_commit_time_batch_for_recovery_ = + txn_options.use_only_the_last_commit_time_batch_for_recovery; } PessimisticTransaction::~PessimisticTransaction() { diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 95045c04b..be7487a83 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -113,6 +113,10 @@ class PessimisticTransaction : public TransactionBaseImpl { int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } protected: + // Refer to + // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery + bool use_only_the_last_commit_time_batch_for_recovery_ = false; + virtual Status PrepareInternal() = 0; virtual Status CommitWithoutPrepareInternal() = 0; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 0031fc3ae..312bd9efc 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -114,16 +114,17 @@ class PessimisticTransactionDB : public TransactionDB { void SetDeadlockInfoBufferSize(uint32_t target_size) override; protected: + DBImpl* db_impl_; + std::shared_ptr info_log_; + const TransactionDBOptions txn_db_options_; + void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, const TransactionOptions& txn_options = TransactionOptions()); - DBImpl* db_impl_; - std::shared_ptr info_log_; private: friend class WritePreparedTxnDB; friend class WritePreparedTxnDBMock; - const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; // Must be held when adding/dropping column families. diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 6bdb9ffe4..335a756a1 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -303,7 +303,8 @@ class TransactionBaseImpl : public Transaction { WriteBatchWithIndex write_batch_; private: - // batch to be written at commit time + // Extra data to be persisted with the commit. Note this is only used when + // prepare phase is not skipped. WriteBatch commit_time_batch_; // Stack of the Snapshot saved at each save point. Saved snapshots may be diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index de3199db8..9c7d08641 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -43,13 +43,10 @@ INSTANTIATE_TEST_CASE_P( DBAsBaseDB, TransactionTest, ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), std::make_tuple(false, true, WRITE_COMMITTED), - std::make_tuple(false, false, WRITE_PREPARED), std::make_tuple(false, true, WRITE_PREPARED))); INSTANTIATE_TEST_CASE_P( StackableDBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(true, false, WRITE_COMMITTED), - std::make_tuple(true, true, WRITE_COMMITTED), - std::make_tuple(true, false, WRITE_PREPARED), + ::testing::Values(std::make_tuple(true, true, WRITE_COMMITTED), std::make_tuple(true, true, WRITE_PREPARED))); INSTANTIATE_TEST_CASE_P( MySQLStyleTransactionTest, MySQLStyleTransactionTest, @@ -707,112 +704,131 @@ TEST_P(TransactionTest, CommitTimeBatchFailTest) { } TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { - WriteOptions write_options; - ReadOptions read_options; + for (bool cwb4recovery : {true, false}) { + ReOpen(); + WriteOptions write_options; + ReadOptions read_options; - TransactionOptions txn_options; + TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = cwb4recovery; - string value; - Status s; + string value; + Status s; - DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); - Transaction* txn = db->BeginTransaction(write_options, txn_options); - s = txn->SetName("xid"); - ASSERT_OK(s); + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); - ASSERT_EQ(db->GetTransactionByName("xid"), txn); + ASSERT_EQ(db->GetTransactionByName("xid"), txn); - // transaction put - s = txn->Put(Slice("foo"), Slice("bar")); - ASSERT_OK(s); - ASSERT_EQ(1, txn->GetNumPuts()); + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); - // regular db put - s = db->Put(write_options, Slice("foo2"), Slice("bar2")); - ASSERT_OK(s); - ASSERT_EQ(1, txn->GetNumPuts()); + // regular db put + s = db->Put(write_options, Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); - // regular db read - db->Get(read_options, "foo2", &value); - ASSERT_EQ(value, "bar2"); + // regular db read + db->Get(read_options, "foo2", &value); + ASSERT_EQ(value, "bar2"); - // commit time put - txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")); - txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")); + // commit time put + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid"), Slice("dogs")); + txn->GetCommitTimeWriteBatch()->Put(Slice("gtid2"), Slice("cats")); - // nothing has been prepped yet - ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + // nothing has been prepped yet + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); - s = txn->Prepare(); - ASSERT_OK(s); + s = txn->Prepare(); + ASSERT_OK(s); - // data not im mem yet - s = db->Get(read_options, Slice("foo"), &value); - ASSERT_TRUE(s.IsNotFound()); - s = db->Get(read_options, Slice("gtid"), &value); - ASSERT_TRUE(s.IsNotFound()); + // data not im mem yet + s = db->Get(read_options, Slice("foo"), &value); + ASSERT_TRUE(s.IsNotFound()); + s = db->Get(read_options, Slice("gtid"), &value); + ASSERT_TRUE(s.IsNotFound()); - // find trans in list of prepared transactions - std::vector prepared_trans; - db->GetAllPreparedTransactions(&prepared_trans); - ASSERT_EQ(prepared_trans.size(), 1); - ASSERT_EQ(prepared_trans.front()->GetName(), "xid"); + // find trans in list of prepared transactions + std::vector prepared_trans; + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 1); + ASSERT_EQ(prepared_trans.front()->GetName(), "xid"); - auto log_containing_prep = - db_impl->TEST_FindMinLogContainingOutstandingPrep(); - ASSERT_GT(log_containing_prep, 0); + auto log_containing_prep = + db_impl->TEST_FindMinLogContainingOutstandingPrep(); + ASSERT_GT(log_containing_prep, 0); - // make commit - s = txn->Commit(); - ASSERT_OK(s); + // make commit + s = txn->Commit(); + ASSERT_OK(s); - // value is now available - s = db->Get(read_options, "foo", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "bar"); + // value is now available + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); - s = db->Get(read_options, "gtid", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "dogs"); + if (!cwb4recovery) { + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); - s = db->Get(read_options, "gtid2", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "cats"); + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } - // we already committed - s = txn->Commit(); - ASSERT_EQ(s, Status::InvalidArgument()); + // we already committed + s = txn->Commit(); + ASSERT_EQ(s, Status::InvalidArgument()); - // no longer is prpared results - db->GetAllPreparedTransactions(&prepared_trans); - ASSERT_EQ(prepared_trans.size(), 0); - ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); + // no longer is prpared results + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 0); + ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); - // heap should not care about prepared section anymore - ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + // heap should not care about prepared section anymore + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); - switch (txn_db_options.write_policy) { - case WRITE_COMMITTED: - // but now our memtable should be referencing the prep section - ASSERT_EQ(log_containing_prep, - db_impl->TEST_FindMinPrepLogReferencedByMemTable()); - break; - case WRITE_PREPARED: - case WRITE_UNPREPARED: - // In these modes memtable do not ref the prep sections - ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); - break; - default: - assert(false); - } + switch (txn_db_options.write_policy) { + case WRITE_COMMITTED: + // but now our memtable should be referencing the prep section + ASSERT_EQ(log_containing_prep, + db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + case WRITE_PREPARED: + case WRITE_UNPREPARED: + // In these modes memtable do not ref the prep sections + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + break; + default: + assert(false); + } - db_impl->TEST_FlushMemTable(true); + db_impl->TEST_FlushMemTable(true); - // after memtable flush we can now relese the log - ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); + // after memtable flush we can now relese the log + ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); - delete txn; + delete txn; + + if (cwb4recovery) { + // kill and reopen to trigger recovery + s = ReOpenNoDelete(); + ASSERT_OK(s); + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); + + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } + } } TEST_P(TransactionTest, TwoPhaseNameTest) { @@ -873,44 +889,67 @@ TEST_P(TransactionTest, TwoPhaseNameTest) { } TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { - Status s; - std::string value; - - WriteOptions write_options; - ReadOptions read_options; - TransactionOptions txn_options; - Transaction* txn1 = db->BeginTransaction(write_options, txn_options); - ASSERT_TRUE(txn1); - Transaction* txn2 = db->BeginTransaction(write_options, txn_options); - ASSERT_TRUE(txn2); - - s = txn1->SetName("joe"); - ASSERT_OK(s); - - s = txn2->SetName("bob"); - ASSERT_OK(s); + for (bool cwb4recovery : {true, false}) { + for (bool test_with_empty_wal : {true, false}) { + if (!cwb4recovery && test_with_empty_wal) { + continue; + } + ReOpen(); + Status s; + std::string value; + + WriteOptions write_options; + ReadOptions read_options; + TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = + cwb4recovery; + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn1); + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_TRUE(txn2); + + s = txn1->SetName("joe"); + ASSERT_OK(s); - s = txn1->Prepare(); - ASSERT_OK(s); + s = txn2->SetName("bob"); + ASSERT_OK(s); - s = txn1->Commit(); - ASSERT_OK(s); + s = txn1->Prepare(); + ASSERT_OK(s); - delete txn1; + s = txn1->Commit(); + ASSERT_OK(s); - txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")); + delete txn1; - s = txn2->Prepare(); - ASSERT_OK(s); + txn2->GetCommitTimeWriteBatch()->Put(Slice("foo"), Slice("bar")); - s = txn2->Commit(); - ASSERT_OK(s); + s = txn2->Prepare(); + ASSERT_OK(s); - s = db->Get(read_options, "foo", &value); - ASSERT_OK(s); - ASSERT_EQ(value, "bar"); + s = txn2->Commit(); + ASSERT_OK(s); - delete txn2; + delete txn2; + if (!cwb4recovery) { + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + } else { + if (test_with_empty_wal) { + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + db_impl->TEST_FlushMemTable(true); + } + db->FlushWAL(true); + // kill and reopen to trigger recovery + s = ReOpenNoDelete(); + ASSERT_OK(s); + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); + } + } + } } TEST_P(TransactionTest, TwoPhaseExpirationTest) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 16499cc33..8ececbcb1 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -122,8 +122,13 @@ Status WritePreparedTxn::CommitInternal() { const bool empty = working_batch->Count() == 0; WriteBatchInternal::MarkCommit(working_batch, name_); - // any operations appended to this working_batch will be ignored from WAL - working_batch->MarkWalTerminationPoint(); + const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_; + if (!empty && for_recovery) { + // When not writing to memtable, we can still cache the latest write batch. + // The cached batch will be written to memtable in WriteRecoverableState + // during FlushMemTable + WriteBatchInternal::SetAsLastestPersistentState(working_batch); + } const bool disable_memtable = true; uint64_t seq_used = kMaxSequenceNumber; @@ -133,14 +138,14 @@ Status WritePreparedTxn::CommitInternal() { const uint64_t zero_log_number = 0ull; auto s = db_impl_->WriteImpl( write_options_, working_batch, nullptr, nullptr, zero_log_number, - empty ? disable_memtable : !disable_memtable, &seq_used); + empty || for_recovery ? disable_memtable : !disable_memtable, &seq_used); assert(seq_used != kMaxSequenceNumber); uint64_t& commit_seq = seq_used; // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode // commit_seq. This happens if prep_seq <<< commit_seq. auto prepare_seq = GetId(); wpt_db_->AddCommitted(prepare_seq, commit_seq); - if (!empty) { + if (!empty && !for_recovery) { // Commit the data that is accompnaied with the commit marker // TODO(myabandeh): skip AddPrepared wpt_db_->AddPrepared(commit_seq);