diff --git a/db/db_impl.h b/db/db_impl.h index 3d281cbb7..d0123e9b7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -790,6 +790,8 @@ class DBImpl : public DB { friend class WritePreparedTxnDB; friend class WriteBatchWithIndex; friend class WriteUnpreparedTxnDB; + friend class WriteUnpreparedTxn; + #ifndef ROCKSDB_LITE friend class ForwardIterator; #endif @@ -801,7 +803,7 @@ class DBImpl : public DB { friend class WriteCallbackTest_WriteWithCallbackTest_Test; friend class XFTransactionWriteHandler; friend class DBBlobIndexTest; - friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test; + friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; #endif struct CompactionState; diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 788ffb9c5..f171d90d4 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -311,7 +311,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, w.status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, - 0 /*recovery_log_number*/, this, parallel, seq_per_batch_); + 0 /*recovery_log_number*/, this, parallel, seq_per_batch_, + batch_per_txn_); } else { SequenceNumber next_sequence = current_sequence; // Note: the logic for advancing seq here must be consistent with the @@ -346,7 +347,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, &w, w.sequence, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, seq_per_batch_, - w.batch_cnt); + w.batch_cnt, batch_per_txn_); } } if (seq_used != nullptr) { @@ -508,7 +509,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, memtable_write_group.status = WriteBatchInternal::InsertInto( memtable_write_group, w.sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, - 0 /*log_number*/, this, seq_per_batch_); + 0 /*log_number*/, this, false /*concurrent_memtable_writes*/, + seq_per_batch_, batch_per_txn_); versions_->SetLastSequence(memtable_write_group.last_sequence); write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); } diff --git a/db/write_batch.cc b/db/write_batch.cc index aa78d45cd..295fba22e 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -727,6 +727,11 @@ Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid, ContentFlags::HAS_END_PREPARE | ContentFlags::HAS_BEGIN_PREPARE, std::memory_order_relaxed); + if (unprepared_batch) { + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_BEGIN_UNPREPARE, + std::memory_order_relaxed); + } return Status::OK(); } diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index ba2029674..d25b9513b 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -231,6 +231,7 @@ class WriteBatchWithIndex : public WriteBatchBase { Status PopSavePoint() override; void SetMaxBytes(size_t max_bytes) override; + size_t GetDataSize() const; private: friend class PessimisticTransactionDB; diff --git a/util/transaction_test_util.cc b/util/transaction_test_util.cc index 3f8eea6de..19d27b1a1 100644 --- a/util/transaction_test_util.cc +++ b/util/transaction_test_util.cc @@ -47,6 +47,14 @@ RandomTransactionInserter::~RandomTransactionInserter() { bool 
RandomTransactionInserter::TransactionDBInsert( TransactionDB* db, const TransactionOptions& txn_options) { txn_ = db->BeginTransaction(write_options_, txn_options, txn_); + + std::hash<std::thread::id> hasher; + char name[64]; + snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%d", + hasher(std::this_thread::get_id()), txn_id_++); + assert(strlen(name) < 64 - 1); + txn_->SetName(name); + bool take_snapshot = rand_->OneIn(2); if (take_snapshot) { txn_->SetSnapshot(); @@ -173,14 +181,8 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, if (s.ok()) { if (txn != nullptr) { - std::hash<std::thread::id> hasher; - char name[64]; - snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%d", hasher(std::this_thread::get_id()), - txn_id_++); - assert(strlen(name) < 64 - 1); if (!is_optimistic && !rand_->OneIn(10)) { // also try commit without prpare - txn->SetName(name); s = txn->Prepare(); assert(s.ok()); } diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index befa19f04..4e8fa14b2 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -191,14 +191,22 @@ Status PessimisticTransaction::Prepare() { } if (can_prepare) { + bool wal_already_marked = false; txn_state_.store(AWAITING_PREPARE); // transaction can't expire after preparation expiration_time_ = 0; + if (log_number_ > 0) { + assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); + wal_already_marked = true; + } + s = PrepareInternal(); if (s.ok()) { assert(log_number_ != 0); - dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( - log_number_); + if (!wal_already_marked) { + dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( + log_number_); + } txn_state_.store(PREPARED); } } else if (txn_state_ == LOCKS_STOLEN) { @@ -264,7 +272,14 @@ Status PessimisticTransaction::Commit() { "Commit-time batch contains values that will not be committed."); } else { txn_state_.store(AWAITING_COMMIT); + if (log_number_ > 0) { + dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( + log_number_); + } s = CommitWithoutPrepareInternal(); + if (!name_.empty()) { + txn_db_impl_->UnregisterTransaction(this); + } Clear(); if (s.ok()) { txn_state_.store(COMMITED); @@ -349,6 +364,16 @@ Status PessimisticTransaction::Rollback() { txn_state_.store(ROLLEDBACK); } } else if (txn_state_ == STARTED) { + if (log_number_ > 0) { + assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); + assert(GetId() > 0); + s = RollbackInternal(); + + if (s.ok()) { + dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( + log_number_); + } + } // prepare couldn't have taken place Clear(); } else if (txn_state_ == COMMITED) { diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 0434a69db..77b5640d8 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -130,7 +130,7 @@ class PessimisticTransaction : public TransactionBaseImpl { virtual Status RollbackInternal() = 0; - void Initialize(const TransactionOptions& txn_options); + virtual void Initialize(const TransactionOptions& txn_options); Status LockBatch(WriteBatch* batch, TransactionKeyMap* keys_to_unlock); diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 76032699d..e80b28852 100644 --- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h @@ -136,13 +136,15 @@ class PessimisticTransactionDB : public TransactionDB { private: friend class WritePreparedTxnDB; friend class WritePreparedTxnDBMock; + friend class WriteUnpreparedTxn; friend class TransactionTest_DoubleEmptyWrite_Test; friend class TransactionTest_DuplicateKeys_Test; friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test; friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test; friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test; - friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test; + friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; + friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; TransactionLockMgr lock_mgr_; // Must be held when adding/dropping column families. diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 96eb93708..89118e0b1 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4950,8 +4950,16 @@ TEST_P(TransactionTest, MemoryLimitTest) { ASSERT_EQ(2, txn->GetNumPuts()); s = txn->Put(Slice("b"), Slice("....")); - ASSERT_TRUE(s.IsMemoryLimit()); - ASSERT_EQ(2, txn->GetNumPuts()); + auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); + // For write unprepared, write batches exceeding max_write_batch_size will + // just flush to DB instead of returning a memory limit error. + if (pdb->GetTxnDBOptions().write_policy != WRITE_UNPREPARED) { + ASSERT_TRUE(s.IsMemoryLimit()); + ASSERT_EQ(2, txn->GetNumPuts()); + } else { + ASSERT_OK(s); + ASSERT_EQ(3, txn->GetNumPuts()); + } txn->Rollback(); delete txn; @@ -5285,10 +5293,6 @@ TEST_P(TransactionTest, DuplicateKeys) { s = txn0->Commit(); ASSERT_OK(s); } - if (!do_prepare && !do_rollback) { - auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); - pdb->UnregisterTransaction(txn0); - } delete txn0; ReadOptions ropt; PinnableSlice pinnable_val; diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index a8c3093ec..ea8b1717e 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -272,8 +272,6 @@ class TransactionTestBase : public ::testing::Test { exp_seq++; } } - auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); - pdb->UnregisterTransaction(txn); delete txn; }; std::function<void(size_t)> txn_t3 = [&](size_t index) { @@ -387,12 +385,6 @@ class TransactionTestBase : public ::testing::Test { ASSERT_OK(txn->Prepare()); } ASSERT_OK(txn->Commit()); - if (type == 2) { - auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); - // TODO(myabandeh): this is counter-intuitive. The destructor should - // also do the unregistering.
- pdb->UnregisterTransaction(txn); - } delete txn; break; default: diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 5bf5835b1..cb20d1439 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -34,6 +34,11 @@ WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db, : PessimisticTransaction(txn_db, write_options, txn_options), wpt_db_(txn_db) {} +void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) { + PessimisticTransaction::Initialize(txn_options); + prepare_batch_cnt_ = 0; +} + Status WritePreparedTxn::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val) { diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 4016241c9..46c114c74 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -64,6 +64,7 @@ class WritePreparedTxn : public PessimisticTransaction { virtual void SetSnapshot() override; protected: + void Initialize(const TransactionOptions& txn_options) override; // Override the protected SetId to make it visible to the friend class // WritePreparedTxnDB inline void SetId(uint64_t id) override { Transaction::SetId(id); } @@ -72,6 +73,7 @@ class WritePreparedTxn : public PessimisticTransaction { friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTxnDB; friend class WriteUnpreparedTxnDB; + friend class WriteUnpreparedTxn; Status PrepareInternal() override; diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index e91f68be2..ec76e2716 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -384,7 +384,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_OldCommitMapGC_Test; friend class WritePreparedTransactionTest_RollbackTest_Test; friend class WriteUnpreparedTxnDB; - friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test; + friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; void Init(const TransactionDBOptions& /* unused */); diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 542ab9cd9..8ec7f6bfb 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -179,6 +179,11 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { iter->Prev(); verify_state(iter, "a", "v7"); + // Since the unprep_seqs_ data were faked for testing, we do not want the + // destructor for the transaction to roll back data that never + // existed. + wup_txn->unprep_seqs_.clear(); + db->ReleaseSnapshot(snapshot0); db->ReleaseSnapshot(snapshot2); db->ReleaseSnapshot(snapshot4); @@ -188,108 +193,235 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { delete txn; } -TEST_P(WriteUnpreparedTransactionTest, RecoveryRollbackUnprepared) { +// This tests how write unprepared behaves during recovery when the DB crashes +// after a transaction has either been unprepared or prepared, and verifies that +// the changes are correctly applied for prepared transactions if we decide to +// rollback/commit.
+TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) { WriteOptions write_options; write_options.disableWAL = false; - uint64_t seq_used = kMaxSequenceNumber; - uint64_t log_number; - WriteBatch batch; + TransactionOptions txn_options; std::vector<Transaction*> prepared_trans; WriteUnpreparedTxnDB* wup_db; options.disable_auto_compactions = true; - // Try unprepared batches with empty database. - for (int num_batches = 0; num_batches < 10; num_batches++) { - // Reset database. - prepared_trans.clear(); - ReOpen(); - wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); - - // Write num_batches unprepared batches into the WAL. - for (int i = 0; i < num_batches; i++) { - batch.Clear(); - // TODO(lth): Instead of manually calling WriteImpl with a write batch, - // use methods on Transaction instead once it is implemented. - ASSERT_OK(WriteBatchInternal::InsertNoop(&batch)); - ASSERT_OK(WriteBatchInternal::Put(&batch, - db->DefaultColumnFamily()->GetID(), - "k" + ToString(i), "value")); - // MarkEndPrepare will change the Noop marker into an unprepared marker. - ASSERT_OK(WriteBatchInternal::MarkEndPrepare( - &batch, Slice("xid1"), /* write after commit */ false, - /* unprepared batch */ true)); - ASSERT_OK(wup_db->db_impl_->WriteImpl( - write_options, &batch, /*callback*/ nullptr, &log_number, - /*log ref*/ 0, /* disable memtable */ true, &seq_used, - /* prepare_batch_cnt_ */ 1)); + enum Action { UNPREPARED, ROLLBACK, COMMIT }; + + // batch_size of 1 causes writes to DB for every marker. + for (size_t batch_size : {1, 1000000}) { + txn_options.max_write_batch_size = batch_size; + for (bool empty : {true, false}) { + for (Action a : {UNPREPARED, ROLLBACK, COMMIT}) { + for (int num_batches = 1; num_batches < 10; num_batches++) { + // Reset database. + prepared_trans.clear(); + ReOpen(); + wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); + if (!empty) { + for (int i = 0; i < num_batches; i++) { + ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i), + "before value" + ToString(i))); + } + } + + // Write num_batches unprepared batches. + Transaction* txn = db->BeginTransaction(write_options, txn_options); + WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); + txn->SetName("xid"); + for (int i = 0; i < num_batches; i++) { + ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i))); + if (txn_options.max_write_batch_size == 1) { + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); + } else { + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); + } + } + if (a == UNPREPARED) { + // This is done to prevent the destructor from rolling back the + // transaction for us, since we want to pretend we crashed and + // test that recovery does the rollback. + wup_txn->unprep_seqs_.clear(); + } else { + txn->Prepare(); + } + delete txn; + + // Crash and run recovery code paths. + wup_db->db_impl_->FlushWAL(true); + wup_db->TEST_Crash(); + ReOpenNoDelete(); + wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); + + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), a == UNPREPARED ? 0 : 1); + if (a == ROLLBACK) { + ASSERT_OK(prepared_trans[0]->Rollback()); + delete prepared_trans[0]; + } else if (a == COMMIT) { + ASSERT_OK(prepared_trans[0]->Commit()); + delete prepared_trans[0]; + } + + Iterator* iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + // Check that the DB contains the expected values.
+ if (!empty || a == COMMIT) { + for (int i = 0; i < num_batches; i++) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); + if (a == COMMIT) { + ASSERT_EQ(iter->value().ToString(), "value" + ToString(i)); + } else { + ASSERT_EQ(iter->value().ToString(), + "before value" + ToString(i)); + } + iter->Next(); + } + } + ASSERT_FALSE(iter->Valid()); + delete iter; + } + } } - - // Crash and run recovery code paths. - wup_db->db_impl_->FlushWAL(true); - wup_db->TEST_Crash(); - ReOpenNoDelete(); - wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); - - db->GetAllPreparedTransactions(&prepared_trans); - ASSERT_EQ(prepared_trans.size(), 0); - - // Check that DB is empty. - Iterator* iter = db->NewIterator(ReadOptions()); - iter->SeekToFirst(); - ASSERT_FALSE(iter->Valid()); - delete iter; } +} - // Try unprepared batches with non-empty database. - for (int num_batches = 1; num_batches < 10; num_batches++) { - // Reset database. - prepared_trans.clear(); - ReOpen(); - wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); - for (int i = 0; i < num_batches; i++) { - ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i), - "before value " + ToString(i))); - } - - // Write num_batches unprepared batches into the WAL. - for (int i = 0; i < num_batches; i++) { - batch.Clear(); - // TODO(lth): Instead of manually calling WriteImpl with a write batch, - // use methods on Transaction instead once it is implemented. - ASSERT_OK(WriteBatchInternal::InsertNoop(&batch)); - ASSERT_OK(WriteBatchInternal::Put(&batch, - db->DefaultColumnFamily()->GetID(), - "k" + ToString(i), "value")); - // MarkEndPrepare will change the Noop marker into an unprepared marker. - ASSERT_OK(WriteBatchInternal::MarkEndPrepare( - &batch, Slice("xid1"), /* write after commit */ false, - /* unprepared batch */ true)); - ASSERT_OK(wup_db->db_impl_->WriteImpl( - write_options, &batch, /*callback*/ nullptr, &log_number, - /*log ref*/ 0, /* disable memtable */ true, &seq_used, - /* prepare_batch_cnt_ */ 1)); + +// Basic test to see that an unprepared batch gets written to DB when batch size +// is exceeded. It also does some basic checks to see if commit/rollback works +// as expected for write unprepared. +TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) { + WriteOptions write_options; + TransactionOptions txn_options; + const int kNumKeys = 10; + + // batch_size of 1 causes writes to DB for every marker. + for (size_t batch_size : {1, 1000000}) { + txn_options.max_write_batch_size = batch_size; + for (bool prepare : {false, true}) { + for (bool commit : {false, true}) { + ReOpen(); + Transaction* txn = db->BeginTransaction(write_options, txn_options); + WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn); + txn->SetName("xid"); + + for (int i = 0; i < kNumKeys; i++) { + txn->Put("k" + ToString(i), "v" + ToString(i)); + if (txn_options.max_write_batch_size == 1) { + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), i + 1); + } else { + ASSERT_EQ(wup_txn->GetUnpreparedSequenceNumbers().size(), 0); + } + } + + if (prepare) { + ASSERT_OK(txn->Prepare()); + } + + Iterator* iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + assert(!iter->Valid()); + ASSERT_FALSE(iter->Valid()); + delete iter; + + if (commit) { + ASSERT_OK(txn->Commit()); + } else { + ASSERT_OK(txn->Rollback()); + } + delete txn; + + iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + + for (int i = 0; i < (commit ?
kNumKeys : 0); i++) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); + ASSERT_EQ(iter->value().ToString(), "v" + ToString(i)); + iter->Next(); + } + ASSERT_FALSE(iter->Valid()); + delete iter; + } } + } +} - // Crash and run recovery code paths. - wup_db->db_impl_->FlushWAL(true); - wup_db->TEST_Crash(); - ReOpenNoDelete(); - wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); - - db->GetAllPreparedTransactions(&prepared_trans); - ASSERT_EQ(prepared_trans.size(), 0); - - // Check that DB has before values. - Iterator* iter = db->NewIterator(ReadOptions()); - iter->SeekToFirst(); - for (int i = 0; i < num_batches; i++) { - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); - ASSERT_EQ(iter->value().ToString(), "before value " + ToString(i)); - iter->Next(); +// Test whether logs containing unprepared/prepared batches are kept even +// after memtable finishes flushing, and whether they are removed when the +// transaction commits/aborts. +// +// TODO(lth): Merge with TransactionTest/TwoPhaseLogRollingTest tests. +TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) { + WriteOptions write_options; + TransactionOptions txn_options; + // batch_size of 1 causes writes to DB for every marker. + txn_options.max_write_batch_size = 1; + const int kNumKeys = 10; + + WriteOptions wopts; + wopts.sync = true; + + for (bool prepare : {false, true}) { + for (bool commit : {false, true}) { + ReOpen(); + auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db); + auto db_impl = wup_db->db_impl_; + + Transaction* txn1 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn1->SetName("xid1")); + + Transaction* txn2 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn2->SetName("xid2")); + + // Spread this transaction across multiple log files.
+ for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(txn1->Put("k1" + ToString(i), "v" + ToString(i))); + if (i >= kNumKeys / 2) { + ASSERT_OK(txn2->Put("k2" + ToString(i), "v" + ToString(i))); + } + + if (i > 0) { + db_impl->TEST_SwitchWAL(); + } + } + + ASSERT_GT(txn1->GetLogNumber(), 0); + ASSERT_GT(txn2->GetLogNumber(), 0); + + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn1->GetLogNumber()); + ASSERT_GT(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + + if (prepare) { + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn2->Prepare()); + } + + ASSERT_GE(db_impl->TEST_LogfileNumber(), txn1->GetLogNumber()); + ASSERT_GE(db_impl->TEST_LogfileNumber(), txn2->GetLogNumber()); + + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn1->GetLogNumber()); + if (commit) { + ASSERT_OK(txn1->Commit()); + } else { + ASSERT_OK(txn1->Rollback()); + } + + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), + txn2->GetLogNumber()); + + if (commit) { + ASSERT_OK(txn2->Commit()); + } else { + ASSERT_OK(txn2->Rollback()); + } + + ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0); + + delete txn1; + delete txn2; } } } diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 001c2444d..d4efe8ff9 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -46,7 +46,436 @@ SequenceNumber WriteUnpreparedTxnReadCallback::MaxUnpreparedSequenceNumber() { WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db, const WriteOptions& write_options, const TransactionOptions& txn_options) - : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) {} + : WritePreparedTxn(txn_db, write_options, txn_options), wupt_db_(txn_db) { + max_write_batch_size_ = txn_options.max_write_batch_size; + // We set max bytes to zero so that we don't get a memory limit error. + // Instead of trying to keep write batch strictly under the size limit, we + // just flush to DB when the limit is exceeded in write unprepared, to avoid + // having retry logic. This also allows very big key-value pairs that exceed + // max bytes to succeed. + write_batch_.SetMaxBytes(0); +} + +WriteUnpreparedTxn::~WriteUnpreparedTxn() { + if (!unprep_seqs_.empty()) { + assert(log_number_ > 0); + assert(GetId() > 0); + assert(!name_.empty()); + + // We should roll back regardless of GetState, but some unit tests that + // test crash recovery run the destructor assuming that rollback does not + // happen, so that rollback during recovery can be exercised. + if (GetState() == STARTED) { + auto s __attribute__((__unused__)) = RollbackInternal(); + // TODO(lth): Better error handling.
+ assert(s.ok()); + dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed( + log_number_); + } + } +} + +void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) { + PessimisticTransaction::Initialize(txn_options); + max_write_batch_size_ = txn_options.max_write_batch_size; + write_batch_.SetMaxBytes(0); + unprep_seqs_.clear(); + write_set_keys_.clear(); +} + +Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + Status s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } + return TransactionBaseImpl::Put(column_family, key, value); +} + +Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family, + const SliceParts& key, const SliceParts& value) { + Status s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } + return TransactionBaseImpl::Put(column_family, key, value); +} + +Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + Status s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } + return TransactionBaseImpl::Merge(column_family, key, value); +} + +Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family, + const Slice& key) { + Status s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } + return TransactionBaseImpl::Delete(column_family, key); +} + +Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + Status s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } + return TransactionBaseImpl::Delete(column_family, key); +} + +Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) { + Status s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } + return TransactionBaseImpl::SingleDelete(column_family, key); +} + +Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + Status s = MaybeFlushWriteBatchToDB(); + if (!s.ok()) { + return s; + } + return TransactionBaseImpl::SingleDelete(column_family, key); +} + +Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { + const bool kPrepared = true; + Status s; + + bool needs_mark = (log_number_ == 0); + + if (max_write_batch_size_ != 0 && + write_batch_.GetDataSize() > max_write_batch_size_) { + assert(GetState() != PREPARED); + s = FlushWriteBatchToDB(!kPrepared); + if (s.ok()) { + assert(log_number_ > 0); + // This is done to prevent WAL files after log_number_ from being + // deleted, because they could potentially contain unprepared batches. + if (needs_mark) { + dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( + log_number_); + } + } + } + return s; +} + +void WriteUnpreparedTxn::UpdateWriteKeySet(uint32_t cfid, const Slice& key) { + // TODO(lth): write_set_keys_ can just be a std::string instead of a vector. + write_set_keys_[cfid].push_back(key.ToString()); +} + +Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { + if (name_.empty()) { + return Status::InvalidArgument("Cannot write to DB without SetName."); + } + + // Update write_set_keys_ for rollback purposes. + KeySetBuilder keyset_handler( + this, wupt_db_->txn_db_options_.rollback_merge_operands); + auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&keyset_handler); + assert(s.ok()); + if (!s.ok()) { + return s; + } + + // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
+ WriteOptions write_options = write_options_; + write_options.disableWAL = false; + const bool WRITE_AFTER_COMMIT = true; + // MarkEndPrepare will change Noop marker to the appropriate marker. + WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, + !WRITE_AFTER_COMMIT, !prepared); + // For each duplicate key we account for a new sub-batch + prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt(); + // AddPrepared is better called in the pre-release callback; otherwise there + // is a non-zero chance of max advancing past prepare_seq, and readers would + // assume the data is committed. + // Also having it in the PreReleaseCallback allows in-order addition of + // prepared entries to PrepareHeap and hence enables an optimization. Refer to + // SmallestUnCommittedSeq for more details. + AddPreparedCallback add_prepared_callback( + wpt_db_, prepare_batch_cnt_, + db_impl_->immutable_db_options().two_write_queues); + const bool DISABLE_MEMTABLE = true; + uint64_t seq_used = kMaxSequenceNumber; + // log_number_ should refer to the oldest log containing uncommitted data + // from the current transaction. This means that if log_number_ is set, + // WriteImpl should not overwrite that value, so set log_used to nullptr if + // log_number_ is already set. + uint64_t* log_used = log_number_ ? nullptr : &log_number_; + s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), + /*callback*/ nullptr, log_used, /*log ref*/ + 0, !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_, + &add_prepared_callback); + assert(!s.ok() || seq_used != kMaxSequenceNumber); + auto prepare_seq = seq_used; + + // Only call SetId if it hasn't been set yet. + if (GetId() == 0) { + SetId(prepare_seq); + } + // unprep_seqs_ will also contain prepared seqnos since they are treated in + // the same way in the prepare/commit callbacks. See the comment on the + // definition of unprep_seqs_. + unprep_seqs_[prepare_seq] = prepare_batch_cnt_; + + // Reset transaction state. + if (!prepared) { + prepare_batch_cnt_ = 0; + write_batch_.Clear(); + WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch()); + } + + return s; +} + +Status WriteUnpreparedTxn::PrepareInternal() { + const bool kPrepared = true; + return FlushWriteBatchToDB(kPrepared); +} + +Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() { + if (unprep_seqs_.empty()) { + assert(log_number_ == 0); + assert(GetId() == 0); + return WritePreparedTxn::CommitWithoutPrepareInternal(); + } + + // TODO(lth): We should optimize commit without prepare to not perform + // a prepare under the hood. + auto s = PrepareInternal(); + if (!s.ok()) { + return s; + } + return CommitInternal(); +} + +Status WriteUnpreparedTxn::CommitInternal() { + // TODO(lth): Reduce duplicate code with WritePrepared commit logic. + + // We take the commit-time batch and append the Commit marker. The Memtable + // will ignore the Commit marker in non-recovery mode. + WriteBatch* working_batch = GetCommitTimeWriteBatch(); + const bool empty = working_batch->Count() == 0; + WriteBatchInternal::MarkCommit(working_batch, name_); + + const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_; + if (!empty && for_recovery) { + // When not writing to memtable, we can still cache the latest write batch.
+ // The cached batch will be written to memtable in WriteRecoverableState + // during FlushMemTable. + WriteBatchInternal::SetAsLastestPersistentState(working_batch); + } + + const bool includes_data = !empty && !for_recovery; + size_t commit_batch_cnt = 0; + if (UNLIKELY(includes_data)) { + ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, + "Duplicate key overhead"); + SubBatchCounter counter(*wpt_db_->GetCFComparatorMap()); + auto s = working_batch->Iterate(&counter); + assert(s.ok()); + commit_batch_cnt = counter.BatchCount(); + } + const bool disable_memtable = !includes_data; + const bool do_one_write = + !db_impl_->immutable_db_options().two_write_queues || disable_memtable; + const bool publish_seq = do_one_write; + // Note: CommitTimeWriteBatch does not need AddPrepared since it is written to + // DB in one shot. min_uncommitted still works since it requires capturing + // data that is written to DB but not yet committed, while + // CommitTimeWriteBatch commits with PreReleaseCallback. + WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt, publish_seq); + uint64_t seq_used = kMaxSequenceNumber; + // Since the prepared batch is directly written to memtable, there is already + // a connection between the memtable and its WAL, so there is no need to + // redundantly reference the log that contains the prepared data. + const uint64_t zero_log_number = 0ull; + size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1; + auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, + zero_log_number, disable_memtable, &seq_used, + batch_cnt, &update_commit_map); + assert(!s.ok() || seq_used != kMaxSequenceNumber); + if (LIKELY(do_one_write || !s.ok())) { + if (LIKELY(s.ok())) { + // Note RemovePrepared should be called after WriteImpl that published + // the seq. Otherwise SmallestUnCommittedSeq optimization breaks. + for (const auto& seq : unprep_seqs_) { + wpt_db_->RemovePrepared(seq.first, seq.second); + } + } + unprep_seqs_.clear(); + write_set_keys_.clear(); + return s; + } // else do the 2nd write to publish seq + // Note: the 2nd write comes with a performance penalty. So if we have too + // many commits accompanied with CommitTimeWriteBatch and yet we cannot + // enable use_only_the_last_commit_time_batch_for_recovery_ optimization, + // two_write_queues should be disabled to avoid many additional writes here. + class PublishSeqPreReleaseCallback : public PreReleaseCallback { + public: + explicit PublishSeqPreReleaseCallback(DBImpl* db_impl) + : db_impl_(db_impl) {} + virtual Status Callback(SequenceNumber seq, bool is_mem_disabled + __attribute__((__unused__))) override { + assert(is_mem_disabled); + assert(db_impl_->immutable_db_options().two_write_queues); + db_impl_->SetLastPublishedSequence(seq); + return Status::OK(); + } + + private: + DBImpl* db_impl_; + } publish_seq_callback(db_impl_); + WriteBatch empty_batch; + empty_batch.PutLogData(Slice()); + // In the absence of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(&empty_batch); + const bool DISABLE_MEMTABLE = true; + const size_t ONE_BATCH = 1; + const uint64_t NO_REF_LOG = 0; + s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, + NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, + &publish_seq_callback); + assert(!s.ok() || seq_used != kMaxSequenceNumber); + // Note RemovePrepared should be called after WriteImpl that published the + // seq.
Otherwise SmallestUnCommittedSeq optimization breaks. + for (const auto& seq : unprep_seqs_) { + wpt_db_->RemovePrepared(seq.first, seq.second); + } + unprep_seqs_.clear(); + write_set_keys_.clear(); + return s; +} + +Status WriteUnpreparedTxn::RollbackInternal() { + // TODO(lth): Reduce duplicate code with WritePrepared rollback logic. + WriteBatchWithIndex rollback_batch( + wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0); + assert(GetId() != kMaxSequenceNumber); + assert(GetId() > 0); + const auto& cf_map = *wupt_db_->GetCFHandleMap(); + // In WritePrepared, the txn id is the same as the prepare seq + auto last_visible_txn = GetId() - 1; + Status s; + + ReadOptions roptions; + // Note that we do not use WriteUnpreparedTxnReadCallback because we do not + // need to read our own writes when reading prior versions of the key for + // rollback. + WritePreparedTxnReadCallback callback(wpt_db_, last_visible_txn, 0); + for (const auto& cfkey : write_set_keys_) { + const auto cfid = cfkey.first; + const auto& keys = cfkey.second; + for (const auto& key : keys) { + const auto& cf_handle = cf_map.at(cfid); + PinnableSlice pinnable_val; + bool not_used; + s = db_impl_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used, + &callback); + + if (s.ok()) { + s = rollback_batch.Put(cf_handle, key, pinnable_val); + assert(s.ok()); + } else if (s.IsNotFound()) { + s = rollback_batch.Delete(cf_handle, key); + assert(s.ok()); + } else { + return s; + } + } + } + + // The Rollback marker will be used as a batch separator + WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_); + bool do_one_write = !db_impl_->immutable_db_options().two_write_queues; + const bool DISABLE_MEMTABLE = true; + const uint64_t NO_REF_LOG = 0; + uint64_t seq_used = kMaxSequenceNumber; + // TODO(lth): We write the rollback batch all in a single batch here, but this + // should be subdivided into multiple batches as well. In phase 2, when key + // sets are read from WAL, this will happen naturally. + const size_t ONE_BATCH = 1; + // We commit the rolled back prepared batches. Although this is + // counter-intuitive, i) it is safe to do so, since the prepared batches are + // already canceled out by the rollback batch, ii) adding the commit entry to + // CommitCache will allow us to benefit from the existing mechanism in + // CommitCache that keeps an entry evicted due to max advance and yet overlaps + // with a live snapshot around so that the live snapshot properly skips the + // entry even if its prepare seq is lower than max_evicted_seq_. + WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map( + wpt_db_, db_impl_, unprep_seqs_, ONE_BATCH); + // Note: the rollback batch does not need AddPrepared since it is written to + // DB in one shot. min_uncommitted still works since it requires capturing + // data that is written to DB but not yet committed, while the rollback + // batch commits with PreReleaseCallback. + s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(), + nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE, + &seq_used, rollback_batch.SubBatchCnt(), + do_one_write ?
&update_commit_map : nullptr); + assert(!s.ok() || seq_used != kMaxSequenceNumber); + if (!s.ok()) { + return s; + } + if (do_one_write) { + for (const auto& seq : unprep_seqs_) { + wpt_db_->RemovePrepared(seq.first, seq.second); + } + unprep_seqs_.clear(); + write_set_keys_.clear(); + return s; + } // else do the 2nd write for commit + uint64_t& prepare_seq = seq_used; + ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, + "RollbackInternal 2nd write prepare_seq: %" PRIu64, + prepare_seq); + // Commit the batch by writing an empty batch to the queue that will release + // the commit sequence number to readers. + const size_t ZERO_COMMITS = 0; + WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare( + wpt_db_, db_impl_, prepare_seq, ONE_BATCH, ZERO_COMMITS); + WriteBatch empty_batch; + empty_batch.PutLogData(Slice()); + // In the absence of Prepare markers, use Noop as a batch separator + WriteBatchInternal::InsertNoop(&empty_batch); + s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr, + NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH, + &update_commit_map_with_prepare); + assert(!s.ok() || seq_used != kMaxSequenceNumber); + // Mark the txn as rolled back + uint64_t& rollback_seq = seq_used; + if (s.ok()) { + // Note: it is safe to do it after PreReleaseCallback via WriteImpl since + // all the writes by the prepared batch are already blinded by the rollback + // batch. The only reason we commit the prepared batch here is to benefit + // from the existing mechanism in CommitCache that takes care of the rare + // cases that the prepare seq is visible to a snapshot but max evicted seq + // advances past that prepare seq. + for (const auto& seq : unprep_seqs_) { + for (size_t i = 0; i < seq.second; i++) { + wpt_db_->AddCommitted(seq.first + i, rollback_seq); + } + } + for (const auto& seq : unprep_seqs_) { + wpt_db_->RemovePrepared(seq.first, seq.second); + } + } + + unprep_seqs_.clear(); + write_set_keys_.clear(); + return s; +} Status WriteUnpreparedTxn::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 1dc7338a5..84594070a 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -7,6 +7,8 @@ #ifndef ROCKSDB_LITE +#include <map> + #include "utilities/transactions/write_prepared_txn.h" #include "utilities/transactions/write_unprepared_txn_db.h" @@ -42,7 +44,53 @@ class WriteUnpreparedTxn : public WritePreparedTxn { const WriteOptions& write_options, const TransactionOptions& txn_options); - virtual ~WriteUnpreparedTxn() {} + virtual ~WriteUnpreparedTxn(); + + using TransactionBaseImpl::Put; + virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + + using TransactionBaseImpl::Merge; + virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + + using TransactionBaseImpl::Delete; + virtual Status Delete(ColumnFamilyHandle* column_family, + const Slice& key) override; + virtual Status Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + + using TransactionBaseImpl::SingleDelete; + virtual Status SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) override; + virtual Status SingleDelete(ColumnFamilyHandle*
column_family, + const SliceParts& key) override; + + virtual Status RebuildFromWriteBatch(WriteBatch*) override { + // This function was only useful for recovering prepared transactions, but + // is unused for write unprepared because a transaction may consist of + // multiple write batches. + // + // If there are use cases outside of recovery that can make use of this, + // then support could be added. + return Status::NotSupported("Not supported for WriteUnprepared"); + } + + const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers(); + + void UpdateWriteKeySet(uint32_t cfid, const Slice& key); + + protected: + void Initialize(const TransactionOptions& txn_options) override; + + Status PrepareInternal() override; + + Status CommitWithoutPrepareInternal() override; + Status CommitInternal() override; + + Status RollbackInternal() override; // Get and GetIterator needs to be overridden so that a ReadCallback to // handle read-your-own-write is used. @@ -56,20 +104,37 @@ class WriteUnpreparedTxn : public WritePreparedTxn { virtual Iterator* GetIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override; - const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers(); - private: friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test; + friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; + friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test; friend class WriteUnpreparedTxnDB; + Status MaybeFlushWriteBatchToDB(); + Status FlushWriteBatchToDB(bool prepared); + + // For write unprepared, we check on every write batch append to see if + // max_write_batch_size_ has been exceeded, and then call + // FlushWriteBatchToDB if so. This logic is encapsulated in + // MaybeFlushWriteBatchToDB. + size_t max_write_batch_size_; WriteUnpreparedTxnDB* wupt_db_; // Ordered list of unprep_seq sequence numbers that we have already written // to DB. // - // This maps unprep_seq => prepare_batch_cnt for each prepared batch written - // by this transactioin. + // This maps unprep_seq => prepare_batch_cnt for each unprepared batch + // written by this transaction. + // + // Note that this contains both prepared and unprepared batches, since they + // are treated similarly in prepare heap/commit map, so it simplifies the + // commit callbacks. std::map<SequenceNumber, size_t> unprep_seqs_; + + // Set of keys that this transaction has written to, and that have already + // been written to DB (i.e., are no longer in write_batch_). + // + std::map<uint32_t, std::vector<std::string>> write_set_keys_; }; } // namespace rocksdb diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 24d45254f..51bb30818 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -24,7 +24,6 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( assert(rtxn->unprepared_); auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap(); auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap(); - const bool kRollbackMergeOperands = true; WriteOptions w_options; // If we crash during recovery, we can just recalculate and rewrite the // rollback batch.
@@ -131,7 +130,7 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( } } rollback_handler(db_impl_, this, last_visible_txn, &rollback_batch, *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), - !kRollbackMergeOperands); + txn_db_options_.rollback_merge_operands); auto s = batch->Iterate(&rollback_handler); if (!s.ok()) { @@ -240,7 +239,7 @@ Status WriteUnpreparedTxnDB::Initialize( TransactionOptions t_options; auto first_log_number = recovered_trx->batches_.begin()->second.log_number_; - auto last_seq = recovered_trx->batches_.rbegin()->first; + auto first_seq = recovered_trx->batches_.begin()->first; auto last_prepare_batch_cnt = recovered_trx->batches_.begin()->second.batch_cnt_; @@ -250,7 +249,7 @@ static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx); real_trx->SetLogNumber(first_log_number); - real_trx->SetId(last_seq); + real_trx->SetId(first_seq); s = real_trx->SetName(recovered_trx->name_); if (!s.ok()) { break; } @@ -268,6 +267,13 @@ Status WriteUnpreparedTxnDB::Initialize( } assert(wupt->unprep_seqs_.count(seq) == 0); wupt->unprep_seqs_[seq] = cnt; + KeySetBuilder keyset_handler(wupt, + txn_db_options_.rollback_merge_operands); + s = batch_info.batch_->Iterate(&keyset_handler); + assert(s.ok()); + if (!s.ok()) { + break; + } } wupt->write_batch_.Clear(); @@ -366,5 +372,29 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options, return db_iter; } +Status KeySetBuilder::PutCF(uint32_t cf, const Slice& key, + const Slice& /*val*/) { + txn_->UpdateWriteKeySet(cf, key); + return Status::OK(); +} + +Status KeySetBuilder::DeleteCF(uint32_t cf, const Slice& key) { + txn_->UpdateWriteKeySet(cf, key); + return Status::OK(); +} + +Status KeySetBuilder::SingleDeleteCF(uint32_t cf, const Slice& key) { + txn_->UpdateWriteKeySet(cf, key); + return Status::OK(); +} + +Status KeySetBuilder::MergeCF(uint32_t cf, const Slice& key, + const Slice& /*val*/) { + if (rollback_merge_operands_) { + txn_->UpdateWriteKeySet(cf, key); + } + return Status::OK(); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h index 85d47593e..6763aa99f 100644 --- a/utilities/transactions/write_unprepared_txn_db.h +++ b/utilities/transactions/write_unprepared_txn_db.h @@ -40,5 +40,98 @@ class WriteUnpreparedTxnDB : public WritePreparedTxnDB { Status RollbackRecoveredTransaction(const DBImpl::RecoveredTransaction* rtxn); }; +class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { + // TODO(lth): Reduce code duplication with + // WritePreparedCommitEntryPreReleaseCallback + public: + // includes_data indicates that the commit also writes non-empty + // CommitTimeWriteBatch to memtable, which needs to be committed separately. + WriteUnpreparedCommitEntryPreReleaseCallback( + WritePreparedTxnDB* db, DBImpl* db_impl, + const std::map<SequenceNumber, size_t>& unprep_seqs, + size_t data_batch_cnt = 0, bool publish_seq = true) + : db_(db), + db_impl_(db_impl), + unprep_seqs_(unprep_seqs), + data_batch_cnt_(data_batch_cnt), + includes_data_(data_batch_cnt_ > 0), + publish_seq_(publish_seq) { + assert(unprep_seqs.size() > 0); + } + + virtual Status Callback(SequenceNumber commit_seq, bool is_mem_disabled + __attribute__((__unused__))) override { + const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) + ? commit_seq + : commit_seq + data_batch_cnt_ - 1; + // Recall that unprep_seqs maps (un)prepared_seq => prepare_batch_cnt.
+ for (const auto& s : unprep_seqs_) { + for (size_t i = 0; i < s.second; i++) { + db_->AddCommitted(s.first + i, last_commit_seq); + } + } + + if (includes_data_) { + assert(data_batch_cnt_); + // Commit the data that is accompanied with the commit request + for (size_t i = 0; i < data_batch_cnt_; i++) { + // For commit seq of each batch use the commit seq of the last batch. + // This would make debugging easier by having all the batches having + // the same sequence number. + db_->AddCommitted(commit_seq + i, last_commit_seq); + } + } + if (db_impl_->immutable_db_options().two_write_queues && publish_seq_) { + assert(is_mem_disabled); // implies the 2nd queue + // Publish the sequence number. We can do that here assuming the callback + // is invoked only from one write queue, which would guarantee that the + // publish sequence numbers will be in order, i.e., once a seq is + // published all the seq prior to that are also publishable. + db_impl_->SetLastPublishedSequence(last_commit_seq); + } + // else the SequenceNumber that is updated as part of the write already does + // the publishing + return Status::OK(); + } + + private: + WritePreparedTxnDB* db_; + DBImpl* db_impl_; + const std::map<SequenceNumber, size_t>& unprep_seqs_; + size_t data_batch_cnt_; + // Either because it is a commit without prepare or it has a + // CommitTimeWriteBatch + bool includes_data_; + // Whether the callback should also publish the commit seq number + bool publish_seq_; +}; + +struct KeySetBuilder : public WriteBatch::Handler { + WriteUnpreparedTxn* txn_; + bool rollback_merge_operands_; + + KeySetBuilder(WriteUnpreparedTxn* txn, bool rollback_merge_operands) + : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {} + + Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override; + + Status DeleteCF(uint32_t cf, const Slice& key) override; + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override; + + Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override; + + // Recovered batches do not contain 2PC markers. + Status MarkNoop(bool) override { return Status::InvalidArgument(); } + Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } + Status MarkEndPrepare(const Slice&) override { + return Status::InvalidArgument(); + } + Status MarkCommit(const Slice&) override { return Status::InvalidArgument(); } + Status MarkRollback(const Slice&) override { + return Status::InvalidArgument(); + } +}; + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 6715023f3..52eb70024 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -938,5 +938,9 @@ void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) { rep->write_batch.SetMaxBytes(max_bytes); } +size_t WriteBatchWithIndex::GetDataSize() const { + return rep->write_batch.GetDataSize(); + +} // namespace rocksdb #endif // !ROCKSDB_LITE
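For context, here is a minimal usage sketch of the write-unprepared path this patch wires up. It is an illustration only, not part of the patch: the DB path is a placeholder, WRITE_UNPREPARED is still under active development at this point, and the sketch only assumes the options and Transaction calls exercised by the tests above (write_policy, max_write_batch_size, and SetName/Put/Prepare/Commit/Rollback).

#include <cassert>

#include "rocksdb/utilities/transaction_db.h"

using namespace rocksdb;

int main() {
  Options options;
  options.create_if_missing = true;
  TransactionDBOptions txn_db_options;
  // Write unprepared: batches may be written to the DB before Prepare().
  txn_db_options.write_policy = WRITE_UNPREPARED;

  TransactionDB* txn_db;
  // "/tmp/write_unprepared_example" is a placeholder path.
  Status s = TransactionDB::Open(options, txn_db_options,
                                 "/tmp/write_unprepared_example", &txn_db);
  assert(s.ok());

  TransactionOptions txn_options;
  // Once the indexed write batch grows past this size, subsequent writes
  // flush it to the DB as an unprepared batch instead of failing with
  // Status::MemoryLimit (see MaybeFlushWriteBatchToDB above).
  txn_options.max_write_batch_size = 1024 * 1024;

  Transaction* txn = txn_db->BeginTransaction(WriteOptions(), txn_options);
  s = txn->SetName("txn1");  // required before unprepared batches can flush
  assert(s.ok());
  s = txn->Put("key", "value");  // may trigger an unprepared flush to the DB
  assert(s.ok());
  s = txn->Prepare();  // flushes the remaining batch with a prepare marker
  assert(s.ok());
  s = txn->Commit();  // Rollback() would also undo already-flushed batches
  assert(s.ok());

  delete txn;
  delete txn_db;
  return 0;
}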