From 0377ff9dea5adb99c442f264269068f76b8a2c03 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Wed, 28 Mar 2018 12:01:09 -0700 Subject: [PATCH] WritePrepared Txn: make recoverable state visible after flush Summary: Currently if the CommitTimeWriteBatch is set to be used only as a state that is required only for recovery , the user cannot see that in DB until it is restarted. This while the state is already inserted into the DB after the memtable flush. It would be useful for debugging if make this state visible to the user after the flush by committing it. The patch does it by a invoking a callback that does the commit on the recoverable state. Closes https://github.com/facebook/rocksdb/pull/3661 Differential Revision: D7424577 Pulled By: maysamyabandeh fbshipit-source-id: 137f9408662f0853938b33fa440f27f04c1bbf5c --- db/db_impl.h | 7 +++++++ db/db_impl_write.cc | 13 +++++++++++++ utilities/transactions/transaction_test.cc | 14 ++++++++++++++ .../transactions/write_prepared_txn_db.cc | 18 ++++++++++++++++++ 4 files changed, 52 insertions(+) diff --git a/db/db_impl.h b/db/db_impl.h index 3bf8d50f0..fc80fe39a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -621,6 +621,9 @@ class DBImpl : public DB { void SetSnapshotChecker(SnapshotChecker* snapshot_checker); + // Not thread-safe. + void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback); + InstrumentedMutex* mutex() { return &mutex_; } Status NewDB(); @@ -1354,6 +1357,10 @@ class DBImpl : public DB { // REQUIRES: mutex held std::unique_ptr snapshot_checker_; + // Callback for when the cached_recoverable_state_ is written to memtable + // Only to be set during initialization + std::unique_ptr recoverable_state_pre_release_callback_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index b4b92567a..3fb61010a 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -45,6 +45,11 @@ Status DBImpl::SingleDelete(const WriteOptions& write_options, return DB::SingleDelete(write_options, column_family, key); } +void DBImpl::SetRecoverableStatePreReleaseCallback( + PreReleaseCallback* callback) { + recoverable_state_pre_release_callback_.reset(callback); +} + Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { return WriteImpl(write_options, my_batch, nullptr, nullptr); } @@ -976,6 +981,14 @@ Status DBImpl::WriteRecoverableState() { if (two_write_queues_) { log_write_mutex_.Unlock(); } + if (status.ok() && recoverable_state_pre_release_callback_) { + const bool DISABLE_MEMTABLE = true; + for (uint64_t sub_batch_seq = seq + 1; + sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) { + status = recoverable_state_pre_release_callback_->Callback( + sub_batch_seq, !DISABLE_MEMTABLE); + } + } if (status.ok()) { cached_recoverable_state_.Clear(); cached_recoverable_state_empty_ = true; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 9d9d06082..8b34717ae 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -913,6 +913,16 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { } db_impl->TEST_FlushMemTable(true); + // After flush the recoverable state must be visible + if (cwb4recovery) { + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); + + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } // after memtable flush we can now relese the log ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); @@ -1044,6 +1054,10 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { if (test_with_empty_wal) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); db_impl->TEST_FlushMemTable(true); + // After flush the state must be visible + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); } db->FlushWAL(true); // kill and reopen to trigger recovery diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 48f185682..c30ab46fd 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -46,6 +46,24 @@ Status WritePreparedTxnDB::Initialize( AdvanceMaxEvictedSeq(prev_max, last_seq); db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this)); + // A callback to commit a single sub-batch + class CommitSubBatchPreReleaseCallback : public PreReleaseCallback { + public: + explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db) + : db_(db) {} + virtual Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled) override { + assert(!is_mem_disabled); + const bool PREPARE_SKIPPED = true; + db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED); + return Status::OK(); + } + + private: + WritePreparedTxnDB* db_; + }; + db_impl_->SetRecoverableStatePreReleaseCallback( + new CommitSubBatchPreReleaseCallback(this)); auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices, handles);