diff --git a/db/db_impl.cc b/db/db_impl.cc index d7de69d14..d306cc200 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -743,6 +743,10 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const { return versions_->LastSequence(); } +SequenceNumber DBImpl::IncAndFetchSequenceNumber() { + return versions_->FetchAddLastToBeWrittenSequence(1ull) + 1ull; +} + InternalIterator* DBImpl::NewInternalIterator( Arena* arena, RangeDelAggregator* range_del_agg, ColumnFamilyHandle* column_family) { @@ -957,7 +961,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, // super versipon because a flush happening in between may compact // away data for the snapshot, but the snapshot is earlier than the // data overwriting it, so users may see wrong results. - snapshot = versions_->LastSequence(); + snapshot = concurrent_prepare_ && seq_per_batch_ + ? versions_->LastToBeWrittenSequence() + : versions_->LastSequence(); } TEST_SYNC_POINT("DBImpl::GetImpl:3"); TEST_SYNC_POINT("DBImpl::GetImpl:4"); @@ -1048,7 +1054,9 @@ std::vector<Status> DBImpl::MultiGet( snapshot = reinterpret_cast<const SnapshotImpl*>( read_options.snapshot)->number_; } else { - snapshot = versions_->LastSequence(); + snapshot = concurrent_prepare_ && seq_per_batch_ + ? versions_->LastToBeWrittenSequence() + : versions_->LastSequence(); } for (auto mgd_iter : multiget_cf_data) { mgd_iter.second->super_version = @@ -1445,6 +1453,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, read_callback); #endif } else { + // Note: no need to consider the special case of concurrent_prepare_ && + // seq_per_batch_ since NewIterator is overridden in WritePreparedTxnDB auto snapshot = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() : versions_->LastSequence(); @@ -1560,6 +1570,8 @@ Status DBImpl::NewIterators( } #endif } else { + // Note: no need to consider the special case of concurrent_prepare_ && + // seq_per_batch_ since NewIterators is overridden in WritePreparedTxnDB auto snapshot = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() : versions_->LastSequence(); @@ -1593,8 +1605,10 @@ const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { delete s; return nullptr; } - return snapshots_.New(s, versions_->LastSequence(), unix_time, - is_write_conflict_boundary); + auto snapshot_seq = concurrent_prepare_ && seq_per_batch_ + ? versions_->LastToBeWrittenSequence() + : versions_->LastSequence(); + return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); } void DBImpl::ReleaseSnapshot(const Snapshot* s) { diff --git a/db/db_impl.h b/db/db_impl.h index 5992288a5..462b8c9b9 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -219,6 +219,11 @@ class DBImpl : public DB { virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; + virtual SequenceNumber IncAndFetchSequenceNumber(); + // Returns LastToBeWrittenSequence in concurrent_prepare_ && seq_per_batch_ + // mode and LastSequence otherwise. This is useful when visibility depends + // also on data written to the WAL but not to the memtable.
+ SequenceNumber TEST_GetLatestVisibleSequenceNumber() const; bool HasActiveSnapshotLaterThanSN(SequenceNumber sn); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index b20a31719..e45fc6fcd 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -209,5 +209,13 @@ int DBImpl::TEST_BGFlushesAllowed() const { return GetBGJobLimits().max_flushes; } +SequenceNumber DBImpl::TEST_GetLatestVisibleSequenceNumber() const { + if (concurrent_prepare_ && seq_per_batch_) { + return versions_->LastToBeWrittenSequence(); + } else { + return versions_->LastSequence(); + } +} + } // namespace rocksdb #endif // NDEBUG diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 3ed27c717..db8031bff 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -580,7 +580,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, // consecutive, we continue recovery despite corruption. This could // happen when we open and write to a corrupted DB, where sequence id // will start from the last sequence id we recovered. - if (sequence == *next_sequence) { + if (sequence == *next_sequence || + // With seq_per_batch_, if the previous run was with concurrent_prepare_ + // then gaps in the sequence numbers are expected due to the commits + // without prepares. + (seq_per_batch_ && sequence >= *next_sequence)) { stop_replay_for_corruption = false; } if (stop_replay_for_corruption) { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index e22c0c4af..349bf1d5d 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -241,13 +241,57 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { } startingSequenceNumber_ = expectedSeq; // currentStatus_ will be set to Ok if reseek succeeds + // Note: this is still ok in seq_per_batch_ && concurrent_prepare_ mode + // that allows gaps in the WAL since it will still skip over the gap.
currentStatus_ = Status::NotFound("Gap in sequence numbers"); - return SeekToStartSequence(currentFileIndex_, true); + // In seq_per_batch mode, gaps in the seq are possible so the strict mode + // should be disabled + return SeekToStartSequence(currentFileIndex_, !options_->seq_per_batch); } + struct BatchCounter : public WriteBatch::Handler { + SequenceNumber sequence_; + BatchCounter(SequenceNumber sequence) : sequence_(sequence) {} + Status MarkNoop(bool empty_batch) override { + if (!empty_batch) { + sequence_++; + } + return Status::OK(); + } + Status MarkEndPrepare(const Slice&) override { + sequence_++; + return Status::OK(); + } + Status MarkCommit(const Slice&) override { + sequence_++; + return Status::OK(); + } + + Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override { + return Status::OK(); + } + Status DeleteCF(uint32_t cf, const Slice& key) override { + return Status::OK(); + } + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return Status::OK(); + } + Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override { + return Status::OK(); + } + Status MarkBeginPrepare() override { return Status::OK(); } + Status MarkRollback(const Slice&) override { return Status::OK(); } + }; + currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get()); - currentLastSeq_ = currentBatchSeq_ + - WriteBatchInternal::Count(batch.get()) - 1; + if (options_->seq_per_batch) { + BatchCounter counter(currentBatchSeq_); + batch->Iterate(&counter); + currentLastSeq_ = counter.sequence_; + } else { + currentLastSeq_ = + currentBatchSeq_ + WriteBatchInternal::Count(batch.get()) - 1; + } // currentBatchSeq_ can only change here assert(currentLastSeq_ <= versions_->LastSequence()); diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 534325292..eed413778 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -737,6 +737,7 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, void WritePreparedTxnDB::AddPrepared(uint64_t seq) { ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Prepareing", seq); + // TODO(myabandeh): Add a runtime check to ensure the following assert. 
assert(seq > max_evicted_seq_); WriteLock wl(&prepared_mutex_); prepared_txns_.push(seq); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index e0a831090..de3199db8 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -39,15 +39,18 @@ using std::string; namespace rocksdb { -// TODO(myabandeh): Instantiate the tests with concurrent_prepare INSTANTIATE_TEST_CASE_P( DBAsBaseDB, TransactionTest, ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), - std::make_tuple(false, false, WRITE_PREPARED))); + std::make_tuple(false, true, WRITE_COMMITTED), + std::make_tuple(false, false, WRITE_PREPARED), + std::make_tuple(false, true, WRITE_PREPARED))); INSTANTIATE_TEST_CASE_P( StackableDBAsBaseDB, TransactionTest, ::testing::Values(std::make_tuple(true, false, WRITE_COMMITTED), - std::make_tuple(true, false, WRITE_PREPARED))); + std::make_tuple(true, true, WRITE_COMMITTED), + std::make_tuple(true, false, WRITE_PREPARED), + std::make_tuple(true, true, WRITE_PREPARED))); INSTANTIATE_TEST_CASE_P( MySQLStyleTransactionTest, MySQLStyleTransactionTest, ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), @@ -55,7 +58,9 @@ INSTANTIATE_TEST_CASE_P( std::make_tuple(true, false, WRITE_COMMITTED), std::make_tuple(true, true, WRITE_COMMITTED), std::make_tuple(false, false, WRITE_PREPARED), - std::make_tuple(true, false, WRITE_PREPARED))); + std::make_tuple(false, true, WRITE_PREPARED), + std::make_tuple(true, false, WRITE_PREPARED), + std::make_tuple(true, true, WRITE_PREPARED))); TEST_P(TransactionTest, DoubleEmptyWrite) { WriteOptions write_options; @@ -4750,6 +4755,9 @@ TEST_P(TransactionTest, SeqAdvanceTest) { WriteOptions wopts; FlushOptions fopt; + options.disable_auto_compactions = true; + ReOpen(); + // Do the test with NUM_BRANCHES branches in it. Each run of a test takes some // of the branches. This is the same as counting a binary number where i-th // bit represents whether we take branch i in the represented by the number. @@ -4768,12 +4776,12 @@ TEST_P(TransactionTest, SeqAdvanceTest) { auto seq = db_impl->GetLatestSequenceNumber(); exp_seq = seq; txn_t0(0); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { db_impl->Flush(fopt); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); } if (branch_do(n, &branch)) { @@ -4786,16 +4794,16 @@ TEST_P(TransactionTest, SeqAdvanceTest) { // Doing it twice might detect some bugs txn_t0(1); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); txn_t1(0); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { db_impl->Flush(fopt); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); } if (branch_do(n, &branch)) { @@ -4807,12 +4815,13 @@ TEST_P(TransactionTest, SeqAdvanceTest) { } txn_t3(0); - // Since commit marker does not write to memtable, the last seq number is - // not updated immediately. But the advance should be visible after the next - // write. 
+ seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { db_impl->Flush(fopt); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); + ASSERT_EQ(exp_seq, seq); } if (branch_do(n, &branch)) { db_impl->FlushWAL(true); @@ -4823,16 +4832,16 @@ TEST_P(TransactionTest, SeqAdvanceTest) { } txn_t0(0); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); txn_t2(0); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); if (branch_do(n, &branch)) { db_impl->Flush(fopt); - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); } if (branch_do(n, &branch)) { diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index d776003dd..6485e4893 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -144,6 +144,10 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; + if (options.concurrent_prepare) { + // Consume one seq for commit + exp_seq++; + } } }; std::function txn_t0 = [&](size_t index) { @@ -162,10 +166,13 @@ class TransactionTest : public ::testing::TestWithParam< if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) { // Consume one seq per key exp_seq += 3; - ; } else { // Consume one seq per batch exp_seq++; + if (options.concurrent_prepare) { + // Consume one seq for commit + exp_seq++; + } } ASSERT_OK(s); }; @@ -190,6 +197,10 @@ class TransactionTest : public ::testing::TestWithParam< } else { // Consume one seq per batch exp_seq++; + if (options.concurrent_prepare) { + // Consume one seq for commit + exp_seq++; + } } auto pdb = reinterpret_cast(db); pdb->UnregisterTransaction(txn); diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 37b25450a..f71bbb66f 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -327,11 +327,10 @@ class WritePreparedTransactionTest : public TransactionTest { } }; -// TODO(myabandeh): enable it for concurrent_prepare -INSTANTIATE_TEST_CASE_P(WritePreparedTransactionTest, - WritePreparedTransactionTest, - ::testing::Values(std::make_tuple(false, false, - WRITE_PREPARED))); +INSTANTIATE_TEST_CASE_P( + WritePreparedTransactionTest, WritePreparedTransactionTest, + ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED), + std::make_tuple(false, true, WRITE_PREPARED))); TEST_P(WritePreparedTransactionTest, CommitMapTest) { WritePreparedTxnDB* wp_db = dynamic_cast(db); @@ -595,113 +594,6 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) { } } -// TODO(myabandeh): remove this redundant test after transaction_test is enabled -// with WRITE_PREPARED too This test clarifies the existing expectation from the -// sequence number algorithm. It could detect mistakes in updating the code but -// it is not necessarily the one acceptable way. If the algorithm is -// legitimately changed, this unit test should be updated as well. -TEST_P(WritePreparedTransactionTest, SeqAdvanceTest) { - WriteOptions wopts; - FlushOptions fopt; - - options.disable_auto_compactions = true; - ReOpen(); - - // Do the test with NUM_BRANCHES branches in it. 
Each run of a test takes some - // of the branches. This is the same as counting a binary number where i-th - // bit represents whether we take branch i in the represented by the number. - const size_t NUM_BRANCHES = 8; - // Helper function that shows if the branch is to be taken in the run - // represented by the number n. - auto branch_do = [&](size_t n, size_t* branch) { - assert(*branch < NUM_BRANCHES); - const size_t filter = static_cast(1) << *branch; - return n & filter; - }; - const size_t max_n = static_cast(1) << NUM_BRANCHES; - for (size_t n = 0; n < max_n; n++, ReOpen()) { - DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); - size_t branch = 0; - auto seq = db_impl->GetLatestSequenceNumber(); - exp_seq = seq; - txn_t0(0); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - - if (branch_do(n, &branch)) { - db_impl->Flush(fopt); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - } - if (branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); - db_impl = reinterpret_cast(db->GetRootDB()); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - } - - // Doing it twice might detect some bugs - txn_t0(1); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - - txn_t1(0); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - - if (branch_do(n, &branch)) { - db_impl->Flush(fopt); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - } - if (branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); - db_impl = reinterpret_cast(db->GetRootDB()); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - } - - txn_t3(0); - // Since commit marker does not write to memtable, the last seq number is - // not updated immediately. But the advance should be visible after the next - // write. 
- - if (branch_do(n, &branch)) { - db_impl->Flush(fopt); - } - if (branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); - db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - } - - txn_t0(0); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - - txn_t2(0); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - - if (branch_do(n, &branch)) { - db_impl->Flush(fopt); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - } - if (branch_do(n, &branch)) { - db_impl->FlushWAL(true); - ReOpenNoDelete(); - db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); - seq = db_impl->GetLatestSequenceNumber(); - ASSERT_EQ(exp_seq, seq); - } - } -} - TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { // Given the sequential run of txns, with this timeout we should never see a // deadlock nor a timeout unless we have a key conflict, which should be @@ -732,7 +624,7 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { printf("Tested %" PRIu64 " cases so far\n", n); } DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); - auto seq = db_impl->GetLatestSequenceNumber(); + auto seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); exp_seq = seq; // This is increased before writing the batch for commit commit_writes = 0; @@ -800,24 +692,28 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) { for (auto& t : threads) { t.join(); } - if (txn_db_options.write_policy == WRITE_PREPARED) { + if (options.concurrent_prepare) { // In this case none of the above scheduling tricks to deterministically // form merged bactches works because the writes go to saparte queues. // This would result in different write groups in each run of the test. We // still keep the test since althgouh non-deterministic and hard to debug, - // it is still useful to have. Since in this case we could finish with - // commit writes that dont write to memtable, the seq is not advanced in - // this code path. It will be after the next write. So we do one more - // write to make the impact of last seq visible. - txn_t0(0); + // it is still useful to have. + // TODO(myabandeh): Add a deterministic unit test for concurrent_prepare } + // Check if memtable inserts advanced seq number as expected - seq = db_impl->GetLatestSequenceNumber(); + seq = db_impl->TEST_GetLatestVisibleSequenceNumber(); ASSERT_EQ(exp_seq, seq); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + // The latest seq might be due to a commit without prepare and hence not + // persisted in the WAL. To make the verification of seq after recovery + // easier, we write in a transaction with prepare, which makes the latest seq + // to be persisted via the commit marker.
+ txn_t3(0); + // Check if recovery preserves the last sequence number db_impl->FlushWAL(true); ReOpenNoDelete(); @@ -1354,11 +1250,16 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) { options.write_buffer_size = 1024 * 1024; ReOpen(); std::vector<KeyVersion> versions; + uint64_t seq = 0; for (uint64_t i = 1; i <= 1024; i++) { std::string v = "bar" + ToString(i); ASSERT_OK(db->Put(WriteOptions(), "foo", v)); VerifyKeys({{"foo", v}}); - KeyVersion kv = {"foo", v, i, kTypeValue}; + seq++; // one for the key/value + KeyVersion kv = {"foo", v, seq, kTypeValue}; + if (options.concurrent_prepare) { + seq++; // one for the commit + } versions.emplace_back(kv); } std::reverse(std::begin(versions), std::end(versions)); @@ -1395,6 +1296,7 @@ TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) { TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) { options.disable_auto_compactions = true; ReOpen(); + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); // Snapshots to avoid keys get evicted. std::vector<const Snapshot*> snapshots; // Keep track of expected sequence number. @@ -1402,7 +1304,11 @@ auto add_key = [&](std::function<Status()> func) { ASSERT_OK(func()); - ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + expected_seq++; + if (options.concurrent_prepare) { + expected_seq++; // 1 for commit + } + ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); snapshots.push_back(db->GetSnapshot()); }; @@ -1489,7 +1395,8 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) { ASSERT_OK(txn1->Prepare()); ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); ASSERT_OK(txn1->Commit()); - ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB()); + ASSERT_EQ(++expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); delete txn1; // Take a snapshots to avoid keys get evicted before compaction. const Snapshot* snapshot1 = db->GetSnapshot(); @@ -1502,16 +1409,24 @@ // txn2 commit after snapshot2 and it is not visible. const Snapshot* snapshot2 = db->GetSnapshot(); ASSERT_OK(txn2->Commit()); - ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + ASSERT_EQ(++expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); delete txn2; // Take a snapshots to avoid keys get evicted before compaction.
const Snapshot* snapshot3 = db->GetSnapshot(); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); - ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + expected_seq++; // 1 for write SequenceNumber seq1 = expected_seq; + if (options.concurrent_prepare) { + expected_seq++; // 1 for commit + } + ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); - ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); + expected_seq++; // 1 for write SequenceNumber seq2 = expected_seq; + if (options.concurrent_prepare) { + expected_seq++; // 1 for commit + } + ASSERT_EQ(expected_seq, db_impl->TEST_GetLatestVisibleSequenceNumber()); ASSERT_OK(db->Flush(FlushOptions())); db->ReleaseSnapshot(snapshot1); db->ReleaseSnapshot(snapshot3); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index e6d6f4597..16499cc33 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -89,6 +89,14 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() { return CommitBatchInternal(GetWriteBatch()->GetWriteBatch()); } +SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) { + if (db_impl_->immutable_db_options().concurrent_prepare) { + return db_impl_->IncAndFetchSequenceNumber(); + } else { + return prep_seq; + } +} + Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { // TODO(myabandeh): handle the duplicate keys in the batch // In the absence of Prepare markers, use Noop as a batch separator @@ -100,7 +108,7 @@ Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { no_log_ref, !disable_memtable, &seq_used); assert(seq_used != kMaxSequenceNumber); uint64_t& prepare_seq = seq_used; - uint64_t& commit_seq = seq_used; + uint64_t commit_seq = GetACommitSeqNumber(prepare_seq); // TODO(myabandeh): skip AddPrepared wpt_db_->AddPrepared(prepare_seq); wpt_db_->AddCommitted(prepare_seq, commit_seq); @@ -136,7 +144,8 @@ Status WritePreparedTxn::CommitInternal() { // Commit the data that is accompnaied with the commit marker // TODO(myabandeh): skip AddPrepared wpt_db_->AddPrepared(commit_seq); - wpt_db_->AddCommitted(commit_seq, commit_seq); + uint64_t commit_seq_2 = GetACommitSeqNumber(commit_seq); + wpt_db_->AddCommitted(commit_seq, commit_seq_2); } return s; } @@ -216,7 +225,7 @@ Status WritePreparedTxn::RollbackInternal() { no_log_ref, !disable_memtable, &seq_used); assert(seq_used != kMaxSequenceNumber); uint64_t& prepare_seq = seq_used; - uint64_t& commit_seq = seq_used; + uint64_t commit_seq = GetACommitSeqNumber(prepare_seq); // TODO(myabandeh): skip AddPrepared wpt_db_->AddPrepared(prepare_seq); wpt_db_->AddCommitted(prepare_seq, commit_seq); diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 09544b9f1..4dc6945c0 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -45,11 +45,17 @@ class WritePreparedTxn : public PessimisticTransaction { virtual ~WritePreparedTxn() {} + // To make WAL commit markers visible, the snapshot will be based on the last + // seq in the WAL, LastToBeWrittenSequence, as opposed to the last seq in the + // memtable.
using Transaction::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; + // To make WAL commit markers visible, the snapshot will be based on the last + // seq in the WAL, LastToBeWrittenSequence, as opposed to the last seq in the + // memtable. using Transaction::GetIterator; virtual Iterator* GetIterator(const ReadOptions& options) override; virtual Iterator* GetIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override; private: friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; + SequenceNumber GetACommitSeqNumber(SequenceNumber prep_seq); + Status PrepareInternal() override; Status CommitWithoutPrepareInternal() override; Status CommitBatchInternal(WriteBatch* batch) override; + // Since the data is already written to memtables at the Prepare phase, the + // commit entails writing only a commit marker in the WAL. The sequence number + // of the commit marker is then the commit timestamp of the transaction. To + // make the commit timestamp visible to readers, their snapshot is based on + // the last seq in the WAL, LastToBeWrittenSequence, as opposed to the last seq + // in the memtable. Status CommitInternal() override; Status RollbackInternal() override;
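
For orientation, the following is a minimal editorial sketch, not part of the commit, of the sequence-number accounting the patch introduces. The struct and member names are hypothetical; IncAndFetchSequenceNumber, GetACommitSeqNumber, LastToBeWrittenSequence, and LastSequence refer to the functions changed above. It also shows why the tests above advance exp_seq twice per transaction when options.concurrent_prepare is set: one seq for the batch and one for the WAL-only commit marker.

#include <cstdint>

using SequenceNumber = uint64_t;

// Minimal model of the two counters used by the patch: last_to_be_written
// stands in for VersionSet::LastToBeWrittenSequence (the WAL tail) and
// last_sequence for VersionSet::LastSequence (the memtable tail).
struct SeqAccountingSketch {
  bool concurrent_prepare = true;  // two write queues; commit markers may skip the memtable
  bool seq_per_batch = true;       // one seq per batch rather than one per key

  SequenceNumber last_to_be_written = 0;
  SequenceNumber last_sequence = 0;

  // Roughly CommitBatchInternal + GetACommitSeqNumber from the diff: the batch
  // consumes one seq and reaches the memtable; in concurrent_prepare mode the
  // commit consumes a second, WAL-only seq (via IncAndFetchSequenceNumber).
  SequenceNumber CommitWithoutPrepare() {
    SequenceNumber prepare_seq = ++last_to_be_written;
    last_sequence = prepare_seq;
    SequenceNumber commit_seq =
        concurrent_prepare ? ++last_to_be_written : prepare_seq;
    return commit_seq;
  }

  // Roughly DBImpl::GetImpl / GetSnapshotImpl from the diff: to see commit
  // markers that exist only in the WAL, snapshots are taken at the
  // to-be-written sequence in concurrent_prepare && seq_per_batch mode.
  SequenceNumber SnapshotSeq() const {
    return (concurrent_prepare && seq_per_batch) ? last_to_be_written
                                                 : last_sequence;
  }
};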