WriteUnPrepared: Add support for recovering WriteUnprepared transactions (#4078)

Summary:
This adds support for recovering WriteUnprepared transactions through the following changes:
- The information in `RecoveredTransaction` is extended so that it can reference multiple batches.
- `MarkBeginPrepare` is extended with a bool indicating whether it is an unprepared begin, and this is passed down to `InsertRecoveredTransaction` to indicate whether the current transaction is prepared or not.
- `WriteUnpreparedTxnDB::Initialize` is overridden so that it will roll back unprepared transactions found among the recovered transactions (a usage sketch follows this summary). This can be done without updating the prepare heap/commit map, because the DB has not yet finished initializing, and after the rollback batch is written, those data structures will not contain information about the rolled-back transaction anyway.

Commit/Rollback of live transactions is still unimplemented and will come later.
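For context, here is a minimal usage sketch (hypothetical path and setup; assumes a RocksDB build where the WriteUnprepared policy is available). The recovery path added in this PR runs inside TransactionDB::Open, which invokes the WriteUnpreparedTxnDB::Initialize shown in the diff below.

#include <cassert>
#include "rocksdb/utilities/transaction_db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::TransactionDBOptions txn_db_options;
  txn_db_options.write_policy = rocksdb::WRITE_UNPREPARED;
  rocksdb::TransactionDB* db = nullptr;
  // Open replays the WAL: recovered prepared transactions are reconstructed,
  // and transactions whose prepare marker never made it to the WAL are
  // rolled back by WriteUnpreparedTxnDB::Initialize.
  rocksdb::Status s = rocksdb::TransactionDB::Open(
      options, txn_db_options, "/tmp/write_unprepared_example", &db);
  assert(s.ok());
  delete db;
  return 0;
}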
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4078

Differential Revision: D8703382

Pulled By: lth

fbshipit-source-id: 7e0aada6c23bd39299f1f20d6c060492e0e6b60a
Branch: main
Author: Manuel Ung (committed by Facebook Github Bot)
Parent: db7ae0a485
Commit: b9846370e9
21 changed files (lines changed per file):

   80  db/db_impl.h
    4  db/dbformat.h
    2  db/transaction_log_impl.cc
   31  db/write_batch.cc
    7  db/write_batch_test.cc
    1  include/rocksdb/utilities/transaction.h
    2  include/rocksdb/write_batch.h
    3  java/rocksjni/writebatchhandlerjnicallback.cc
    4  java/rocksjni/writebatchhandlerjnicallback.h
    5  tools/ldb_cmd.cc
   18  utilities/transactions/pessimistic_transaction_db.cc
    1  utilities/transactions/pessimistic_transaction_db.h
    2  utilities/transactions/transaction_base.cc
    2  utilities/transactions/write_prepared_txn.cc
    1  utilities/transactions/write_prepared_txn.h
    8  utilities/transactions/write_prepared_txn_db.cc
    4  utilities/transactions/write_prepared_txn_db.h
  105  utilities/transactions/write_unprepared_transaction_test.cc
    1  utilities/transactions/write_unprepared_txn.h
  287  utilities/transactions/write_unprepared_txn_db.cc
    9  utilities/transactions/write_unprepared_txn_db.h

@@ -561,25 +561,50 @@ class DBImpl : public DB {
// these will then be passed to TransactionDB so that
// locks can be reacquired before writing can resume.
struct RecoveredTransaction {
uint64_t log_number_;
std::string name_;
WriteBatch* batch_;
// The seq number of the first key in the batch
SequenceNumber seq_;
// Number of sub-batched. A new sub-batch is created if we txn attempts to
// inserts a duplicate key,seq to memtable. This is currently used in
// WritePrparedTxn
size_t batch_cnt_;
bool unprepared_;
struct BatchInfo {
uint64_t log_number_;
// TODO(lth): For unprepared, the memory usage here can be big for
// unprepared transactions. This is only useful for rollbacks, and we
// can in theory just keep keyset for that.
WriteBatch* batch_;
// Number of sub-batches. A new sub-batch is created if txn attempts to
// insert a duplicate key,seq to memtable. This is currently used in
// WritePreparedTxn/WriteUnpreparedTxn.
size_t batch_cnt_;
};
// This maps the seq of the first key in the batch to BatchInfo, which
// contains WriteBatch and other information relevant to the batch.
//
// For WriteUnprepared, batches_ can have size greater than 1, but for
// other write policies, it must be of size 1.
std::map<SequenceNumber, BatchInfo> batches_;
explicit RecoveredTransaction(const uint64_t log, const std::string& name,
WriteBatch* batch, SequenceNumber seq,
size_t batch_cnt)
: log_number_(log),
name_(name),
batch_(batch),
seq_(seq),
batch_cnt_(batch_cnt) {}
~RecoveredTransaction() { delete batch_; }
size_t batch_cnt, bool unprepared)
: name_(name), unprepared_(unprepared) {
batches_[seq] = {log, batch, batch_cnt};
}
~RecoveredTransaction() {
for (auto& it : batches_) {
delete it.second.batch_;
}
}
void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch,
size_t batch_cnt, bool unprepared) {
assert(batches_.count(seq) == 0);
batches_[seq] = {log_number, batch, batch_cnt};
// Prior state must be unprepared, since the prepare batch must be the
// last batch.
assert(unprepared_);
unprepared_ = unprepared;
}
};
bool allow_2pc() const { return immutable_db_options_.allow_2pc; }
@@ -600,9 +625,19 @@ class DBImpl : public DB {
void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
WriteBatch* batch, SequenceNumber seq,
size_t batch_cnt) {
recovered_transactions_[name] =
new RecoveredTransaction(log, name, batch, seq, batch_cnt);
size_t batch_cnt, bool unprepared_batch) {
// For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple
// times for every unprepared batch encountered during recovery.
//
// If the transaction is prepared, then the last call to
// InsertRecoveredTransaction will have unprepared_batch = false.
auto rtxn = recovered_transactions_.find(name);
if (rtxn == recovered_transactions_.end()) {
recovered_transactions_[name] = new RecoveredTransaction(
log, name, batch, seq, batch_cnt, unprepared_batch);
} else {
rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch);
}
logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log);
}
@@ -611,7 +646,10 @@ class DBImpl : public DB {
assert(it != recovered_transactions_.end());
auto* trx = it->second;
recovered_transactions_.erase(it);
logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(trx->log_number_);
for (const auto& info : trx->batches_) {
logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(
info.second.log_number_);
}
delete trx;
}
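To make the new bookkeeping concrete, here is a small self-contained model of the batches_ map and the unprepared_ flag transition (an illustrative sketch only: std::string stands in for WriteBatch*, and the names mirror the struct above rather than the actual RocksDB types):

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>

struct BatchInfo {
  uint64_t log_number;
  std::string batch;  // stand-in for WriteBatch*
  size_t batch_cnt;
};

struct RecoveredTxnModel {
  std::string name;
  bool unprepared;
  // Keyed by the seq of the first key in each batch, as in the diff above.
  std::map<uint64_t, BatchInfo> batches;

  void AddBatch(uint64_t seq, uint64_t log, std::string batch,
                size_t batch_cnt, bool unprepared_batch) {
    assert(batches.count(seq) == 0);
    batches[seq] = {log, std::move(batch), batch_cnt};
    // The prepare batch, if any, is the last batch replayed, so the prior
    // state must still be unprepared.
    assert(unprepared);
    unprepared = unprepared_batch;
  }
};

int main() {
  // The first batch arrives via the constructor path...
  RecoveredTxnModel txn{"xid1", /*unprepared=*/true,
                        {{10, {1, "put(k0)", 1}}}};
  // ...later batches via AddBatch; only the last may carry the prepare.
  txn.AddBatch(20, 1, "put(k1)", 1, /*unprepared_batch=*/true);
  txn.AddBatch(30, 1, "put(k2)", 1, /*unprepared_batch=*/false);
  assert(!txn.unprepared && txn.batches.size() == 3);
  return 0;
}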
@@ -751,6 +789,7 @@ class DBImpl : public DB {
friend class WritePreparedTxn;
friend class WritePreparedTxnDB;
friend class WriteBatchWithIndex;
friend class WriteUnpreparedTxnDB;
#ifndef ROCKSDB_LITE
friend class ForwardIterator;
#endif
@@ -762,6 +801,7 @@ class DBImpl : public DB {
friend class WriteCallbackTest_WriteWithCallbackTest_Test;
friend class XFTransactionWriteHandler;
friend class DBBlobIndexTest;
friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test;
#endif
struct CompactionState;

@@ -58,8 +58,8 @@ enum ValueType : unsigned char {
// Similar to kTypeBeginPersistedPrepareXID, this is to ensure that WAL
// generated by WriteUnprepared write policy is not mistakenly read by
// another.
kTypeBeginUnprepareXID = 0x13, // WAL only.
kMaxValue = 0x7F // Not used for storing records.
kTypeBeginUnprepareXID = 0x13, // WAL only.
kMaxValue = 0x7F // Not used for storing records.
};
// Defined in dbformat.cc

@@ -284,7 +284,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
const Slice& /*val*/) override {
return Status::OK();
}
Status MarkBeginPrepare() override { return Status::OK(); }
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
};

@@ -73,6 +73,7 @@ enum ContentFlags : uint32_t {
HAS_ROLLBACK = 1 << 8,
HAS_DELETE_RANGE = 1 << 9,
HAS_BLOB_INDEX = 1 << 10,
HAS_BEGIN_UNPREPARE = 1 << 11,
};
struct BatchContentClassifier : public WriteBatch::Handler {
@@ -108,8 +109,11 @@ struct BatchContentClassifier : public WriteBatch::Handler {
return Status::OK();
}
Status MarkBeginPrepare() override {
Status MarkBeginPrepare(bool unprepare) override {
content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
if (unprepare) {
content_flags |= ContentFlags::HAS_BEGIN_UNPREPARE;
}
return Status::OK();
}
@@ -532,8 +536,8 @@ Status WriteBatch::Iterate(Handler* handler) const {
break;
case kTypeBeginUnprepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare();
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
handler->MarkBeginPrepare(true /* unprepared */);
empty_batch = false;
if (handler->WriteAfterCommit()) {
s = Status::NotSupported(
@@ -1052,6 +1056,8 @@ class MemTableInserter : public WriteBatch::Handler {
bool write_after_commit_;
// Whether memtable write can be done before prepare
bool write_before_prepare_;
// Whether this batch was unprepared or not
bool unprepared_batch_;
using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
DupDetector duplicate_detector_;
bool dup_dectector_on_;
@@ -1111,6 +1117,7 @@ class MemTableInserter : public WriteBatch::Handler {
// WriteUnprepared can write multiple WriteBatches per transaction, so
// batch_per_txn being false indicates write_before_prepare.
write_before_prepare_(!batch_per_txn),
unprepared_batch_(false),
duplicate_detector_(),
dup_dectector_on_(false) {
assert(cf_mems_);
@@ -1586,7 +1593,9 @@ class MemTableInserter : public WriteBatch::Handler {
}
}
Status MarkBeginPrepare() override {
// The write batch handler calls MarkBeginPrepare with unprepare set to true
// if it encounters the kTypeBeginUnprepareXID marker.
Status MarkBeginPrepare(bool unprepare) override {
assert(rebuilding_trx_ == nullptr);
assert(db_);
@@ -1602,6 +1611,11 @@ class MemTableInserter : public WriteBatch::Handler {
// we are now iterating through a prepared section
rebuilding_trx_ = new WriteBatch();
rebuilding_trx_seq_ = sequence_;
// We only call MarkBeginPrepare once per batch, and unprepared_batch_
// is initialized to false by default.
assert(!unprepared_batch_);
unprepared_batch_ = unprepare;
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
@@ -1622,7 +1636,7 @@ class MemTableInserter : public WriteBatch::Handler {
: static_cast<size_t>(sequence_ - rebuilding_trx_seq_ + 1);
db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
rebuilding_trx_, rebuilding_trx_seq_,
batch_cnt);
batch_cnt, unprepared_batch_);
rebuilding_trx_ = nullptr;
} else {
assert(rebuilding_trx_ == nullptr);
@@ -1665,9 +1679,12 @@ class MemTableInserter : public WriteBatch::Handler {
// duplicate re-insertion of values.
assert(log_number_ref_ == 0);
if (write_after_commit_) {
// write_after_commit_ can only have one batch in trx.
assert(trx->batches_.size() == 1);
const auto& batch_info = trx->batches_.begin()->second;
// all inserts must reference this trx log number
log_number_ref_ = trx->log_number_;
s = trx->batch_->Iterate(this);
log_number_ref_ = batch_info.log_number_;
s = batch_info.batch_->Iterate(this);
log_number_ref_ = 0;
}
// else the values are already inserted before the commit

@@ -290,8 +290,9 @@ namespace {
virtual void LogData(const Slice& blob) override {
seen += "LogData(" + blob.ToString() + ")";
}
virtual Status MarkBeginPrepare() override {
seen += "MarkBeginPrepare()";
virtual Status MarkBeginPrepare(bool unprepare) override {
seen +=
"MarkBeginPrepare(" + std::string(unprepare ? "true" : "false") + ")";
return Status::OK();
}
virtual Status MarkEndPrepare(const Slice& xid) override {
@@ -403,7 +404,7 @@ TEST_F(WriteBatchTest, PrepareCommit) {
TestHandler handler;
batch.Iterate(&handler);
ASSERT_EQ(
"MarkBeginPrepare()"
"MarkBeginPrepare(false)"
"Put(k1, v1)"
"Put(k2, v2)"
"MarkEndPrepare(xid1)"

@@ -488,6 +488,7 @@ class Transaction {
private:
friend class PessimisticTransactionDB;
friend class WriteUnpreparedTxnDB;
// No copying allowed
Transaction(const Transaction&);
void operator=(const Transaction&);

@@ -243,7 +243,7 @@ class WriteBatch : public WriteBatchBase {
// The default implementation of LogData does nothing.
virtual void LogData(const Slice& blob);
virtual Status MarkBeginPrepare() {
virtual Status MarkBeginPrepare(bool = false) {
return Status::InvalidArgument("MarkBeginPrepare() handler not defined.");
}

@@ -305,7 +305,8 @@ rocksdb::Status WriteBatchHandlerJniCallback::PutBlobIndexCF(uint32_t column_fam
}
}
rocksdb::Status WriteBatchHandlerJniCallback::MarkBeginPrepare() {
rocksdb::Status WriteBatchHandlerJniCallback::MarkBeginPrepare(bool unprepare) {
assert(!unprepare);
m_env->CallVoidMethod(m_jcallback_obj, m_jMarkBeginPrepareMethodId);
// check for Exception, in-particular RocksDBException

@@ -42,8 +42,8 @@ class WriteBatchHandlerJniCallback : public JniCallback, public WriteBatch::Hand
void DeleteRange(const Slice& beginKey, const Slice& endKey);
void LogData(const Slice& blob);
Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
const Slice& value);
Status MarkBeginPrepare();
const Slice& value);
Status MarkBeginPrepare(bool);
Status MarkEndPrepare(const Slice& xid);
Status MarkNoop(bool empty_batch);
Status MarkRollback(const Slice& xid);

@@ -1917,8 +1917,9 @@ class InMemoryHandler : public WriteBatch::Handler {
return Status::OK();
}
virtual Status MarkBeginPrepare() override {
row_ << "BEGIN_PREARE ";
virtual Status MarkBeginPrepare(bool unprepare) override {
row_ << "BEGIN_PREPARE(";
row_ << (unprepare ? "true" : "false") << ") ";
return Status::OK();
}

@@ -124,7 +124,10 @@ Status PessimisticTransactionDB::Initialize(
for (auto it = rtrxs.begin(); it != rtrxs.end(); it++) {
auto recovered_trx = it->second;
assert(recovered_trx);
assert(recovered_trx->log_number_);
assert(recovered_trx->batches_.size() == 1);
const auto& seq = recovered_trx->batches_.begin()->first;
const auto& batch_info = recovered_trx->batches_.begin()->second;
assert(batch_info.log_number_);
assert(recovered_trx->name_.length());
WriteOptions w_options;
@@ -133,21 +136,20 @@ Status PessimisticTransactionDB::Initialize(
Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
assert(real_trx);
real_trx->SetLogNumber(recovered_trx->log_number_);
assert(recovered_trx->seq_ != kMaxSequenceNumber);
real_trx->SetId(recovered_trx->seq_);
real_trx->SetLogNumber(batch_info.log_number_);
assert(seq != kMaxSequenceNumber);
real_trx->SetId(seq);
s = real_trx->SetName(recovered_trx->name_);
if (!s.ok()) {
break;
}
s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_);
s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
// WriteCommitted sets this to 0 to disable this check, which is specific
// to WritePrepared txns
assert(recovered_trx->batch_cnt_ == 0 ||
real_trx->GetWriteBatch()->SubBatchCnt() ==
recovered_trx->batch_cnt_);
assert(batch_info.batch_cnt_ == 0 ||
real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
real_trx->SetState(Transaction::PREPARED);
if (!s.ok()) {
break;

@@ -142,6 +142,7 @@ class PessimisticTransactionDB : public TransactionDB {
friend class TransactionTest_TwoPhaseLongPrepareTest_Test;
friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test;
TransactionLockMgr lock_mgr_;
// Must be held when adding/dropping column families.

@@ -699,7 +699,7 @@ Status TransactionBaseImpl::RebuildFromWriteBatch(WriteBatch* src_batch) {
// this is used for reconstructing prepared transactions upon
// recovery. there should not be any meta markers in the batches
// we are processing.
Status MarkBeginPrepare() override { return Status::InvalidArgument(); }
Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();

@@ -294,7 +294,7 @@ Status WritePreparedTxn::RollbackInternal() {
}
Status MarkNoop(bool) override { return Status::OK(); }
Status MarkBeginPrepare() override { return Status::OK(); }
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status MarkRollback(const Slice&) override {

@@ -71,6 +71,7 @@ class WritePreparedTxn : public PessimisticTransaction {
private:
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
friend class WritePreparedTxnDB;
friend class WriteUnpreparedTxnDB;
Status PrepareInternal() override;

@@ -37,9 +37,13 @@ Status WritePreparedTxnDB::Initialize(
assert(dbimpl != nullptr);
auto rtxns = dbimpl->recovered_transactions();
for (auto rtxn : rtxns) {
auto cnt = rtxn.second->batch_cnt_ ? rtxn.second->batch_cnt_ : 1;
// There should be only one batch for the WritePrepared policy.
assert(rtxn.second->batches_.size() == 1);
const auto& seq = rtxn.second->batches_.begin()->first;
const auto& batch_info = rtxn.second->batches_.begin()->second;
auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
for (size_t i = 0; i < cnt; i++) {
AddPrepared(rtxn.second->seq_ + i);
AddPrepared(seq + i);
}
}
SequenceNumber prev_max = max_evicted_seq_;

@@ -383,6 +383,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test;
friend class WriteUnpreparedTxnDB;
friend class WriteUnpreparedTransactionTest_RecoveryRollbackUnprepared_Test;
void Init(const TransactionDBOptions& /* unused */);
@@ -754,7 +756,7 @@ struct SubBatchCounter : public WriteBatch::Handler {
AddKey(cf, key);
return Status::OK();
}
Status MarkBeginPrepare() override { return Status::OK(); }
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
bool WriteAfterCommit() const override { return false; }
};

@@ -188,6 +188,111 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
delete txn;
}
TEST_P(WriteUnpreparedTransactionTest, RecoveryRollbackUnprepared) {
WriteOptions write_options;
write_options.disableWAL = false;
uint64_t seq_used = kMaxSequenceNumber;
uint64_t log_number;
WriteBatch batch;
std::vector<Transaction*> prepared_trans;
WriteUnpreparedTxnDB* wup_db;
options.disable_auto_compactions = true;
// Try unprepared batches with empty database.
for (int num_batches = 0; num_batches < 10; num_batches++) {
// Reset database.
prepared_trans.clear();
ReOpen();
wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
// Write num_batches unprepared batches into the WAL.
for (int i = 0; i < num_batches; i++) {
batch.Clear();
// TODO(lth): Instead of manually calling WriteImpl with a write batch,
// use methods on Transaction instead once it is implemented.
ASSERT_OK(WriteBatchInternal::InsertNoop(&batch));
ASSERT_OK(WriteBatchInternal::Put(&batch,
db->DefaultColumnFamily()->GetID(),
"k" + ToString(i), "value"));
// MarkEndPrepare will change the Noop marker into an unprepared marker.
ASSERT_OK(WriteBatchInternal::MarkEndPrepare(
&batch, Slice("xid1"), /* write after commit */ false,
/* unprepared batch */ true));
ASSERT_OK(wup_db->db_impl_->WriteImpl(
write_options, &batch, /*callback*/ nullptr, &log_number,
/*log ref*/ 0, /* disable memtable */ true, &seq_used,
/* prepare_batch_cnt_ */ 1));
}
// Crash and run recovery code paths.
wup_db->db_impl_->FlushWAL(true);
wup_db->TEST_Crash();
ReOpenNoDelete();
wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 0);
// Check that DB is empty.
Iterator* iter = db->NewIterator(ReadOptions());
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
delete iter;
}
// Try unprepared batches with non-empty database.
for (int num_batches = 1; num_batches < 10; num_batches++) {
// Reset database.
prepared_trans.clear();
ReOpen();
wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
for (int i = 0; i < num_batches; i++) {
ASSERT_OK(db->Put(WriteOptions(), "k" + ToString(i),
"before value " + ToString(i)));
}
// Write num_batches unprepared batches into the WAL.
for (int i = 0; i < num_batches; i++) {
batch.Clear();
// TODO(lth): Instead of manually calling WriteImpl with a write batch,
// use methods on Transaction instead once it is implemented.
ASSERT_OK(WriteBatchInternal::InsertNoop(&batch));
ASSERT_OK(WriteBatchInternal::Put(&batch,
db->DefaultColumnFamily()->GetID(),
"k" + ToString(i), "value"));
// MarkEndPrepare will change the Noop marker into an unprepared marker.
ASSERT_OK(WriteBatchInternal::MarkEndPrepare(
&batch, Slice("xid1"), /* write after commit */ false,
/* unprepared batch */ true));
ASSERT_OK(wup_db->db_impl_->WriteImpl(
write_options, &batch, /*callback*/ nullptr, &log_number,
/*log ref*/ 0, /* disable memtable */ true, &seq_used,
/* prepare_batch_cnt_ */ 1));
}
// Crash and run recovery code paths.
wup_db->db_impl_->FlushWAL(true);
wup_db->TEST_Crash();
ReOpenNoDelete();
wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
db->GetAllPreparedTransactions(&prepared_trans);
ASSERT_EQ(prepared_trans.size(), 0);
// Check that DB has before values.
Iterator* iter = db->NewIterator(ReadOptions());
iter->SeekToFirst();
for (int i = 0; i < num_batches; i++) {
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "k" + ToString(i));
ASSERT_EQ(iter->value().ToString(), "before value " + ToString(i));
iter->Next();
}
ASSERT_FALSE(iter->Valid());
delete iter;
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

@@ -60,6 +60,7 @@ class WriteUnpreparedTxn : public WritePreparedTxn {
private:
friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
friend class WriteUnpreparedTxnDB;
WriteUnpreparedTxnDB* wupt_db_;

@@ -15,6 +15,293 @@
namespace rocksdb {
// Instead of reconstructing a Transaction object and calling rollback on it,
// we can be more efficient with RollbackRecoveredTransaction by skipping
// unnecessary steps (e.g. updating CommitMap, reconstructing the keyset).
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
const DBImpl::RecoveredTransaction* rtxn) {
// TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
assert(rtxn->unprepared_);
auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
const bool kRollbackMergeOperands = true;
WriteOptions w_options;
// If we crash during recovery, we can just recalculate and rewrite the
// rollback batch.
w_options.disableWAL = true;
// Iterate starting with largest sequence number.
for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); it++) {
auto last_visible_txn = it->first - 1;
const auto& batch = it->second.batch_;
WriteBatch rollback_batch;
struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
DBImpl* db_;
ReadOptions roptions;
WritePreparedTxnReadCallback callback;
WriteBatch* rollback_batch_;
std::map<uint32_t, const Comparator*>& comparators_;
std::map<uint32_t, ColumnFamilyHandle*>& handles_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
bool rollback_merge_operands_;
RollbackWriteBatchBuilder(
DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
WriteBatch* dst_batch,
std::map<uint32_t, const Comparator*>& comparators,
std::map<uint32_t, ColumnFamilyHandle*>& handles,
bool rollback_merge_operands)
: db_(db),
callback(wpt_db, snap_seq,
0), // 0 disables min_uncommitted optimization
rollback_batch_(dst_batch),
comparators_(comparators),
handles_(handles),
rollback_merge_operands_(rollback_merge_operands) {}
Status Rollback(uint32_t cf, const Slice& key) {
Status s;
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
auto cmp = comparators_[cf];
keys_[cf] = CFKeys(SetComparator(cmp));
}
auto res = cf_keys.insert(key);
if (res.second ==
false) {  // second is false if an element already existed.
return s;
}
PinnableSlice pinnable_val;
bool not_used;
auto cf_handle = handles_[cf];
s = db_->GetImpl(roptions, cf_handle, key, &pinnable_val, &not_used,
&callback);
assert(s.ok() || s.IsNotFound());
if (s.ok()) {
s = rollback_batch_->Put(cf_handle, key, pinnable_val);
assert(s.ok());
} else if (s.IsNotFound()) {
// There has been no readable value before txn. By adding a delete we
// make sure that there will be none afterwards either.
s = rollback_batch_->Delete(cf_handle, key);
assert(s.ok());
} else {
// Unexpected status. Return it to the user.
}
return s;
}
Status PutCF(uint32_t cf, const Slice& key,
const Slice& /*val*/) override {
return Rollback(cf, key);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return Rollback(cf, key);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return Rollback(cf, key);
}
Status MergeCF(uint32_t cf, const Slice& key,
const Slice& /*val*/) override {
if (rollback_merge_operands_) {
return Rollback(cf, key);
} else {
return Status::OK();
}
}
// Recovered batches do not contain 2PC markers.
Status MarkNoop(bool) override { return Status::InvalidArgument(); }
Status MarkBeginPrepare(bool) override {
return Status::InvalidArgument();
}
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkCommit(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
} rollback_handler(db_impl_, this, last_visible_txn, &rollback_batch,
*cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
!kRollbackMergeOperands);
auto s = batch->Iterate(&rollback_handler);
if (!s.ok()) {
return s;
}
// The Rollback marker will be used as a batch separator
WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
const uint64_t kNoLogRef = 0;
const bool kDisableMemtable = true;
const size_t kOneBatch = 1;
uint64_t seq_used = kMaxSequenceNumber;
s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
if (!s.ok()) {
return s;
}
// If two_write_queues, we must manually release the sequence number to
// readers.
if (db_impl_->immutable_db_options().two_write_queues) {
db_impl_->SetLastPublishedSequence(seq_used);
}
}
return Status::OK();
}
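The handler above can be summarized with a toy model (plain containers instead of RocksDB internals; here "visible before" stands for the value readable at last_visible_txn, i.e. one below the batch's first sequence number, and a delete is modeled as an empty value):

#include <cassert>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using KV = std::map<std::string, std::string>;

// For each key touched by the unprepared batch, re-Put the last value that
// was visible before the batch, or Delete (modeled as "") if there was none.
std::vector<std::pair<std::string, std::string>> BuildRollback(
    const KV& visible_before, const std::set<std::string>& touched) {
  std::vector<std::pair<std::string, std::string>> rollback;
  for (const auto& key : touched) {
    auto it = visible_before.find(key);
    if (it != visible_before.end()) {
      rollback.emplace_back(key, it->second);  // Put(old value)
    } else {
      rollback.emplace_back(key, "");          // Delete marker
    }
  }
  return rollback;
}

int main() {
  KV before = {{"k0", "before value 0"}};
  std::set<std::string> touched = {"k0", "k1"};
  auto rb = BuildRollback(before, touched);
  assert(rb.size() == 2 && rb[0].second == "before value 0");
  return 0;
}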
Status WriteUnpreparedTxnDB::Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) {
// TODO(lth): Reduce code duplication in this function.
auto dbimpl = reinterpret_cast<DBImpl*>(GetRootDB());
assert(dbimpl != nullptr);
db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
// A callback to commit a single sub-batch
class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
public:
explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
: db_(db) {}
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
assert(!is_mem_disabled);
db_->AddCommitted(commit_seq, commit_seq);
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
};
db_impl_->SetRecoverableStatePreReleaseCallback(
new CommitSubBatchPreReleaseCallback(this));
// PessimisticTransactionDB::Initialize
for (auto cf_ptr : handles) {
AddColumnFamily(cf_ptr);
}
// Verify cf options
for (auto handle : handles) {
ColumnFamilyDescriptor cfd;
Status s = handle->GetDescriptor(&cfd);
if (!s.ok()) {
return s;
}
s = VerifyCFOptions(cfd.options);
if (!s.ok()) {
return s;
}
}
// Re-enable compaction for the column families that initially had
// compaction enabled.
std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
for (auto index : compaction_enabled_cf_indices) {
compaction_enabled_cf_handles.push_back(handles[index]);
}
Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
if (!s.ok()) {
return s;
}
// create 'real' transactions from recovered shell transactions
auto rtxns = dbimpl->recovered_transactions();
for (auto rtxn : rtxns) {
auto recovered_trx = rtxn.second;
assert(recovered_trx);
assert(recovered_trx->batches_.size() >= 1);
assert(recovered_trx->name_.length());
// We can only roll back transactions after AdvanceMaxEvictedSeq is called,
// but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
// two iterations are required.
if (recovered_trx->unprepared_) {
continue;
}
WriteOptions w_options;
w_options.sync = true;
TransactionOptions t_options;
auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
auto last_seq = recovered_trx->batches_.rbegin()->first;
auto last_prepare_batch_cnt =
recovered_trx->batches_.begin()->second.batch_cnt_;
Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
assert(real_trx);
auto wupt =
static_cast_with_check<WriteUnpreparedTxn, Transaction>(real_trx);
real_trx->SetLogNumber(first_log_number);
real_trx->SetId(last_seq);
s = real_trx->SetName(recovered_trx->name_);
if (!s.ok()) {
break;
}
wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;
for (auto batch : recovered_trx->batches_) {
const auto& seq = batch.first;
const auto& batch_info = batch.second;
auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
assert(batch_info.log_number_);
for (size_t i = 0; i < cnt; i++) {
AddPrepared(seq + i);
}
assert(wupt->unprep_seqs_.count(seq) == 0);
wupt->unprep_seqs_[seq] = cnt;
}
wupt->write_batch_.Clear();
WriteBatchInternal::InsertNoop(wupt->write_batch_.GetWriteBatch());
real_trx->SetState(Transaction::PREPARED);
if (!s.ok()) {
break;
}
}
SequenceNumber prev_max = max_evicted_seq_;
SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
AdvanceMaxEvictedSeq(prev_max, last_seq);
// Rollback unprepared transactions.
for (auto rtxn : rtxns) {
auto recovered_trx = rtxn.second;
if (recovered_trx->unprepared_) {
s = RollbackRecoveredTransaction(recovered_trx);
if (!s.ok()) {
return s;
}
continue;
}
}
if (s.ok()) {
dbimpl->DeleteAllRecoveredTransactions();
}
return s;
}
Transaction* WriteUnpreparedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {

@@ -21,7 +21,11 @@ class WriteUnpreparedTxnDB : public WritePreparedTxnDB {
public:
using WritePreparedTxnDB::WritePreparedTxnDB;
Transaction* BeginTransaction(const WriteOptions& write_options, const TransactionOptions& txn_options,
Status Initialize(const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) override;
Transaction* BeginTransaction(const WriteOptions& write_options,
const TransactionOptions& txn_options,
Transaction* old_txn) override;
// Struct to hold ownership of snapshot and read callback for cleanup.
@@ -31,6 +35,9 @@ class WriteUnpreparedTxnDB : public WritePreparedTxnDB {
Iterator* NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family,
WriteUnpreparedTxn* txn);
private:
Status RollbackRecoveredTransaction(const DBImpl::RecoveredTransaction* rtxn);
};
} // namespace rocksdb
