From a16e00b7b9f65e881c3530908095d86b4ffecf3f Mon Sep 17 00:00:00 2001 From: Manuel Ung Date: Wed, 27 Jun 2018 12:05:29 -0700 Subject: [PATCH] WriteUnPrepared Txn: Disable seek to snapshot optimization (#3955) Summary: This is implemented by extending ReadCallback with another function `MaxUnpreparedSequenceNumber` which returns the largest visible sequence number for the current transaction, if there is uncommitted data written to DB. Otherwise, it returns zero, indicating no uncommitted data. These are the places where reads had to be modified. - Get and Seek/Next were just updated to seek to max(snapshot_seq, MaxUnpreparedSequenceNumber()) instead, and iterate until a key was visible. - Prev did not need updates since it did not use the Seek to sequence number optimization. Assuming that locks were held when writing unprepared keys, and ValidateSnapshot runs, there should only be committed keys and unprepared keys of the current transaction, all of which are visible. Prev will simply iterate to get the last visible key. - Reseeking to skip keys optimization was also disabled for write unprepared, since it's possible to hit the max_skip condition even while reseeking. There needs to be some way to resolve infinite looping in this case. 
Closes https://github.com/facebook/rocksdb/pull/3955 Differential Revision: D8286688 Pulled By: lth fbshipit-source-id: 25e42f47fdeb5f7accea0f4fd350ef35198caafe --- db/db_impl.cc | 23 ++- db/db_iter.cc | 41 +++- db/db_iterator_test.cc | 4 +- db/db_merge_operator_test.cc | 2 +- db/db_test2.cc | 2 +- db/memtable.cc | 2 +- db/read_callback.h | 15 +- .../utilities/write_batch_with_index.h | 1 + table/get_context.h | 2 +- utilities/transactions/transaction_util.cc | 2 +- .../transactions/write_prepared_txn_db.cc | 7 +- .../transactions/write_prepared_txn_db.h | 6 +- .../write_unprepared_transaction_test.cc | 179 +++++++++++++++++- .../transactions/write_unprepared_txn.cc | 71 +++++++ utilities/transactions/write_unprepared_txn.h | 57 +++++- .../transactions/write_unprepared_txn_db.cc | 58 +++++- .../transactions/write_unprepared_txn_db.h | 12 +- 17 files changed, 448 insertions(+), 36 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 84c9dca75..adb0652ad 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1037,12 +1037,19 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, SequenceNumber snapshot; if (read_options.snapshot != nullptr) { - // Note: In WritePrepared txns this is not necessary but not harmful either. - // Because prep_seq > snapshot => commit_seq > snapshot so if a snapshot is - // specified we should be fine with skipping seq numbers that are greater - // than that. + // Note: In WritePrepared txns this is not necessary but not harmful + // either. Because prep_seq > snapshot => commit_seq > snapshot so if + // a snapshot is specified we should be fine with skipping seq numbers + // that are greater than that. + // + // In WriteUnprepared, we cannot set snapshot in the lookup key because we + // may skip uncommitted data that should be visible to the transaction for + // reading own writes. 
snapshot = reinterpret_cast(read_options.snapshot)->number_; + if (callback) { + snapshot = std::max(snapshot, callback->MaxUnpreparedSequenceNumber()); + } } else { // Since we get and reference the super version before getting // the snapshot number, without a mutex protection, it is possible @@ -1050,10 +1057,10 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // data for this snapshot is available. But it will contain all // the data available in the super version we have, which is also // a valid snapshot to read from. - // We shouldn't get snapshot before finding and referencing the - // super versipon because a flush happening in between may compact - // away data for the snapshot, but the snapshot is earlier than the - // data overwriting it, so users may see wrong results. + // We shouldn't get snapshot before finding and referencing the super + // version because a flush happening in between may compact away data for + // the snapshot, but the snapshot is earlier than the data overwriting it, + // so users may see wrong results. snapshot = last_seq_same_as_publish_seq_ ? versions_->LastSequence() : versions_->LastPublishedSequence(); diff --git a/db/db_iter.cc b/db/db_iter.cc index e24db0fb4..90ad2cfd6 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -260,6 +260,18 @@ class DBIter final: public Iterator { bool TooManyInternalKeysSkipped(bool increment = true); bool IsVisible(SequenceNumber sequence); + // CanReseekToSkip() returns whether the iterator can use the optimization + // where it reseek by sequence number to get the next key when there are too + // many versions. This is disabled for write unprepared because seeking to + // sequence number does not guarantee that it is visible. + inline bool CanReseekToSkip(); + + // MaxVisibleSequenceNumber() returns the maximum visible sequence number + // for this snapshot. 
This sequence number may be greater than snapshot + // seqno because uncommitted data written to DB for write unprepared will + // have a higher sequence number. + inline SequenceNumber MaxVisibleSequenceNumber(); + // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // is called void TempPinData() { @@ -578,7 +590,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { // If we have sequentially iterated via numerous equal keys, then it's // better to seek so that we can avoid too many key comparisons. - if (num_skipped > max_skip_) { + if (num_skipped > max_skip_ && CanReseekToSkip()) { num_skipped = 0; std::string last_key; if (skipping) { @@ -895,7 +907,7 @@ bool DBIter::FindValueForCurrentKey() { // This user key has lots of entries. // We're going from old to new, and it's taking too long. Let's do a Seek() // and go from new to old. This helps when a key was overwritten many times. - if (num_skipped >= max_skip_) { + if (num_skipped >= max_skip_ && CanReseekToSkip()) { return FindValueForCurrentKeyUsingSeek(); } @@ -1194,7 +1206,7 @@ bool DBIter::FindUserKeyBeforeSavedKey() { PERF_COUNTER_ADD(internal_key_skipped_count, 1); } - if (num_skipped >= max_skip_) { + if (num_skipped >= max_skip_ && CanReseekToSkip()) { num_skipped = 0; IterKey last_key; last_key.SetInternalKey(ParsedInternalKey( @@ -1234,8 +1246,21 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) { } bool DBIter::IsVisible(SequenceNumber sequence) { - return sequence <= sequence_ && - (read_callback_ == nullptr || read_callback_->IsCommitted(sequence)); + return sequence <= MaxVisibleSequenceNumber() && + (read_callback_ == nullptr || read_callback_->IsVisible(sequence)); +} + +bool DBIter::CanReseekToSkip() { + return read_callback_ == nullptr || + read_callback_->MaxUnpreparedSequenceNumber() == 0; +} + +SequenceNumber DBIter::MaxVisibleSequenceNumber() { + if (read_callback_ == nullptr) { + return sequence_; + } + + return 
std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber()); } void DBIter::Seek(const Slice& target) { @@ -1243,14 +1268,16 @@ void DBIter::Seek(const Slice& target) { status_ = Status::OK(); ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); + + SequenceNumber seq = MaxVisibleSequenceNumber(); saved_key_.Clear(); - saved_key_.SetInternalKey(target, sequence_); + saved_key_.SetInternalKey(target, seq); if (iterate_lower_bound_ != nullptr && user_comparator_->Compare(saved_key_.GetUserKey(), *iterate_lower_bound_) < 0) { saved_key_.Clear(); - saved_key_.SetInternalKey(*iterate_lower_bound_, sequence_); + saved_key_.SetInternalKey(*iterate_lower_bound_, seq); } { diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 7eeb12118..b040a02b3 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -20,7 +20,7 @@ namespace rocksdb { // A dumb ReadCallback which saying every key is committed. class DummyReadCallback : public ReadCallback { - bool IsCommitted(SequenceNumber /*seq*/) { return true; } + bool IsVisible(SequenceNumber /*seq*/) override { return true; } }; // Test param: @@ -2417,7 +2417,7 @@ TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) { explicit TestReadCallback(SequenceNumber last_visible_seq) : last_visible_seq_(last_visible_seq) {} - bool IsCommitted(SequenceNumber seq) override { + bool IsVisible(SequenceNumber seq) override { return seq <= last_visible_seq_; } diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 7b9808d66..2c9634755 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -20,7 +20,7 @@ class TestReadCallback : public ReadCallback { SequenceNumber snapshot_seq) : snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {} - bool IsCommitted(SequenceNumber seq) override { + bool IsVisible(SequenceNumber seq) override { return snapshot_checker_->IsInSnapshot(seq, snapshot_seq_); } diff --git a/db/db_test2.cc b/db/db_test2.cc index 
5043b38a9..61f261369 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2422,7 +2422,7 @@ TEST_F(DBTest2, ReadCallbackTest) { class TestReadCallback : public ReadCallback { public: explicit TestReadCallback(SequenceNumber snapshot) : snapshot_(snapshot) {} - virtual bool IsCommitted(SequenceNumber seq) override { + virtual bool IsVisible(SequenceNumber seq) override { return seq <= snapshot_; } diff --git a/db/memtable.cc b/db/memtable.cc index 91559a596..c68827ad9 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -579,7 +579,7 @@ struct Saver { bool CheckCallback(SequenceNumber _seq) { if (callback_) { - return callback_->IsCommitted(_seq); + return callback_->IsVisible(_seq); } return true; } diff --git a/db/read_callback.h b/db/read_callback.h index f3fe35dfc..440f7848d 100644 --- a/db/read_callback.h +++ b/db/read_callback.h @@ -13,9 +13,18 @@ class ReadCallback { public: virtual ~ReadCallback() {} - // Will be called to see if the seq number accepted; if not it moves on to the - // next seq number. - virtual bool IsCommitted(SequenceNumber seq) = 0; + // Will be called to see if the seq number visible; if not it moves on to + // the next seq number. + virtual bool IsVisible(SequenceNumber seq) = 0; + + // This is called to determine the maximum visible sequence number for the + // current transaction for read-your-own-write semantics. This is so that + // for write unprepared, we will not skip keys that are written by the + // current transaction with the seek to snapshot optimization. + // + // For other uses, this returns zero, meaning that the current snapshot + // sequence number is the maximum visible sequence number. 
+ inline virtual SequenceNumber MaxUnpreparedSequenceNumber() { return 0; }; }; } // namespace rocksdb diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 5fd7700f1..e79f5985f 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -229,6 +229,7 @@ class WriteBatchWithIndex : public WriteBatchBase { private: friend class PessimisticTransactionDB; friend class WritePreparedTxn; + friend class WriteUnpreparedTxn; friend class WriteBatchWithIndex_SubBatchCnt_Test; // Returns the number of sub-batches inside the write batch. A sub-batch // starts right before inserting a key that is a duplicate of a key in the diff --git a/table/get_context.h b/table/get_context.h index 90a5ff35c..2b9135676 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -72,7 +72,7 @@ class GetContext { bool CheckCallback(SequenceNumber seq) { if (callback_) { - return callback_->IsCommitted(seq); + return callback_->IsVisible(seq); } return true; } diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index 545b92f7f..1d511880b 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -108,7 +108,7 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, } else if (found_record_for_key) { bool write_conflict = snap_checker == nullptr ? 
snap_seq < seq - : !snap_checker->IsCommitted(seq); + : !snap_checker->IsVisible(seq); if (write_conflict) { result = Status::Busy(); } diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 13ab8679a..105c37df7 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -306,9 +306,10 @@ Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options, SequenceNumber min_uncommitted = 0; if (options.snapshot != nullptr) { snapshot_seq = options.snapshot->GetSequenceNumber(); - min_uncommitted = static_cast_with_check( - options.snapshot) - ->min_uncommitted_; + min_uncommitted = + static_cast_with_check( + options.snapshot) + ->min_uncommitted_; } else { auto* snapshot = GetSnapshot(); // We take a snapshot to make sure that the related data in the commit map diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index b62a6f74e..65797aa77 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -612,9 +612,9 @@ class WritePreparedTxnReadCallback : public ReadCallback { SequenceNumber min_uncommitted) : db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {} - // Will be called to see if the seq number accepted; if not it moves on to the - // next seq number. - inline virtual bool IsCommitted(SequenceNumber seq) override { + // Will be called to see if the seq number visible; if not it moves on to + // the next seq number. 
+ inline virtual bool IsVisible(SequenceNumber seq) override { return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_); } diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 96fe9bac9..115695cd0 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -10,8 +10,185 @@ #endif #include "utilities/transactions/transaction_test.h" +#include "utilities/transactions/write_unprepared_txn.h" +#include "utilities/transactions/write_unprepared_txn_db.h" -namespace rocksdb {} // namespace rocksdb +namespace rocksdb { + +class WriteUnpreparedTransactionTestBase : public TransactionTestBase { + public: + WriteUnpreparedTransactionTestBase(bool use_stackable_db, + bool two_write_queue, + TxnDBWritePolicy write_policy) + : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){} +}; + +class WriteUnpreparedTransactionTest + : public WriteUnpreparedTransactionTestBase, + virtual public ::testing::WithParamInterface< + std::tuple> { + public: + WriteUnpreparedTransactionTest() + : WriteUnpreparedTransactionTestBase(std::get<0>(GetParam()), + std::get<1>(GetParam()), + std::get<2>(GetParam())){} +}; + +INSTANTIATE_TEST_CASE_P( + WriteUnpreparedTransactionTest, WriteUnpreparedTransactionTest, + ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED), + std::make_tuple(false, true, WRITE_UNPREPARED))); + +TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { + auto verify_state = [](Iterator* iter, const std::string& key, + const std::string& value) { + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(value, iter->value().ToString()); + }; + + options.disable_auto_compactions = true; + ReOpen(); + + // The following tests checks whether reading your own write for + // a transaction works for write unprepared, when there 
are uncommitted + // values written into DB. + // + // Although the values written by DB::Put are technically committed, we add + // their seq num to unprep_seqs_ to pretend that they were written into DB + // as part of an unprepared batch, and then check if they are visible to the + // transaction. + auto snapshot0 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v1")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v2")); + auto snapshot2 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v3")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v4")); + auto snapshot4 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v5")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v6")); + auto snapshot6 = db->GetSnapshot(); + ASSERT_OK(db->Put(WriteOptions(), "a", "v7")); + ASSERT_OK(db->Put(WriteOptions(), "b", "v8")); + auto snapshot8 = db->GetSnapshot(); + + TransactionOptions txn_options; + WriteOptions write_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + WriteUnpreparedTxn* wup_txn = dynamic_cast(txn); + + ReadOptions roptions; + roptions.snapshot = snapshot0; + + auto iter = txn->GetIterator(roptions); + + // Test Get(). + std::string value; + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + + ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); + ASSERT_EQ(value, "v3"); + + ASSERT_OK(txn->Get(roptions, Slice("b"), &value)); + ASSERT_EQ(value, "v4"); + + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + + ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); + ASSERT_EQ(value, "v7"); + + ASSERT_OK(txn->Get(roptions, Slice("b"), &value)); + ASSERT_EQ(value, "v8"); + + wup_txn->unprep_seqs_.clear(); + + // Test Next(). 
+ wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + + iter->Seek("a"); + verify_state(iter, "a", "v3"); + + iter->Next(); + verify_state(iter, "b", "v4"); + + iter->SeekToFirst(); + verify_state(iter, "a", "v3"); + + iter->Next(); + verify_state(iter, "b", "v4"); + + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + + iter->Seek("a"); + verify_state(iter, "a", "v7"); + + iter->Next(); + verify_state(iter, "b", "v8"); + + iter->SeekToFirst(); + verify_state(iter, "a", "v7"); + + iter->Next(); + verify_state(iter, "b", "v8"); + + wup_txn->unprep_seqs_.clear(); + + // Test Prev(). For Prev(), we need to adjust the snapshot to match what is + // possible in WriteUnpreparedTxn. + // + // Because of row locks and ValidateSnapshot, there cannot be any committed + // entries after snapshot, but before the first prepared key. + delete iter; + roptions.snapshot = snapshot2; + iter = txn->GetIterator(roptions); + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + + iter->SeekForPrev("b"); + verify_state(iter, "b", "v4"); + + iter->Prev(); + verify_state(iter, "a", "v3"); + + iter->SeekToLast(); + verify_state(iter, "b", "v4"); + + iter->Prev(); + verify_state(iter, "a", "v3"); + + delete iter; + roptions.snapshot = snapshot6; + iter = txn->GetIterator(roptions); + wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = + snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + + iter->SeekForPrev("b"); + verify_state(iter, "b", "v8"); + + iter->Prev(); + verify_state(iter, "a", "v7"); + + iter->SeekToLast(); + verify_state(iter, "b", "v8"); + + iter->Prev(); + verify_state(iter, "a", "v7"); + + db->ReleaseSnapshot(snapshot0); + db->ReleaseSnapshot(snapshot2); + db->ReleaseSnapshot(snapshot4); + db->ReleaseSnapshot(snapshot6); + 
db->ReleaseSnapshot(snapshot8); + delete iter; + delete txn; +} + +} // namespace rocksdb int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 3ed263039..001c2444d 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -6,6 +6,9 @@ #ifndef ROCKSDB_LITE #include "utilities/transactions/write_unprepared_txn.h" +#include "db/db_impl.h" +#include "util/cast_util.h" +#include "utilities/transactions/write_unprepared_txn_db.h" #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS @@ -13,6 +16,74 @@ namespace rocksdb { +bool WriteUnpreparedTxnReadCallback::IsVisible(SequenceNumber seq) { + auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers(); + + // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is + // in unprep_seqs, we have to check if seq is equal to prep_seq or any of + // the prepare_batch_cnt seq nums after it. + // + // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is + // large. 
+ for (const auto& it : unprep_seqs) { + if (it.first <= seq && seq < it.first + it.second) { + return true; + } + } + + return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_); +} + +SequenceNumber WriteUnpreparedTxnReadCallback::MaxUnpreparedSequenceNumber() { + auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers(); + if (unprep_seqs.size()) { + return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1; + } + + return 0; +} + +WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, + const WriteOptions& write_options, + const TransactionOptions& txn_options) + : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) {} + +Status WriteUnpreparedTxn::Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) { + auto snapshot = options.snapshot; + auto snap_seq = + snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; + SequenceNumber min_uncommitted = 0; // by default disable the optimization + if (snapshot != nullptr) { + min_uncommitted = + static_cast_with_check(snapshot) + ->min_uncommitted_; + } + + WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, + this); + return write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value, + &callback); +} + +Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) { + return GetIterator(options, wupt_db_->DefaultColumnFamily()); +} + +Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) { + // Make sure to get iterator from WriteUnprepareTxnDB, not the root db. 
+ Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this); + assert(db_iter); + + return write_batch_.NewIteratorWithBase(column_family, db_iter); +} + +const std::map& +WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() { + return unprep_seqs_; +} } // namespace rocksdb diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index bc5141437..65eb7ad98 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -8,12 +8,67 @@ #ifndef ROCKSDB_LITE #include "utilities/transactions/write_prepared_txn.h" +#include "utilities/transactions/write_unprepared_txn_db.h" namespace rocksdb { +class WriteUnpreparedTxnDB; +class WriteUnpreparedTxn; + +class WriteUnpreparedTxnReadCallback : public ReadCallback { + public: + WriteUnpreparedTxnReadCallback(WritePreparedTxnDB* db, + SequenceNumber snapshot, + SequenceNumber min_uncommitted, + WriteUnpreparedTxn* txn) + : db_(db), + snapshot_(snapshot), + min_uncommitted_(min_uncommitted), + txn_(txn) {} + + virtual bool IsVisible(SequenceNumber seq) override; + virtual SequenceNumber MaxUnpreparedSequenceNumber() override; + + private: + WritePreparedTxnDB* db_; + SequenceNumber snapshot_; + SequenceNumber min_uncommitted_; + WriteUnpreparedTxn* txn_; +}; + class WriteUnpreparedTxn : public WritePreparedTxn { - using WritePreparedTxn::WritePreparedTxn; + public: + WriteUnpreparedTxn(WriteUnpreparedTxnDB* db, + const WriteOptions& write_options, + const TransactionOptions& txn_options); + + virtual ~WriteUnpreparedTxn() {} + + // Get and GetIterator needs to be overridden so that a ReadCallback to + // handle read-your-own-write is used. 
+ using Transaction::Get; + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) override; + + using Transaction::GetIterator; + virtual Iterator* GetIterator(const ReadOptions& options) override; + virtual Iterator* GetIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override; + + const std::map& GetUnpreparedSequenceNumbers(); + + private: + friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test; + + WriteUnpreparedTxnDB* wupt_db_; + // Ordered list of unprep_seq sequence numbers that we have already written + // to DB. + // + // This maps unprep_seq => prepare_batch_cnt for each prepared batch written + // by this transactioin. + std::map unprep_seqs_; }; } // namespace rocksdb diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 036e96d34..913ee3bd4 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -11,12 +11,13 @@ #include "utilities/transactions/write_unprepared_txn_db.h" #include "rocksdb/utilities/transaction_db.h" +#include "util/cast_util.h" namespace rocksdb { Transaction* WriteUnpreparedTxnDB::BeginTransaction( -const WriteOptions& write_options, const TransactionOptions& txn_options, -Transaction* old_txn) { + const WriteOptions& write_options, const TransactionOptions& txn_options, + Transaction* old_txn) { if (old_txn != nullptr) { ReinitializeTransaction(old_txn, write_options, txn_options); return old_txn; @@ -25,5 +26,58 @@ Transaction* old_txn) { } } +// Struct to hold ownership of snapshot and read callback for iterator cleanup. 
+struct WriteUnpreparedTxnDB::IteratorState { + IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, + std::shared_ptr s, + SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn) + : callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {} + + WriteUnpreparedTxnReadCallback callback; + std::shared_ptr snapshot; +}; + +namespace { +static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) { + delete reinterpret_cast(arg1); +} +} // anonymous namespace + +Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family, + WriteUnpreparedTxn* txn) { + // TODO(lth): Refactor so that this logic is shared with WritePrepared. + constexpr bool ALLOW_BLOB = true; + constexpr bool ALLOW_REFRESH = true; + std::shared_ptr own_snapshot = nullptr; + SequenceNumber snapshot_seq; + SequenceNumber min_uncommitted = 0; + if (options.snapshot != nullptr) { + snapshot_seq = options.snapshot->GetSequenceNumber(); + min_uncommitted = + static_cast_with_check( + options.snapshot) + ->min_uncommitted_; + } else { + auto* snapshot = GetSnapshot(); + // We take a snapshot to make sure that the related data in the commit map + // are not deleted. 
+ snapshot_seq = snapshot->GetSequenceNumber(); + min_uncommitted = + static_cast_with_check(snapshot) + ->min_uncommitted_; + own_snapshot = std::make_shared(db_impl_, snapshot); + } + assert(snapshot_seq != kMaxSequenceNumber); + auto* cfd = reinterpret_cast(column_family)->cfd(); + auto* state = + new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn); + auto* db_iter = + db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, + !ALLOW_BLOB, !ALLOW_REFRESH); + db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr); + return db_iter; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h index 9bb6ad3d2..10393d59e 100644 --- a/utilities/transactions/write_unprepared_txn_db.h +++ b/utilities/transactions/write_unprepared_txn_db.h @@ -11,16 +11,26 @@ #endif #include "utilities/transactions/write_prepared_txn_db.h" - #include "utilities/transactions/write_unprepared_txn.h" namespace rocksdb { +class WriteUnpreparedTxn; + class WriteUnpreparedTxnDB : public WritePreparedTxnDB { + public: using WritePreparedTxnDB::WritePreparedTxnDB; Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) override; + + // Struct to hold ownership of snapshot and read callback for cleanup. + struct IteratorState; + + using WritePreparedTxnDB::NewIterator; + Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family, + WriteUnpreparedTxn* txn); }; } // namespace rocksdb