diff --git a/HISTORY.md b/HISTORY.md index ff6d05d5b..c23e6b4b0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,7 @@ ### Bug Fixes * Fix a bug in 2PC where a sequence of txn prepare, memtable flush, and crash could result in losing the prepared transaction. * Fix a bug in Encryption Env which could cause encrypted files to be read beyond file boundaries. +* Fix a race condition between WritePrepared::Get and ::Put with duplicate keys. ## 6.1.0 (3/27/2019) ### New Features diff --git a/db/db_impl.cc b/db/db_impl.cc index 97cd2ac79..0900dddba 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1384,33 +1384,25 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, SequenceNumber snapshot; if (read_options.snapshot != nullptr) { - // Note: In WritePrepared txns this is not necessary but not harmful - // either. Because prep_seq > snapshot => commit_seq > snapshot so if - // a snapshot is specified we should be fine with skipping seq numbers - // that are greater than that. - // - // In WriteUnprepared, we cannot set snapshot in the lookup key because we - // may skip uncommitted data that should be visible to the transaction for - // reading own writes. - snapshot = - reinterpret_cast(read_options.snapshot)->number_; if (callback) { - snapshot = std::max(snapshot, callback->max_visible_seq()); + // Already calculated based on read_options.snapshot + snapshot = callback->max_visible_seq(); + } else { + snapshot = + reinterpret_cast(read_options.snapshot)->number_; } } else { - // Since we get and reference the super version before getting - // the snapshot number, without a mutex protection, it is possible - // that a memtable switch happened in the middle and not all the - // data for this snapshot is available. But it will contain all - // the data available in the super version we have, which is also - // a valid snapshot to read from. - // We shouldn't get snapshot before finding and referencing the super - // version because a flush happening in between may compact away data for - // the snapshot, but the snapshot is earlier than the data overwriting it, - // so users may see wrong results. + // Note that the snapshot is assigned AFTER referencing the super + // version because otherwise a flush happening in between may compact away + // data for the snapshot, so the reader would see neither data that was be + // visible to the snapshot before compaction nor the newer data inserted + // afterwards. snapshot = last_seq_same_as_publish_seq_ ? versions_->LastSequence() : versions_->LastPublishedSequence(); + if (callback) { + callback->Refresh(snapshot); + } } TEST_SYNC_POINT("DBImpl::GetImpl:3"); TEST_SYNC_POINT("DBImpl::GetImpl:4"); diff --git a/db/read_callback.h b/db/read_callback.h index 52573be19..60f91ef87 100644 --- a/db/read_callback.h +++ b/db/read_callback.h @@ -39,6 +39,7 @@ class ReadCallback { inline SequenceNumber max_visible_seq() { return max_visible_seq_; } + // Refresh to a more recent visible seq virtual void Refresh(SequenceNumber seq) { max_visible_seq_ = seq; } // Refer to DBIter::CanReseekToSkip diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 732d4c812..79f7f8899 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -5331,6 +5331,35 @@ class ThreeBytewiseComparator : public Comparator { } }; +TEST_P(TransactionTest, GetWithoutSnapshot) { + WriteOptions write_options; + std::atomic finish = {false}; + db->Put(write_options, "key", "value"); + rocksdb::port::Thread commit_thread([&]() { + for (int i = 0; i < 100; i++) { + TransactionOptions txn_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn->SetName("xid")); + ASSERT_OK(txn->Put("key", "overridedvalue")); + ASSERT_OK(txn->Put("key", "value")); + ASSERT_OK(txn->Prepare()); + ASSERT_OK(txn->Commit()); + delete txn; + } + finish = true; + }); + rocksdb::port::Thread read_thread([&]() { + while (!finish) { + ReadOptions ropt; + PinnableSlice pinnable_val; + ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val)); + ASSERT_TRUE(pinnable_val == ("value")); + } + }); + commit_thread.join(); + read_thread.join(); +} + // Test that the transactional db can handle duplicate keys in the write batch TEST_P(TransactionTest, DuplicateKeys) { ColumnFamilyOptions cf_options; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index c0f5a1068..f2f3f30e2 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1288,6 +1288,13 @@ TEST_P(WritePreparedTransactionTest, TxnInitialize) { // for example the txn does not add the prepared seq for the second sub-batch to // the PreparedHeap structure. TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) { + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 1; // disable commit cache + UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits); + ReOpen(); + + ReadOptions ropt; + PinnableSlice pinnable_val; WriteOptions write_options; TransactionOptions txn_options; Transaction* txn0 = db->BeginTransaction(write_options, txn_options); @@ -1296,24 +1303,19 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) { ASSERT_OK(txn0->Put(Slice("key"), Slice("value2"))); ASSERT_OK(txn0->Prepare()); - WritePreparedTxnDB* wp_db = dynamic_cast(db); - // Ensure that all the prepared sequence numbers will be removed from the - // PreparedHeap. - SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE; - wp_db->AdvanceMaxEvictedSeq(0, new_max); + ASSERT_OK(db->Put(write_options, "key2", "value")); + // Will cause max advance due to disabled commit cache + ASSERT_OK(db->Put(write_options, "key3", "value")); - ReadOptions ropt; - PinnableSlice pinnable_val; auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); ASSERT_TRUE(s.IsNotFound()); delete txn0; + WritePreparedTxnDB* wp_db = dynamic_cast(db); wp_db->db_impl_->FlushWAL(true); wp_db->TEST_Crash(); ReOpenNoDelete(); assert(db != nullptr); - wp_db = dynamic_cast(db); - wp_db->AdvanceMaxEvictedSeq(0, new_max); s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val); ASSERT_TRUE(s.IsNotFound()); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 4100925c5..6c7cb359d 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -44,23 +44,21 @@ void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) { prepare_batch_cnt_ = 0; } -Status WritePreparedTxn::Get(const ReadOptions& read_options, +Status WritePreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val) { - auto snapshot = read_options.snapshot; - auto snap_seq = - snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; - SequenceNumber min_uncommitted = - kMinUnCommittedSeq; // by default disable the optimization - if (snapshot != nullptr) { - min_uncommitted = - static_cast_with_check(snapshot) - ->min_uncommitted_; - } - + SequenceNumber min_uncommitted, snap_seq; + const bool backed_by_snapshot = + wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted); - return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key, - pinnable_val, &callback); + auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key, + pinnable_val, &callback); + if (LIKELY(wpt_db_->ValidateSnapshot(callback.max_visible_seq(), + backed_by_snapshot))) { + return res; + } else { + return Status::TryAgain(); + } } Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) { diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 5364a9e05..8a7883c05 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -220,24 +220,19 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, Status WritePreparedTxnDB::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { - // We are fine with the latest committed value. This could be done by - // specifying the snapshot as kMaxSequenceNumber. - SequenceNumber seq = kMaxSequenceNumber; - SequenceNumber min_uncommitted = 0; - if (options.snapshot != nullptr) { - seq = options.snapshot->GetSequenceNumber(); - min_uncommitted = static_cast_with_check( - options.snapshot) - ->min_uncommitted_; + SequenceNumber min_uncommitted, snap_seq; + const bool backed_by_snapshot = + AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); + WritePreparedTxnReadCallback callback(this, snap_seq, min_uncommitted); + bool* dont_care = nullptr; + auto res = db_impl_->GetImpl(options, column_family, key, value, dont_care, + &callback); + if (LIKELY( + ValidateSnapshot(callback.max_visible_seq(), backed_by_snapshot))) { + return res; } else { - min_uncommitted = SmallestUnCommittedSeq(); + return Status::TryAgain(); } - WritePreparedTxnReadCallback callback(this, seq, min_uncommitted); - bool* dont_care = nullptr; - // Note: no need to specify a snapshot for read options as no specific - // snapshot is requested by the user. - return db_impl_->GetImpl(options, column_family, key, value, dont_care, - &callback); } void WritePreparedTxnDB::UpdateCFComparatorMap( diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 10d1dbf60..25b9b9a1b 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -25,6 +25,7 @@ #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction_db.h" +#include "util/cast_util.h" #include "util/set_comparator.h" #include "util/string_util.h" #include "utilities/transactions/pessimistic_transaction.h" @@ -445,6 +446,20 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { protected: virtual Status VerifyCFOptions( const ColumnFamilyOptions& cf_options) override; + // Assign the min and max sequence numbers for reading from the db. A seq > + // max is not valid, and a seq < min is valid, and a min <= seq < max requires + // further checkings. Normally max is defined by the snapshot and min is by + // minimum uncommitted seq. + inline bool AssignMinMaxSeqs(const Snapshot* snapshot, SequenceNumber* min, + SequenceNumber* max); + // Validate is a snapshot sequence number is still valid based on the latest + // db status. backed_by_snapshot specifies if the number is baked by an actual + // snapshot object. order specified the memory order with which we load the + // atomic variables: relax is enough for the default since we care about last + // value seen by same thread. + inline bool ValidateSnapshot( + const SequenceNumber snap_seq, const bool backed_by_snapshot, + std::memory_order order = std::memory_order_relaxed); private: friend class PreparedHeap_BasicsTest_Test; @@ -479,6 +494,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_NonAtomicUpdateOfMaxEvictedSeq_Test; friend class WritePreparedTransactionTest_OldCommitMapGC_Test; friend class WritePreparedTransactionTest_RollbackTest_Test; + friend class WriteUnpreparedTxn; friend class WriteUnpreparedTxnDB; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; @@ -953,5 +969,38 @@ struct SubBatchCounter : public WriteBatch::Handler { bool WriteAfterCommit() const override { return false; } }; +bool WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot, + SequenceNumber* min, + SequenceNumber* max) { + if (snapshot != nullptr) { + *min = static_cast_with_check(snapshot) + ->min_uncommitted_; + *max = static_cast_with_check(snapshot) + ->number_; + return true; + } else { + *min = SmallestUnCommittedSeq(); + *max = 0; // to be assigned later after sv is referenced. + return false; + } +} + +bool WritePreparedTxnDB::ValidateSnapshot(const SequenceNumber snap_seq, + const bool backed_by_snapshot, + std::memory_order order) { + if (backed_by_snapshot) { + return true; + } else { + SequenceNumber max = max_evicted_seq_.load(order); + // Validate that max has not advanced the snapshot seq that is not backed + // by a real snapshot. This is a very rare case that should not happen in + // real workloads. + if (UNLIKELY(snap_seq <= max && snap_seq != 0)) { + return false; + } + } + return true; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 731460eda..823b12ea1 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -462,21 +462,18 @@ Status WriteUnpreparedTxn::RollbackInternal() { Status WriteUnpreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { - auto snapshot = options.snapshot; - auto snap_seq = - snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; - SequenceNumber min_uncommitted = - kMinUnCommittedSeq; // by default disable the optimization - if (snapshot != nullptr) { - min_uncommitted = - static_cast_with_check(snapshot) - ->min_uncommitted_; - } - + SequenceNumber min_uncommitted, snap_seq; + const bool backed_by_snapshot = + wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq); WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted, this); - return write_batch_.GetFromBatchAndDB(db_, options, column_family, key, value, - &callback); + auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key, + value, &callback); + if (LIKELY(wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) { + return res; + } else { + return Status::TryAgain(); + } } Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) { diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 68567eb6e..751d36c23 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -40,13 +40,15 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback { // behind reseek optimizations are no longer valid. } - // TODO(myabandeh): override Refresh when Iterator::Refresh is supported + void Refresh(SequenceNumber seq) override { + max_visible_seq_ = std::max(max_visible_seq_, seq); + wup_snapshot_ = seq; + } + private: static SequenceNumber CalcMaxVisibleSeq(WriteUnpreparedTxn* txn, SequenceNumber snapshot_seq) { SequenceNumber max_unprepared = CalcMaxUnpreparedSequenceNumber(txn); - assert(snapshot_seq < max_unprepared || max_unprepared == 0 || - snapshot_seq == kMaxSequenceNumber); return std::max(max_unprepared, snapshot_seq); } static SequenceNumber CalcMaxUnpreparedSequenceNumber( diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 55ca2b3ea..4fcbfbc37 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -31,23 +31,18 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( class InvalidSnapshotReadCallback : public ReadCallback { public: - InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) - : ReadCallback(snapshot), db_(db) {} - - // Will be called to see if the seq number visible; if not it moves on to - // the next seq number. - inline bool IsVisibleFullCheck(SequenceNumber seq) override { - // Becomes true if it cannot tell by comparing seq with snapshot seq since - // the snapshot is not a real snapshot. - auto snapshot = max_visible_seq_; - bool released = false; - auto ret = db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &released); - assert(!released || ret); - return ret; + InvalidSnapshotReadCallback(SequenceNumber snapshot) + : ReadCallback(snapshot) {} + + inline bool IsVisibleFullCheck(SequenceNumber) override { + // The seq provided as snapshot is the seq right before we have locked and + // wrote to it, so whatever is there, it is committed. + return true; } - private: - WritePreparedTxnDB* db_; + // Ignore the refresh request since we are confident that our snapshot seq + // is not going to be affected by concurrent compactions (not enabled yet.) + void Refresh(SequenceNumber) override {} }; // Iterate starting with largest sequence number. @@ -67,13 +62,12 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( std::map keys_; bool rollback_merge_operands_; RollbackWriteBatchBuilder( - DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, - WriteBatch* dst_batch, + DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch, std::map& comparators, std::map& handles, bool rollback_merge_operands) : db_(db), - callback(wpt_db, snap_seq), + callback(snap_seq), // disable min_uncommitted optimization rollback_batch_(dst_batch), comparators_(comparators), @@ -149,7 +143,7 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( Status MarkRollback(const Slice&) override { return Status::InvalidArgument(); } - } rollback_handler(db_impl_, this, last_visible_txn, &rollback_batch, + } rollback_handler(db_impl_, last_visible_txn, &rollback_batch, *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), txn_db_options_.rollback_merge_operands); @@ -311,12 +305,7 @@ Status WriteUnpreparedTxnDB::Initialize( db_impl_->versions_->SetLastPublishedSequence(last_seq + 1); } - // Compaction should start only after max_evicted_seq_ is set. - Status s = EnableAutoCompaction(compaction_enabled_cf_handles); - if (!s.ok()) { - return s; - } - + Status s; // Rollback unprepared transactions. for (auto rtxn : rtxns) { auto recovered_trx = rtxn.second; @@ -331,6 +320,10 @@ Status WriteUnpreparedTxnDB::Initialize( if (s.ok()) { dbimpl->DeleteAllRecoveredTransactions(); + + // Compaction should start only after max_evicted_seq_ is set AND recovered + // transactions are either added to PrepareHeap or rolled back. + s = EnableAutoCompaction(compaction_enabled_cf_handles); } return s;