diff --git a/db/db_impl.cc b/db/db_impl.cc index e8cb95422..ce2b61e24 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1195,8 +1195,6 @@ Status DBImpl::Recover( // Note that prev_log_number() is no longer used, but we pay // attention to it in case we are recovering a database // produced by an older version of rocksdb. - const uint64_t min_log = versions_->MinLogNumber(); - const uint64_t prev_log = versions_->prev_log_number(); std::vector filenames; s = env_->GetChildren(db_options_.wal_dir, &filenames); if (!s.ok()) { @@ -1213,7 +1211,7 @@ Status DBImpl::Recover( "While creating a new Db, wal_dir contains " "existing log file: ", filenames[i]); - } else if ((number >= min_log) || (number == prev_log)) { + } else { logs.push_back(number); } } diff --git a/db/write_batch.cc b/db/write_batch.cc index 008724550..5ae031497 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -761,17 +761,17 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { + if (rebuilding_trx_ != nullptr) { + WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); + return Status::OK(); + } + Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; return seek_status; } - if (rebuilding_trx_ != nullptr) { - rebuilding_trx_->Put(cf_mems_->GetColumnFamilyHandle(), key, value); - return Status::OK(); - } - MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); if (!moptions->inplace_update_support) { @@ -851,48 +851,50 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { + if (rebuilding_trx_ != nullptr) { + WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + return Status::OK(); + } + Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; return seek_status; } - if (rebuilding_trx_ != nullptr) { - rebuilding_trx_->Delete(cf_mems_->GetColumnFamilyHandle(), key); - return Status::OK(); - } - return DeleteImpl(column_family_id, key, kTypeDeletion); } virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { + if (rebuilding_trx_ != nullptr) { + WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); + return Status::OK(); + } + Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; return seek_status; } - if (rebuilding_trx_ != nullptr) { - rebuilding_trx_->SingleDelete(cf_mems_->GetColumnFamilyHandle(), key); - return Status::OK(); - } - return DeleteImpl(column_family_id, key, kTypeSingleDeletion); } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { assert(!concurrent_memtable_writes_); + if (rebuilding_trx_ != nullptr) { + WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); + return Status::OK(); + } + Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; return seek_status; } - if (rebuilding_trx_ != nullptr) { - rebuilding_trx_->Merge(cf_mems_->GetColumnFamilyHandle(), key, value); - return Status::OK(); - } + MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetMemTableOptions(); bool perform_merge = false; diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index f5be8526b..273565c2e 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -139,6 +139,7 @@ Status TransactionDB::Open( assert(recovered_trx->name_.length()); WriteOptions w_options; + w_options.sync = true; TransactionOptions t_options; Transaction* real_trx = diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index f6d5b797c..e10af66bd 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -682,6 +682,73 @@ TEST_F(TransactionTest, TwoPhaseSequenceTest) { ASSERT_EQ(value, "bar4"); } +TEST_F(TransactionTest, TwoPhaseDoubleRecoveryTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions read_options; + + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("a"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + delete txn; + + // kill and reopen + env_->SetFilesystemActive(false); + ReOpenNoDelete(); + + // commit old txn + txn = db->GetTransactionByName("a"); + s = txn->Commit(); + ASSERT_OK(s); + + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar"); + + delete txn; + + txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("b"); + ASSERT_OK(s); + + s = txn->Put(Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + + s = txn->Prepare(); + ASSERT_OK(s); + + s = txn->Commit(); + ASSERT_OK(s); + + // kill and reopen + env_->SetFilesystemActive(false); + ReOpenNoDelete(); + + // value is now available + s = db->Get(read_options, "foo", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar"); + + s = db->Get(read_options, "foo2", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar2"); +} + TEST_F(TransactionTest, TwoPhaseLogRollingTest) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB());