diff --git a/db/db_impl.h b/db/db_impl.h index a27e45c38..3d281cbb7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -561,25 +561,50 @@ class DBImpl : public DB { // these will then be passed to TransactionDB so that // locks can be reacquired before writing can resume. struct RecoveredTransaction { - uint64_t log_number_; std::string name_; - WriteBatch* batch_; - // The seq number of the first key in the batch - SequenceNumber seq_; - // Number of sub-batched. A new sub-batch is created if we txn attempts to - // inserts a duplicate key,seq to memtable. This is currently used in - // WritePrparedTxn - size_t batch_cnt_; + bool unprepared_; + + struct BatchInfo { + uint64_t log_number_; + // TODO(lth): For unprepared, the memory usage here can be big for + // unprepared transactions. This is only useful for rollbacks, and we + // can in theory just keep keyset for that. + WriteBatch* batch_; + // Number of sub-batches. A new sub-batch is created if txn attempts to + // insert a duplicate key,seq to memtable. This is currently used in + // WritePreparedTxn/WriteUnpreparedTxn. + size_t batch_cnt_; + }; + + // This maps the seq of the first key in the batch to BatchInfo, which + // contains WriteBatch and other information relevant to the batch. + // + // For WriteUnprepared, batches_ can have size greater than 1, but for + // other write policies, it must be of size 1. + std::map batches_; + explicit RecoveredTransaction(const uint64_t log, const std::string& name, WriteBatch* batch, SequenceNumber seq, - size_t batch_cnt) - : log_number_(log), - name_(name), - batch_(batch), - seq_(seq), - batch_cnt_(batch_cnt) {} - - ~RecoveredTransaction() { delete batch_; } + size_t batch_cnt, bool unprepared) + : name_(name), unprepared_(unprepared) { + batches_[seq] = {log, batch, batch_cnt}; + } + + ~RecoveredTransaction() { + for (auto& it : batches_) { + delete it.second.batch_; + } + } + + void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch, + size_t batch_cnt, bool unprepared) { + assert(batches_.count(seq) == 0); + batches_[seq] = {log_number, batch, batch_cnt}; + // Prior state must be unprepared, since the prepare batch must be the + // last batch. + assert(unprepared_); + unprepared_ = unprepared; + } }; bool allow_2pc() const { return immutable_db_options_.allow_2pc; } @@ -600,9 +625,19 @@ class DBImpl : public DB { void InsertRecoveredTransaction(const uint64_t log, const std::string& name, WriteBatch* batch, SequenceNumber seq, - size_t batch_cnt) { - recovered_transactions_[name] = - new RecoveredTransaction(log, name, batch, seq, batch_cnt); + size_t batch_cnt, bool unprepared_batch) { + // For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple + // times for every unprepared batch encountered during recovery. + // + // If the transaction is prepared, then the last call to + // InsertRecoveredTransaction will have unprepared_batch = false. + auto rtxn = recovered_transactions_.find(name); + if (rtxn == recovered_transactions_.end()) { + recovered_transactions_[name] = new RecoveredTransaction( + log, name, batch, seq, batch_cnt, unprepared_batch); + } else { + rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch); + } logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log); } @@ -611,7 +646,10 @@ class DBImpl : public DB { assert(it != recovered_transactions_.end()); auto* trx = it->second; recovered_transactions_.erase(it); - logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(trx->log_number_); + for (const auto& info : trx->batches_) { + logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed( + info.second.log_number_); + } delete trx; } @@ -751,6 +789,7 @@ class DBImpl : public DB { friend class WritePreparedTxn; friend class WritePreparedTxnDB; friend class WriteBatchWithIndex; + friend class WriteUnpreparedTxnDB; #ifndef ROCKSDB_LITE friend class ForwardIterator; #endif @@ -762,6 +801,7 @@ class DBImpl : public DB { friend class WriteCallbackTest_WriteWithCallbackTest_Test; friend class XFTransactionWriteHandler; friend class DBBlobIndexTest; + friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test; #endif struct CompactionState; diff --git a/db/dbformat.h b/db/dbformat.h index 221959036..d191fbd4a 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -58,8 +58,8 @@ enum ValueType : unsigned char { // Similar to kTypeBeginPersistedPrepareXID, this is to ensure that WAL // generated by WriteUnprepared write policy is not mistakenly read by // another. - kTypeBeginUnprepareXID = 0x13, // WAL only. - kMaxValue = 0x7F // Not used for storing records. + kTypeBeginUnprepareXID = 0x13, // WAL only. + kMaxValue = 0x7F // Not used for storing records. }; // Defined in dbformat.cc diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index dce1b093f..d0b95543d 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -284,7 +284,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { const Slice& /*val*/) override { return Status::OK(); } - Status MarkBeginPrepare() override { return Status::OK(); } + Status MarkBeginPrepare(bool) override { return Status::OK(); } Status MarkRollback(const Slice&) override { return Status::OK(); } }; diff --git a/db/write_batch.cc b/db/write_batch.cc index 6e2db1cf3..5c9378f56 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -73,6 +73,7 @@ enum ContentFlags : uint32_t { HAS_ROLLBACK = 1 << 8, HAS_DELETE_RANGE = 1 << 9, HAS_BLOB_INDEX = 1 << 10, + HAS_BEGIN_UNPREPARE = 1 << 11, }; struct BatchContentClassifier : public WriteBatch::Handler { @@ -108,8 +109,11 @@ struct BatchContentClassifier : public WriteBatch::Handler { return Status::OK(); } - Status MarkBeginPrepare() override { + Status MarkBeginPrepare(bool unprepare) override { content_flags |= ContentFlags::HAS_BEGIN_PREPARE; + if (unprepare) { + content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE; + } return Status::OK(); } @@ -532,8 +536,8 @@ Status WriteBatch::Iterate(Handler* handler) const { break; case kTypeBeginUnprepareXID: assert(content_flags_.load(std::memory_order_relaxed) & - (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); - handler->MarkBeginPrepare(); + (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE)); + handler->MarkBeginPrepare(true /* unprepared */); empty_batch = false; if (handler->WriteAfterCommit()) { s = Status::NotSupported( @@ -1052,6 +1056,8 @@ class MemTableInserter : public WriteBatch::Handler { bool write_after_commit_; // Whether memtable write can be done before prepare bool write_before_prepare_; + // Whether this batch was unprepared or not + bool unprepared_batch_; using DupDetector = std::aligned_storage::type; DupDetector duplicate_detector_; bool dup_dectector_on_; @@ -1111,6 +1117,7 @@ class MemTableInserter : public WriteBatch::Handler { // WriteUnprepared can write WriteBatches per transaction, so // batch_per_txn being false indicates write_before_prepare. write_before_prepare_(!batch_per_txn), + unprepared_batch_(false), duplicate_detector_(), dup_dectector_on_(false) { assert(cf_mems_); @@ -1586,7 +1593,9 @@ class MemTableInserter : public WriteBatch::Handler { } } - Status MarkBeginPrepare() override { + // The write batch handler calls MarkBeginPrepare with unprepare set to true + // if it encounters the kTypeBeginUnprepareXID marker. + Status MarkBeginPrepare(bool unprepare) override { assert(rebuilding_trx_ == nullptr); assert(db_); @@ -1602,6 +1611,11 @@ class MemTableInserter : public WriteBatch::Handler { // we are now iterating through a prepared section rebuilding_trx_ = new WriteBatch(); rebuilding_trx_seq_ = sequence_; + // We only call MarkBeginPrepare once per batch, and unprepared_batch_ + // is initialized to false by default. + assert(!unprepared_batch_); + unprepared_batch_ = unprepare; + if (has_valid_writes_ != nullptr) { *has_valid_writes_ = true; } @@ -1622,7 +1636,7 @@ class MemTableInserter : public WriteBatch::Handler { : static_cast(sequence_ - rebuilding_trx_seq_ + 1); db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(), rebuilding_trx_, rebuilding_trx_seq_, - batch_cnt); + batch_cnt, unprepared_batch_); rebuilding_trx_ = nullptr; } else { assert(rebuilding_trx_ == nullptr); @@ -1665,9 +1679,12 @@ class MemTableInserter : public WriteBatch::Handler { // duplicate re-insertion of values. assert(log_number_ref_ == 0); if (write_after_commit_) { + // write_after_commit_ can only have one batch in trx. + assert(trx->batches_.size() == 1); + const auto& batch_info = trx->batches_.begin()->second; // all inserts must reference this trx log number - log_number_ref_ = trx->log_number_; - s = trx->batch_->Iterate(this); + log_number_ref_ = batch_info.log_number_; + s = batch_info.batch_->Iterate(this); log_number_ref_ = 0; } // else the values are already inserted before the commit diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index cf2a121e4..84b866a31 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -290,8 +290,9 @@ namespace { virtual void LogData(const Slice& blob) override { seen += "LogData(" + blob.ToString() + ")"; } - virtual Status MarkBeginPrepare() override { - seen += "MarkBeginPrepare()"; + virtual Status MarkBeginPrepare(bool unprepare) override { + seen += + "MarkBeginPrepare(" + std::string(unprepare ? "true" : "false") + ")"; return Status::OK(); } virtual Status MarkEndPrepare(const Slice& xid) override { @@ -403,7 +404,7 @@ TEST_F(WriteBatchTest, PrepareCommit) { TestHandler handler; batch.Iterate(&handler); ASSERT_EQ( - "MarkBeginPrepare()" + "MarkBeginPrepare(false)" "Put(k1, v1)" "Put(k2, v2)" "MarkEndPrepare(xid1)" diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index d87dfb500..bdf918fd6 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -488,6 +488,7 @@ class Transaction { private: friend class PessimisticTransactionDB; + friend class WriteUnpreparedTxnDB; // No copying allowed Transaction(const Transaction&); void operator=(const Transaction&); diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 3abf53b9d..7db177f86 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -243,7 +243,7 @@ class WriteBatch : public WriteBatchBase { // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); - virtual Status MarkBeginPrepare() { + virtual Status MarkBeginPrepare(bool = false) { return Status::InvalidArgument("MarkBeginPrepare() handler not defined."); } diff --git a/java/rocksjni/writebatchhandlerjnicallback.cc b/java/rocksjni/writebatchhandlerjnicallback.cc index 5173f64b5..69fe87659 100644 --- a/java/rocksjni/writebatchhandlerjnicallback.cc +++ b/java/rocksjni/writebatchhandlerjnicallback.cc @@ -305,7 +305,8 @@ rocksdb::Status WriteBatchHandlerJniCallback::PutBlobIndexCF(uint32_t column_fam } } -rocksdb::Status WriteBatchHandlerJniCallback::MarkBeginPrepare() { +rocksdb::Status WriteBatchHandlerJniCallback::MarkBeginPrepare(bool unprepare) { + assert(!unprepare); m_env->CallVoidMethod(m_jcallback_obj, m_jMarkBeginPrepareMethodId); // check for Exception, in-particular RocksDBException diff --git a/java/rocksjni/writebatchhandlerjnicallback.h b/java/rocksjni/writebatchhandlerjnicallback.h index 311a268db..720f1693c 100644 --- a/java/rocksjni/writebatchhandlerjnicallback.h +++ b/java/rocksjni/writebatchhandlerjnicallback.h @@ -42,8 +42,8 @@ class WriteBatchHandlerJniCallback : public JniCallback, public WriteBatch::Hand void DeleteRange(const Slice& beginKey, const Slice& endKey); void LogData(const Slice& blob); Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key, - const Slice& value); - Status MarkBeginPrepare(); + const Slice& value); + Status MarkBeginPrepare(bool); Status MarkEndPrepare(const Slice& xid); Status MarkNoop(bool empty_batch); Status MarkRollback(const Slice& xid); diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 62c3a6af9..ffcda844c 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1917,8 +1917,9 @@ class InMemoryHandler : public WriteBatch::Handler { return Status::OK(); } - virtual Status MarkBeginPrepare() override { - row_ << "BEGIN_PREARE "; + virtual Status MarkBeginPrepare(bool unprepare) override { + row_ << "BEGIN_PREPARE("; + row_ << (unprepare ? "true" : "false") << ") "; return Status::OK(); } diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index ff8121e9e..0f1d9b41e 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -124,7 +124,10 @@ Status PessimisticTransactionDB::Initialize( for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) { auto recovered_trx = it->second; assert(recovered_trx); - assert(recovered_trx->log_number_); + assert(recovered_trx->batches_.size() == 1); + const auto& seq = recovered_trx->batches_.begin()->first; + const auto& batch_info = recovered_trx->batches_.begin()->second; + assert(batch_info.log_number_); assert(recovered_trx->name_.length()); WriteOptions w_options; @@ -133,21 +136,20 @@ Status PessimisticTransactionDB::Initialize( Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); assert(real_trx); - real_trx->SetLogNumber(recovered_trx->log_number_); - assert(recovered_trx->seq_ != kMaxSequenceNumber); - real_trx->SetId(recovered_trx->seq_); + real_trx->SetLogNumber(batch_info.log_number_); + assert(seq != kMaxSequenceNumber); + real_trx->SetId(seq); s = real_trx->SetName(recovered_trx->name_); if (!s.ok()) { break; } - s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_); + s = real_trx->RebuildFromWriteBatch(batch_info.batch_); // WriteCommitted set this to to disable this check that is specific to // WritePrepared txns - assert(recovered_trx->batch_cnt_ == 0 || - real_trx->GetWriteBatch()->SubBatchCnt() == - recovered_trx->batch_cnt_); + assert(batch_info.batch_cnt_ == 0 || + real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_); real_trx->SetState(Transaction::PREPARED); if (!s.ok()) { break; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 13ce4ab08..679cd26ff 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -142,6 +142,7 @@ class PessimisticTransactionDB : public TransactionDB { friend class TransactionTest_TwoPhaseLongPrepareTest_Test; friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test; + friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test; TransactionLockMgr lock_mgr_; // Must be held when adding/dropping column families. diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 3539c7431..821913ed8 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -699,7 +699,7 @@ Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) { // this is used for reconstructing prepared transactions upon // recovery. there should not be any meta markers in the batches // we are processing. - Status MarkBeginPrepare() override { return Status::InvalidArgument(); } + Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); } Status MarkEndPrepare(const Slice&) override { return Status::InvalidArgument(); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 7ae7ae09b..5bf5835b1 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -294,7 +294,7 @@ Status WritePreparedTxn::RollbackInternal() { } Status MarkNoop(bool) override { return Status::OK(); } - Status MarkBeginPrepare() override { return Status::OK(); } + Status MarkBeginPrepare(bool) override { return Status::OK(); } Status MarkEndPrepare(const Slice&) override { return Status::OK(); } Status MarkCommit(const Slice&) override { return Status::OK(); } Status MarkRollback(const Slice&) override { diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 6685c8278..4016241c9 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -71,6 +71,7 @@ class WritePreparedTxn : public PessimisticTransaction { private: friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTxnDB; + friend class WriteUnpreparedTxnDB; Status PrepareInternal() override; diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 105c37df7..34ed04aa6 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -37,9 +37,13 @@ Status WritePreparedTxnDB::Initialize( assert(dbimpl != nullptr); auto rtxns = dbimpl->recovered_transactions(); for (auto rtxn : rtxns) { - auto cnt = rtxn.second->batch_cnt_ ? rtxn.second->batch_cnt_ : 1; + // There should only one batch for WritePrepared policy. + assert(rtxn.second->batches_.size() == 1); + const auto& seq = rtxn.second->batches_.begin()->first; + const auto& batch_info = rtxn.second->batches_.begin()->second; + auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1; for (size_t i = 0; i < cnt; i++) { - AddPrepared(rtxn.second->seq_ + i); + AddPrepared(seq + i); } } SequenceNumber prev_max = max_evicted_seq_; diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 65797aa77..e91f68be2 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -383,6 +383,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; friend class WritePreparedTransactionTest_OldCommitMapGC_Test; friend class WritePreparedTransactionTest_RollbackTest_Test; + friend class WriteUnpreparedTxnDB; + friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test; void Init(const TransactionDBOptions& /* unused */); @@ -754,7 +756,7 @@ struct SubBatchCounter : public WriteBatch::Handler { AddKey(cf, key); return Status::OK(); } - Status MarkBeginPrepare() override { return Status::OK(); } + Status MarkBeginPrepare(bool) override { return Status::OK(); } Status MarkRollback(const Slice&) override { return Status::OK(); } bool WriteAfterCommit() const override { return false; } }; diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 115695cd0..542ab9cd9 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -188,6 +188,111 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { delete txn; } +TEST_P(WriteUnpreparedTransactionTest, RecoveryRollbackUnprepared) { + WriteOptions write_options; + write_options.disableWAL = false; + uint64_t seq_used = kMaxSequenceNumber; + uint64_t log_number; + WriteBatch batch; + std::vector prepared_trans; + WriteUnpreparedTxnDB* wup_db; + options.disable_auto_compactions = true; + + // Try unprepared batches with empty database. + for (int num_batches = 0; num_batches < 10; num_batches++) { + // Reset database. + prepared_trans.clear(); + ReOpen(); + wup_db = dynamic_cast(db); + + // Write num_batches unprepared batches into the WAL. + for (int i = 0; i < num_batches; i++) { + batch.Clear(); + // TODO(lth): Instead of manually calling WriteImpl with a write batch, + // use methods on Transaction instead once it is implemented. + ASSERT_OK(WriteBatchInternal::InsertNoop(&batch)); + ASSERT_OK(WriteBatchInternal::Put(&batch, + db->DefaultColumnFamily()->GetID(), + "k" + ToString(i), "value")); + // MarkEndPrepare will change the Noop marker into an unprepared marker. + ASSERT_OK(WriteBatchInternal::MarkEndPrepare( + &batch, Slice("xid1"), /* write after commit */ false, + /* unprepared batch */ true)); + ASSERT_OK(wup_db->db_impl_->WriteImpl( + write_options, &batch, /*callback*/ nullptr, &log_number, + /*log ref*/ 0, /* disable memtable */ true, &seq_used, + /* prepare_batch_cnt_ */ 1)); + } + + // Crash and run recovery code paths. + wup_db->db_impl_->FlushWAL(true); + wup_db->TEST_Crash(); + ReOpenNoDelete(); + wup_db = dynamic_cast(db); + + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 0); + + // Check that DB is empty. + Iterator* iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + delete iter; + } + + // Try unprepared batches with non-empty database. + for (int num_batches = 1; num_batches < 10; num_batches++) { + // Reset database. + prepared_trans.clear(); + ReOpen(); + wup_db = dynamic_cast(db); + for (int i = 0; i < num_batches; i++) { + ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i), + "before value " + ToString(i))); + } + + // Write num_batches unprepared batches into the WAL. + for (int i = 0; i < num_batches; i++) { + batch.Clear(); + // TODO(lth): Instead of manually calling WriteImpl with a write batch, + // use methods on Transaction instead once it is implemented. + ASSERT_OK(WriteBatchInternal::InsertNoop(&batch)); + ASSERT_OK(WriteBatchInternal::Put(&batch, + db->DefaultColumnFamily()->GetID(), + "k" + ToString(i), "value")); + // MarkEndPrepare will change the Noop marker into an unprepared marker. + ASSERT_OK(WriteBatchInternal::MarkEndPrepare( + &batch, Slice("xid1"), /* write after commit */ false, + /* unprepared batch */ true)); + ASSERT_OK(wup_db->db_impl_->WriteImpl( + write_options, &batch, /*callback*/ nullptr, &log_number, + /*log ref*/ 0, /* disable memtable */ true, &seq_used, + /* prepare_batch_cnt_ */ 1)); + } + + // Crash and run recovery code paths. + wup_db->db_impl_->FlushWAL(true); + wup_db->TEST_Crash(); + ReOpenNoDelete(); + wup_db = dynamic_cast(db); + + db->GetAllPreparedTransactions(&prepared_trans); + ASSERT_EQ(prepared_trans.size(), 0); + + // Check that DB has before values. + Iterator* iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + for (int i = 0; i < num_batches; i++) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "k" + ToString(i)); + ASSERT_EQ(iter->value().ToString(), "before value " + ToString(i)); + iter->Next(); + } + ASSERT_FALSE(iter->Valid()); + delete iter; + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index 65eb7ad98..1dc7338a5 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -60,6 +60,7 @@ class WriteUnpreparedTxn : public WritePreparedTxn { private: friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test; + friend class WriteUnpreparedTxnDB; WriteUnpreparedTxnDB* wupt_db_; diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 913ee3bd4..24d45254f 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -15,6 +15,293 @@ namespace rocksdb { +// Instead of reconstructing a Transaction object, and calling rollback on it, +// we can be more efficient with RollbackRecoveredTransaction by skipping +// unnecessary steps (eg. updating CommitMap, reconstructing keyset) +Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( + const DBImpl::RecoveredTransaction* rtxn) { + // TODO(lth): Reduce duplicate code with WritePrepared rollback logic. + assert(rtxn->unprepared_); + auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap(); + auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap(); + const bool kRollbackMergeOperands = true; + WriteOptions w_options; + // If we crash during recovery, we can just recalculate and rewrite the + // rollback batch. + w_options.disableWAL = true; + + // Iterate starting with largest sequence number. + for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); it++) { + auto last_visible_txn = it->first - 1; + const auto& batch = it->second.batch_; + WriteBatch rollback_batch; + + struct RollbackWriteBatchBuilder : public WriteBatch::Handler { + DBImpl* db_; + ReadOptions roptions; + WritePreparedTxnReadCallback callback; + WriteBatch* rollback_batch_; + std::map& comparators_; + std::map& handles_; + using CFKeys = std::set; + std::map keys_; + bool rollback_merge_operands_; + RollbackWriteBatchBuilder( + DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq, + WriteBatch* dst_batch, + std::map& comparators, + std::map& handles, + bool rollback_merge_operands) + : db_(db), + callback(wpt_db, snap_seq, + 0), // 0 disables min_uncommitted optimization + rollback_batch_(dst_batch), + comparators_(comparators), + handles_(handles), + rollback_merge_operands_(rollback_merge_operands) {} + + Status Rollback(uint32_t cf, const Slice& key) { + Status s; + CFKeys& cf_keys = keys_[cf]; + if (cf_keys.size() == 0) { // just inserted + auto cmp = comparators_[cf]; + keys_[cf] = CFKeys(SetComparator(cmp)); + } + auto res = cf_keys.insert(key); + if (res.second == + false) { // second is false if a element already existed. + return s; + } + + PinnableSlice pinnable_val; + bool not_used; + auto cf_handle = handles_[cf]; + s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, ¬_used, + &callback); + assert(s.ok() || s.IsNotFound()); + if (s.ok()) { + s = rollback_batch_->Put(cf_handle, key, pinnable_val); + assert(s.ok()); + } else if (s.IsNotFound()) { + // There has been no readable value before txn. By adding a delete we + // make sure that there will be none afterwards either. + s = rollback_batch_->Delete(cf_handle, key); + assert(s.ok()); + } else { + // Unexpected status. Return it to the user. + } + return s; + } + + Status PutCF(uint32_t cf, const Slice& key, + const Slice& /*val*/) override { + return Rollback(cf, key); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return Rollback(cf, key); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return Rollback(cf, key); + } + + Status MergeCF(uint32_t cf, const Slice& key, + const Slice& /*val*/) override { + if (rollback_merge_operands_) { + return Rollback(cf, key); + } else { + return Status::OK(); + } + } + + // Recovered batches do not contain 2PC markers. + Status MarkNoop(bool) override { return Status::InvalidArgument(); } + Status MarkBeginPrepare(bool) override { + return Status::InvalidArgument(); + } + Status MarkEndPrepare(const Slice&) override { + return Status::InvalidArgument(); + } + Status MarkCommit(const Slice&) override { + return Status::InvalidArgument(); + } + Status MarkRollback(const Slice&) override { + return Status::InvalidArgument(); + } + } rollback_handler(db_impl_, this, last_visible_txn, &rollback_batch, + *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), + !kRollbackMergeOperands); + + auto s = batch->Iterate(&rollback_handler); + if (!s.ok()) { + return s; + } + + // The Rollback marker will be used as a batch separator + WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_); + + const uint64_t kNoLogRef = 0; + const bool kDisableMemtable = true; + const size_t kOneBatch = 1; + uint64_t seq_used = kMaxSequenceNumber; + s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr, + kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch); + if (!s.ok()) { + return s; + } + + // If two_write_queues, we must manually release the sequence number to + // readers. + if (db_impl_->immutable_db_options().two_write_queues) { + db_impl_->SetLastPublishedSequence(seq_used); + } + } + + return Status::OK(); +} + +Status WriteUnpreparedTxnDB::Initialize( + const std::vector& compaction_enabled_cf_indices, + const std::vector& handles) { + // TODO(lth): Reduce code duplication in this function. + auto dbimpl = reinterpret_cast(GetRootDB()); + assert(dbimpl != nullptr); + + db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this)); + // A callback to commit a single sub-batch + class CommitSubBatchPreReleaseCallback : public PreReleaseCallback { + public: + explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db) + : db_(db) {} + virtual Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled) override { +#ifdef NDEBUG + (void)is_mem_disabled; +#endif + assert(!is_mem_disabled); + db_->AddCommitted(commit_seq, commit_seq); + return Status::OK(); + } + + private: + WritePreparedTxnDB* db_; + }; + db_impl_->SetRecoverableStatePreReleaseCallback( + new CommitSubBatchPreReleaseCallback(this)); + + // PessimisticTransactionDB::Initialize + for (auto cf_ptr : handles) { + AddColumnFamily(cf_ptr); + } + // Verify cf options + for (auto handle : handles) { + ColumnFamilyDescriptor cfd; + Status s = handle->GetDescriptor(&cfd); + if (!s.ok()) { + return s; + } + s = VerifyCFOptions(cfd.options); + if (!s.ok()) { + return s; + } + } + + // Re-enable compaction for the column families that initially had + // compaction enabled. + std::vector compaction_enabled_cf_handles; + compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); + for (auto index : compaction_enabled_cf_indices) { + compaction_enabled_cf_handles.push_back(handles[index]); + } + + Status s = EnableAutoCompaction(compaction_enabled_cf_handles); + if (!s.ok()) { + return s; + } + + // create 'real' transactions from recovered shell transactions + auto rtxns = dbimpl->recovered_transactions(); + for (auto rtxn : rtxns) { + auto recovered_trx = rtxn.second; + assert(recovered_trx); + assert(recovered_trx->batches_.size() >= 1); + assert(recovered_trx->name_.length()); + + // We can only rollback transactions after AdvanceMaxEvictedSeq is called, + // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why + // two iterations is required. + if (recovered_trx->unprepared_) { + continue; + } + + WriteOptions w_options; + w_options.sync = true; + TransactionOptions t_options; + + auto first_log_number = recovered_trx->batches_.begin()->second.log_number_; + auto last_seq = recovered_trx->batches_.rbegin()->first; + auto last_prepare_batch_cnt = + recovered_trx->batches_.begin()->second.batch_cnt_; + + Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); + assert(real_trx); + auto wupt = + static_cast_with_check(real_trx); + + real_trx->SetLogNumber(first_log_number); + real_trx->SetId(last_seq); + s = real_trx->SetName(recovered_trx->name_); + if (!s.ok()) { + break; + } + wupt->prepare_batch_cnt_ = last_prepare_batch_cnt; + + for (auto batch : recovered_trx->batches_) { + const auto& seq = batch.first; + const auto& batch_info = batch.second; + auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1; + assert(batch_info.log_number_); + + for (size_t i = 0; i < cnt; i++) { + AddPrepared(seq + i); + } + assert(wupt->unprep_seqs_.count(seq) == 0); + wupt->unprep_seqs_[seq] = cnt; + } + + wupt->write_batch_.Clear(); + WriteBatchInternal::InsertNoop(wupt->write_batch_.GetWriteBatch()); + + real_trx->SetState(Transaction::PREPARED); + if (!s.ok()) { + break; + } + } + + SequenceNumber prev_max = max_evicted_seq_; + SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); + AdvanceMaxEvictedSeq(prev_max, last_seq); + + // Rollback unprepared transactions. + for (auto rtxn : rtxns) { + auto recovered_trx = rtxn.second; + if (recovered_trx->unprepared_) { + s = RollbackRecoveredTransaction(recovered_trx); + if (!s.ok()) { + return s; + } + continue; + } + } + + if (s.ok()) { + dbimpl->DeleteAllRecoveredTransactions(); + } + + return s; +} + Transaction* WriteUnpreparedTxnDB::BeginTransaction( const WriteOptions& write_options, const TransactionOptions& txn_options, Transaction* old_txn) { diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h index 10393d59e..85d47593e 100644 --- a/utilities/transactions/write_unprepared_txn_db.h +++ b/utilities/transactions/write_unprepared_txn_db.h @@ -21,7 +21,11 @@ class WriteUnpreparedTxnDB : public WritePreparedTxnDB { public: using WritePreparedTxnDB::WritePreparedTxnDB; - Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options, + Status Initialize(const std::vector& compaction_enabled_cf_indices, + const std::vector& handles) override; + + Transaction* BeginTransaction(const WriteOptions& write_options, + const TransactionOptions& txn_options, Transaction* old_txn) override; // Struct to hold ownership of snapshot and read callback for cleanup. @@ -31,6 +35,9 @@ class WriteUnpreparedTxnDB : public WritePreparedTxnDB { Iterator* NewIterator(const ReadOptions& options, ColumnFamilyHandle* column_family, WriteUnpreparedTxn* txn); + + private: + Status RollbackRecoveredTransaction(const DBImpl::RecoveredTransaction* rtxn); }; } // namespace rocksdb