From c27061dae763621bdcfb5cc0d0118f57269825b8 Mon Sep 17 00:00:00 2001 From: Reid Horuff Date: Thu, 5 May 2016 12:48:52 -0700 Subject: [PATCH] [rocksdb] 2PC double recovery bug fix Summary: 1. prepare() 2. crash 3. recover 4. commit() 5. crash 6. data is lost. This happens because the transaction data still resides only in the WAL, but since the logs were flushed on the first recovery, that data is ignored on the second recovery. We must scan all logs found on recovery and only ignore redundant data at the time of replay. It is not possible to know which logs still contain relevant data at the time of recovery. We cannot simply ignore a log just because all of the non-2PC data it contains has already been written to L0. The changes made to MemTableInserter ensure that prepared sections are still recovered even if all of the non-2PC data in that log has already been flushed to L0. Test Plan: Provided test. Reviewers: sdong Subscribers: andrewkr, hermanlee4, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D57729 --- db/db_impl.cc | 4 +- db/write_batch.cc | 40 +++++------ utilities/transactions/transaction_db_impl.cc | 1 + utilities/transactions/transaction_test.cc | 67 +++++++++++++++++++ 4 files changed, 90 insertions(+), 22 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index e8cb95422..ce2b61e24 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1195,8 +1195,6 @@ Status DBImpl::Recover( // Note that prev_log_number() is no longer used, but we pay // attention to it in case we are recovering a database // produced by an older version of rocksdb. 
- const uint64_t min_log = versions_->MinLogNumber(); - const uint64_t prev_log = versions_->prev_log_number(); std::vector filenames; s = env_->GetChildren(db_options_.wal_dir, &filenames); if (!s.ok()) { @@ -1213,7 +1211,7 @@ Status DBImpl::Recover( "While creating a new Db, wal_dir contains " "existing log file: ", filenames[i]); - } else if ((number >= min_log) || (number == prev_log)) { + } else { logs.push_back(number); } } diff --git a/db/write_batch.cc b/db/write_batch.cc index 008724550..5ae031497 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -761,17 +761,17 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { + if (rebuilding_trx_ != nullptr) { + WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); + return Status::OK(); + } + Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; return seek_status; } - if (rebuilding_trx_ != nullptr) { - rebuilding_trx_->Put(cf_mems_->GetColumnFamilyHandle(), key, value); - return Status::OK(); - } - MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { @@ -851,48 +851,50 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { + if (rebuilding_trx_ != nullptr) { + WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + return Status::OK(); + } + Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; return seek_status; } - if (rebuilding_trx_ != nullptr) { - rebuilding_trx_->Delete(cf_mems_->GetColumnFamilyHandle(), key); - return Status::OK(); - } - return DeleteImpl(column_family_id, key, kTypeDeletion); } virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { + if (rebuilding_trx_ != nullptr) { + 
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); + return Status::OK(); + } + Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; return seek_status; } - if (rebuilding_trx_ != nullptr) { - rebuilding_trx_->SingleDelete(cf_mems_->GetColumnFamilyHandle(), key); - return Status::OK(); - } - return DeleteImpl(column_family_id, key, kTypeSingleDeletion); } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { assert(!concurrent_memtable_writes_); + if (rebuilding_trx_ != nullptr) { + WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); + return Status::OK(); + } + Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; return seek_status; } - if (rebuilding_trx_ != nullptr) { - rebuilding_trx_->Merge(cf_mems_->GetColumnFamilyHandle(), key, value); - return Status::OK(); - } + MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); bool perform_merge = false; diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index f5be8526b..273565c2e 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -139,6 +139,7 @@ Status TransactionDB::Open( assert(recovered_trx->name_.length()); WriteOptions w_options; + w_options.sync = true; TransactionOptions t_options; Transaction* real_trx = diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index f6d5b797c..e10af66bd 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -682,6 +682,73 @@ TEST_F(TransactionTest, TwoPhaseSequenceTest) { ASSERT_EQ(value, "bar4"); } +TEST_F(TransactionTest, TwoPhaseDoubleRecoveryTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions 
read_options; + + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("a"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + delete txn; + + // kill and reopen + env_->SetFilesystemActive(false); + ReOpenNoDelete(); + + // commit old txn + txn = db->GetTransactionByName("a"); + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar"); + + delete txn; + + txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("b"); + ASSERT_OK(s); + + s = txn->Put(Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + + s = txn->Prepare(); + ASSERT_OK(s); + + s = txn->Commit(); + ASSERT_OK(s); + + // kill and reopen + env_->SetFilesystemActive(false); + ReOpenNoDelete(); + + // value is now available + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar"); + + s = db->Get(read_options, "foo2", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar2"); +} + TEST_F(TransactionTest, TwoPhaseLogRollingTest) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB());