From d50c10ed372bcd2330561c49a855663204a6841e Mon Sep 17 00:00:00 2001
From: Yi Wu
Date: Tue, 15 Jan 2019 09:13:49 -0800
Subject: [PATCH] WritePrepared: Fix SmallestUnCommittedSeq() doesn't check delayed_prepared (#4867)

Summary:
When the prepared_txns_ heap is empty, SmallestUnCommittedSeq() should check
the delayed_prepared_ set as well. A standalone sketch of the corrected lookup
order follows the patch.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/4867

Differential Revision: D13632134

Pulled By: maysamyabandeh

fbshipit-source-id: b0423bb0a58dc95f1e636d5ed3f6e619df801fb7
---
 .../write_prepared_transaction_test.cc        | 78 +++++++++++++++++++
 .../transactions/write_prepared_txn_db.h      | 23 +++---
 2 files changed, 90 insertions(+), 11 deletions(-)

diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index 6e3895d84..400cd2084 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -13,6 +13,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -27,6 +28,7 @@
 #include "rocksdb/utilities/transaction_db.h"
 #include "table/mock_table.h"
 #include "util/fault_injection_test_env.h"
+#include "util/mutexlock.h"
 #include "util/random.h"
 #include "util/string_util.h"
 #include "util/sync_point.h"
@@ -351,6 +353,39 @@
       : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};

 protected:
+  // TODO(myabandeh): Avoid duplicating PessimisticTransaction::Open logic here.
+  void DestroyAndReopenWithExtraOptions(size_t snapshot_cache_bits,
+                                        size_t commit_cache_bits) {
+    delete db;
+    db = nullptr;
+    DestroyDB(dbname, options);
+
+    options.create_if_missing = true;
+    ColumnFamilyOptions cf_options(options);
+    std::vector<ColumnFamilyDescriptor> column_families;
+    std::vector<ColumnFamilyHandle*> handles;
+    column_families.push_back(
+        ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+    std::vector<size_t> compaction_enabled_cf_indices;
+    TransactionDB::PrepareWrap(&options, &column_families,
+                               &compaction_enabled_cf_indices);
+    DB* base_db = nullptr;
+    ASSERT_OK(DBImpl::Open(options, dbname, column_families, &handles, &base_db,
+                           true /*use_seq_per_batch*/,
+                           false /*use_batch_for_txn*/));
+
+    // The following is equivalent of WrapDB().
+    txn_db_options.write_policy = WRITE_PREPARED;
+    auto* wp_db = new WritePreparedTxnDB(base_db, txn_db_options, snapshot_cache_bits,
+                                         commit_cache_bits);
+    wp_db->UpdateCFComparatorMap(handles);
+    ASSERT_OK(wp_db->Initialize(compaction_enabled_cf_indices, handles));
+
+    ASSERT_EQ(1, handles.size());
+    delete handles[0];
+    db = wp_db;
+  }
+
   // If expect_update is set, check if it actually updated old_commit_map_. If
   // it did not and yet suggested not to check the next snapshot, do the
   // opposite to check if it was not a bad suggestion.
@@ -2023,6 +2058,49 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
   db->ReleaseSnapshot(snapshot2);
 }

+TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
+  const size_t snapshot_cache_bits = 7;  // same as default
+  const size_t commit_cache_bits = 0;    // disable commit cache
+  for (bool has_recent_prepare : {true, false}) {
+    DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
+
+    ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
+    auto* transaction =
+        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+    ASSERT_OK(transaction->SetName("txn"));
+    ASSERT_OK(transaction->Delete("key1"));
+    ASSERT_OK(transaction->Prepare());
+    // snapshot1 should get min_uncommitted from prepared_txns_ heap.
+    auto snapshot1 = db->GetSnapshot();
+    ASSERT_EQ(transaction->GetId(), ((SnapshotImpl*)snapshot1)->min_uncommitted_);
+    // Add a commit to advance max_evicted_seq and move the prepared transaction
+    // into delayed_prepared_ set.
+    ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
+    Transaction* txn2 = nullptr;
+    if (has_recent_prepare) {
+      txn2 =
+          db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
+      ASSERT_OK(txn2->SetName("txn2"));
+      ASSERT_OK(txn2->Put("key3", "value3"));
+      ASSERT_OK(txn2->Prepare());
+    }
+    // snapshot2 should get min_uncommitted from delayed_prepared_ set.
+    auto snapshot2 = db->GetSnapshot();
+    ASSERT_EQ(transaction->GetId(), ((SnapshotImpl*)snapshot2)->min_uncommitted_);
+    ASSERT_OK(transaction->Commit());
+    delete transaction;
+    if (has_recent_prepare) {
+      ASSERT_OK(txn2->Commit());
+      delete txn2;
+    }
+    VerifyKeys({{"key1", "NOT_FOUND"}});
+    VerifyKeys({{"key1", "value1"}}, snapshot1);
+    VerifyKeys({{"key1", "value1"}}, snapshot2);
+    db->ReleaseSnapshot(snapshot1);
+    db->ReleaseSnapshot(snapshot2);
+  }
+}
+
 // A more complex test to verify compaction/flush should keep keys visible
 // to snapshots.
 TEST_P(WritePreparedTransactionTest,
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index e892c6320..e77422f70 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -143,6 +143,14 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
                         prep_seq, snapshot_seq, 0);
       return false;
     }
+    if (prep_seq < min_uncommitted) {
+      ROCKS_LOG_DETAILS(info_log_,
+                        "IsInSnapshot %" PRIu64 " in %" PRIu64
+                        " returns %" PRId32
+                        " because of min_uncommitted %" PRIu64,
+                        prep_seq, snapshot_seq, 1, min_uncommitted);
+      return true;
+    }
     if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
       // We should not normally reach here
       WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
@@ -158,17 +166,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
         return false;
       }
     }
-    // Note: since min_uncommitted does not include the delayed_prepared_ we
-    // should check delayed_prepared_ first before applying this optimization.
-    // TODO(myabandeh): include delayed_prepared_ in min_uncommitted
-    if (prep_seq < min_uncommitted) {
-      ROCKS_LOG_DETAILS(info_log_,
-                        "IsInSnapshot %" PRIu64 " in %" PRIu64
-                        " returns %" PRId32
-                        " because of min_uncommitted %" PRIu64,
-                        prep_seq, snapshot_seq, 1, min_uncommitted);
-      return true;
-    }
     auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
     CommitEntry64b dont_care;
     CommitEntry cached;
@@ -512,6 +509,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     // reflect any uncommitted data that is not added to prepared_txns_ yet.
     // Otherwise, if there is no concurrent txn, this value simply reflects that
     // latest value in the memtable.
+    if (!delayed_prepared_.empty()) {
+      assert(!delayed_prepared_empty_.load());
+      return *delayed_prepared_.begin();
+    }
     if (prepared_txns_.empty()) {
       return db_impl_->GetLatestSequenceNumber() + 1;
     } else {
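Below is a standalone sketch, not part of the patch and not RocksDB code, illustrating the lookup order the fix restores: a lower bound on uncommitted sequence numbers must consult the delayed-prepared set before concluding from an empty prepared heap that everything up to the latest sequence number is committed. The names DelayedSet, PreparedHeap, SmallestUncommittedBuggy/Fixed, and kLatestSeq are illustrative stand-ins for delayed_prepared_, prepared_txns_, and SmallestUnCommittedSeq(), assumed here for the example only.

// A minimal, self-contained sketch of the invariant the patch restores.
#include <cassert>
#include <cstdint>
#include <functional>
#include <queue>
#include <set>
#include <vector>

using SequenceNumber = uint64_t;

// Stand-in for delayed_prepared_: old prepared txns already evicted from the
// heap, kept sorted so *begin() is the oldest one.
using DelayedSet = std::set<SequenceNumber>;
// Stand-in for prepared_txns_: a min-heap of recently prepared txns.
using PreparedHeap =
    std::priority_queue<SequenceNumber, std::vector<SequenceNumber>,
                        std::greater<SequenceNumber>>;

// Pre-patch behavior: with an empty heap the delayed set is ignored, so the
// bound wrongly claims everything below latest_seq + 1 is committed.
SequenceNumber SmallestUncommittedBuggy(const DelayedSet& /*delayed*/,
                                        const PreparedHeap& prepared,
                                        SequenceNumber latest_seq) {
  return prepared.empty() ? latest_seq + 1 : prepared.top();
}

// Post-patch behavior: check the delayed set first, then the heap.
SequenceNumber SmallestUncommittedFixed(const DelayedSet& delayed,
                                        const PreparedHeap& prepared,
                                        SequenceNumber latest_seq) {
  if (!delayed.empty()) {
    return *delayed.begin();  // oldest still-uncommitted prepared txn
  }
  return prepared.empty() ? latest_seq + 1 : prepared.top();
}

int main() {
  const SequenceNumber kLatestSeq = 10;
  DelayedSet delayed = {3};  // a txn prepared at seq 3 was moved off the heap
  PreparedHeap prepared;     // the heap itself is now empty

  // The buggy bound (11) would let a snapshot treat seq 3 as committed; the
  // fixed bound (3) keeps it visible as uncommitted.
  assert(SmallestUncommittedBuggy(delayed, prepared, kLatestSeq) == 11);
  assert(SmallestUncommittedFixed(delayed, prepared, kLatestSeq) == 3);
  return 0;
}

The sorted set mirrors how the patched SmallestUnCommittedSeq() returns *delayed_prepared_.begin(): the oldest delayed prepared sequence number becomes the bound, which is what the new test observes through SnapshotImpl::min_uncommitted_.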