From 680864ae5465f485ab0e53bec9f8f102d63357c1 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 5 Mar 2018 10:48:29 -0800 Subject: [PATCH] WritePrepared Txn: Fix bug with duplicate keys during recovery Summary: Fix the following bugs: - During recovery a duplicate key was inserted twice into the write batch of the recovery transaction, once when the memtable returns false (because it was a duplicate) and once for the 2nd attempt. This would result in a different SubBatch count measured when the recovered transaction is committing. - If a cf is flushed during recovery the memtable is not available to assist in detecting the duplicate key. This could result in not advancing the sequence number when iterating over duplicate keys of a flushed cf and hence inserting the next key with the wrong sequence number. - SubBatchCounter would reset the comparator to default comparator after the first duplicate key. The 2nd duplicate key hence would have gone through a wrong comparator and not be detected. Closes https://github.com/facebook/rocksdb/pull/3562 Differential Revision: D7149440 Pulled By: maysamyabandeh fbshipit-source-id: 91ec317b165f363f5d11ff8b8c47c81cebb8ed77 --- db/db_impl.h | 18 +- db/write_batch.cc | 217 +++++++++++--- .../utilities/write_batch_with_index.h | 1 + .../pessimistic_transaction_db.cc | 5 + .../transactions/pessimistic_transaction_db.h | 1 + utilities/transactions/transaction_test.cc | 271 ++++++++++++++++-- utilities/transactions/transaction_test.h | 39 +++ .../transactions/write_prepared_txn_db.cc | 18 +- .../transactions/write_prepared_txn_db.h | 1 + 9 files changed, 497 insertions(+), 74 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index ff61577c4..3259189e5 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -550,9 +550,18 @@ class DBImpl : public DB { WriteBatch* batch_; // The seq number of the first key in the batch SequenceNumber seq_; + // Number of sub-batched. 
A new sub-batch is created if we txn attempts to + // inserts a duplicate key,seq to memtable. This is currently used in + // WritePrparedTxn + size_t batch_cnt_; explicit RecoveredTransaction(const uint64_t log, const std::string& name, - WriteBatch* batch, SequenceNumber seq) - : log_number_(log), name_(name), batch_(batch), seq_(seq) {} + WriteBatch* batch, SequenceNumber seq, + size_t batch_cnt) + : log_number_(log), + name_(name), + batch_(batch), + seq_(seq), + batch_cnt_(batch_cnt) {} ~RecoveredTransaction() { delete batch_; } }; @@ -574,9 +583,10 @@ class DBImpl : public DB { } void InsertRecoveredTransaction(const uint64_t log, const std::string& name, - WriteBatch* batch, SequenceNumber seq) { + WriteBatch* batch, SequenceNumber seq, + size_t batch_cnt) { recovered_transactions_[name] = - new RecoveredTransaction(log, name, batch, seq); + new RecoveredTransaction(log, name, batch, seq, batch_cnt); MarkLogAsContainingPrepSection(log); } diff --git a/db/write_batch.cc b/db/write_batch.cc index 4e257b319..ff4c1a96d 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -978,6 +978,60 @@ Status WriteBatch::PopSavePoint() { return Status::OK(); } +// TODO(myabandeh): move it to util +namespace { +// During recovery if the memtable is flushed we cannot rely on its help on +// duplicate key detection and as key insert will not be attempted. This class +// will be used as a emulator of memtable to tell if insertion of a key/seq +// would have resulted in duplication. +class DuplicateDetector { + public: + explicit DuplicateDetector(DBImpl* db) : db_(db) {} + bool IsDuplicateKeySeq(uint32_t cf, const Slice& key, SequenceNumber seq) { + assert(seq >= batch_seq_); + if (batch_seq_ != seq) { // it is a new batch + keys_.clear(); + } + batch_seq_ = seq; + CFKeys& cf_keys = keys_[cf]; + if (cf_keys.size() == 0) { // just inserted + InitWithComp(cf); + } + auto it = cf_keys.insert(key); + if (it.second == false) { // second is false if a element already existed. 
+ keys_.clear(); + InitWithComp(cf); + keys_[cf].insert(key); + return true; + } + return false; + } + + private: + SequenceNumber batch_seq_ = 0; + DBImpl* db_; + // A comparator to be used in std::set + struct SetComparator { + explicit SetComparator() : user_comparator_(BytewiseComparator()) {} + explicit SetComparator(const Comparator* user_comparator) + : user_comparator_(user_comparator ? user_comparator + : BytewiseComparator()) {} + bool operator()(const Slice& lhs, const Slice& rhs) const { + return user_comparator_->Compare(lhs, rhs) < 0; + } + + private: + const Comparator* user_comparator_; + }; + using CFKeys = std::set; + std::map keys_; + void InitWithComp(const uint32_t cf) { + auto cmp = db_->GetColumnFamilyHandle(cf)->GetComparator(); + keys_[cf] = CFKeys(SetComparator(cmp)); + } +}; +} // anonymous namespace + class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; @@ -1008,6 +1062,7 @@ class MemTableInserter : public WriteBatch::Handler { bool seq_per_batch_; // Whether the memtable write will be done only after the commit bool write_after_commit_; + DuplicateDetector duplicate_detector_; MemPostInfoMap& GetPostMap() { assert(concurrent_memtable_writes_); @@ -1045,7 +1100,8 @@ class MemTableInserter : public WriteBatch::Handler { // Write after commit currently uses one seq per key (instead of per // batch). So seq_per_batch being false indicates write_after_commit // approach. 
- write_after_commit_(!seq_per_batch) { + write_after_commit_(!seq_per_batch), + duplicate_detector_(db_) { assert(cf_mems_); } @@ -1135,17 +1191,25 @@ class MemTableInserter : public WriteBatch::Handler { Status PutCFImpl(uint32_t column_family_id, const Slice& key, const Slice& value, ValueType value_type) { - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); + batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, + key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } Status ret_status; @@ -1215,6 +1279,13 @@ class MemTableInserter : public WriteBatch::Handler { } } } + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. + WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); + } // Since all Puts are logged in trasaction logs (if enabled), always bump // sequence number. Even if the update eventually fails and does not result // in memtable add/update. 
@@ -1248,57 +1319,102 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, + key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } - return DeleteImpl(column_family_id, key, Slice(), kTypeDeletion); + auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion); + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. 
+ WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); + } + return ret_status; } virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, + key); + batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, + key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } - return DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion); + auto ret_status = + DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion); + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. 
+ WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); + } + return ret_status; } virtual Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key, const Slice& end_key) override { - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, begin_key, end_key); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, + begin_key, end_key); + // TODO(myabandeh): when transctional DeleteRange support is added, + // check if end_key must also be added. + batch_boundry = duplicate_detector_.IsDuplicateKeySeq( + column_family_id, begin_key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } if (db_ != nullptr) { @@ -1315,23 +1431,42 @@ class MemTableInserter : public WriteBatch::Handler { } } - return DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion); + auto ret_status = + DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion); + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. 
+ WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, + begin_key, end_key); + } + return ret_status; } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { assert(!concurrent_memtable_writes_); - if (rebuilding_trx_ != nullptr) { + // optimize for non-recovery mode + if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); - if (write_after_commit_) { - return Status::OK(); - } + return Status::OK(); // else insert the values to the memtable right away } Status seek_status; - if (!SeekToColumnFamily(column_family_id, &seek_status)) { - MaybeAdvanceSeq(); + if (UNLIKELY(!SeekToColumnFamily(column_family_id, &seek_status))) { + bool batch_boundry = false; + if (rebuilding_trx_ != nullptr) { + assert(!write_after_commit_); + // The CF is probabely flushed and hence no need for insert but we still + // need to keep track of the keys for upcoming rollback/commit. + WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, + value); + batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, + key, sequence_); + } + MaybeAdvanceSeq(batch_boundry); return seek_status; } @@ -1412,6 +1547,13 @@ class MemTableInserter : public WriteBatch::Handler { } } + // optimize for non-recovery mode + if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + assert(!write_after_commit_); + // If the ret_status is TryAgain then let the next try to add the ky to + // the the rebuilding transaction object. + WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); + } MaybeAdvanceSeq(); CheckMemtableFull(); return ret_status; @@ -1466,8 +1608,13 @@ class MemTableInserter : public WriteBatch::Handler { if (recovering_log_number_ != 0) { assert(db_->allow_2pc()); + size_t batch_cnt = + write_after_commit_ + ? 
0 // 0 will disable further checks + : static_cast(sequence_ - rebuilding_trx_seq_ + 1); db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(), - rebuilding_trx_, rebuilding_trx_seq_); + rebuilding_trx_, rebuilding_trx_seq_, + batch_cnt); rebuilding_trx_ = nullptr; } else { assert(rebuilding_trx_ == nullptr); diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index a0c9635da..0ce166890 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -227,6 +227,7 @@ class WriteBatchWithIndex : public WriteBatchBase { void SetMaxBytes(size_t max_bytes) override; private: + friend class PessimisticTransactionDB; friend class WritePreparedTxn; friend class WriteBatchWithIndex_SubBatchCnt_Test; // Returns the number of sub-batches inside the write batch. A sub-batch diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index cd75209f1..a3b3a622a 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -142,6 +142,11 @@ Status PessimisticTransactionDB::Initialize( } s = real_trx->RebuildFromWriteBatch(recovered_trx->batch_); + // WriteCommitted set this to to disable this check that is specific to + // WritePrepared txns + assert(recovered_trx->batch_cnt_ == 0 || + real_trx->GetWriteBatch()->SubBatchCnt() == + recovered_trx->batch_cnt_); real_trx->SetState(Transaction::PREPARED); if (!s.ok()) { break; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 1386b5c22..af4114cc2 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -135,6 +135,7 @@ class PessimisticTransactionDB : public TransactionDB { friend class WritePreparedTxnDB; friend class 
WritePreparedTxnDBMock; friend class TransactionTest_DoubleEmptyWrite_Test; + friend class TransactionTest_DuplicateKeys_Test; friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test; friend class TransactionTest_TwoPhaseLongPrepareTest_Test; friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 2015d314c..9d9d06082 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -5051,6 +5051,36 @@ TEST_P(TransactionTest, Optimizations) { } } +// A comparator that uses only the first three bytes +class ThreeBytewiseComparator : public Comparator { + public: + ThreeBytewiseComparator() {} + virtual const char* Name() const override { + return "test.ThreeBytewiseComparator"; + } + virtual int Compare(const Slice& a, const Slice& b) const override { + Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); + Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); + return na.compare(nb); + } + virtual bool Equal(const Slice& a, const Slice& b) const override { + Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); + Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); + return na == nb; + } + // This methods below dont seem relevant to this test. Implement them if + // proven othersize. 
+ void FindShortestSeparator(std::string* start, + const Slice& limit) const override { + const Comparator* bytewise_comp = BytewiseComparator(); + bytewise_comp->FindShortestSeparator(start, limit); + } + void FindShortSuccessor(std::string* key) const override { + const Comparator* bytewise_comp = BytewiseComparator(); + bytewise_comp->FindShortSuccessor(key); + } +}; + // Test that the transactional db can handle duplicate keys in the write batch TEST_P(TransactionTest, DuplicateKeys) { ColumnFamilyOptions cf_options; @@ -5090,35 +5120,6 @@ TEST_P(TransactionTest, DuplicateKeys) { // Test with non-bytewise comparator { - // A comparator that uses only the first three bytes - class ThreeBytewiseComparator : public Comparator { - public: - ThreeBytewiseComparator() {} - virtual const char* Name() const override { - return "test.ThreeBytewiseComparator"; - } - virtual int Compare(const Slice& a, const Slice& b) const override { - Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); - Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); - return na.compare(nb); - } - virtual bool Equal(const Slice& a, const Slice& b) const override { - Slice na = Slice(a.data(), a.size() < 3 ? a.size() : 3); - Slice nb = Slice(b.data(), b.size() < 3 ? b.size() : 3); - return na == nb; - } - // This methods below dont seem relevant to this test. Implement them if - // proven othersize. 
- void FindShortestSeparator(std::string* start, - const Slice& limit) const override { - const Comparator* bytewise_comp = BytewiseComparator(); - bytewise_comp->FindShortestSeparator(start, limit); - } - void FindShortSuccessor(std::string* key) const override { - const Comparator* bytewise_comp = BytewiseComparator(); - bytewise_comp->FindShortSuccessor(key); - } - }; ReOpen(); std::unique_ptr comp_gc(new ThreeBytewiseComparator()); cf_options.comparator = comp_gc.get(); @@ -5128,6 +5129,8 @@ TEST_P(TransactionTest, DuplicateKeys) { batch.Put(cf_handle, Slice("key"), Slice("value")); // The first three bytes are the same, do it must be counted as duplicate batch.Put(cf_handle, Slice("key2"), Slice("value2")); + // check for 2nd duplicate key in cf with non-default comparator + batch.Put(cf_handle, Slice("key2b"), Slice("value2b")); ASSERT_OK(db->Write(write_options, &batch)); // The value must be the most recent value for all the keys equal to "key", @@ -5135,7 +5138,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ReadOptions ropt; PinnableSlice pinnable_val; ASSERT_OK(db->Get(ropt, cf_handle, "key", &pinnable_val)); - ASSERT_TRUE(pinnable_val == ("value2")); + ASSERT_TRUE(pinnable_val == ("value2b")); // Test duplicate keys with rollback TransactionOptions txn_options; @@ -5145,7 +5148,7 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Merge(cf_handle, Slice("key4"), Slice("value4"))); ASSERT_OK(txn0->Rollback()); ASSERT_OK(db->Get(ropt, cf_handle, "key5", &pinnable_val)); - ASSERT_TRUE(pinnable_val == ("value2")); + ASSERT_TRUE(pinnable_val == ("value2b")); delete txn0; delete cf_handle; @@ -5321,6 +5324,212 @@ TEST_P(TransactionTest, DuplicateKeys) { ASSERT_OK(txn0->Commit()); delete txn0; } + + // Test sucessfull recovery after a crash + { + ReOpen(); + TransactionOptions txn_options; + WriteOptions write_options; + ReadOptions ropt; + Transaction* txn0; + PinnableSlice pinnable_val; + Status s; + + std::unique_ptr comp_gc(new 
ThreeBytewiseComparator()); + cf_options.comparator = comp_gc.get(); + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); + delete cf_handle; + std::vector cfds{ + ColumnFamilyDescriptor(kDefaultColumnFamilyName, + ColumnFamilyOptions(options)), + ColumnFamilyDescriptor(cf_name, cf_options), + }; + std::vector handles; + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + + ASSERT_OK(db->Put(write_options, "foo0", "init")); + ASSERT_OK(db->Put(write_options, "foo1", "init")); + ASSERT_OK(db->Put(write_options, handles[1], "foo0", "init")); + ASSERT_OK(db->Put(write_options, handles[1], "foo1", "init")); + + // one entry + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0a")); + + // two entries, no duplicate + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("foo0"), Slice("bar0b"))); + ASSERT_OK(txn0->Put(handles[1], Slice("fol1"), Slice("bar1b"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0b"))); + ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1b"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + 
ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0b")); + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1b")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0b")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "fol1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1b")); + + // one duplicate with ::Put + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0c"))); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey1"), Slice("bar1d"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0c"))); + ASSERT_OK(txn0->Put(Slice("foo1"), Slice("bar1c"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0d"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0d")); + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1c")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1d")); + + // Duplicate with ::Put, ::Delete + txn0 = 
db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0e"))); + ASSERT_OK(txn0->Delete(handles[1], Slice("key-nonkey1"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e"))); + ASSERT_OK(txn0->Delete(Slice("foo0"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + + // Duplicate with ::Put, ::SingleDelete + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar0g"))); + ASSERT_OK(txn0->SingleDelete(handles[1], Slice("key-nonkey1"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0e"))); + ASSERT_OK(txn0->SingleDelete(Slice("foo0"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_TRUE(s.IsNotFound()); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + 
ASSERT_TRUE(s.IsNotFound()); + + // Duplicate with ::Put, ::Merge + txn0 = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn0->SetName("xid")); + ASSERT_OK(txn0->Put(handles[1], Slice("key-nonkey0"), Slice("bar1i"))); + ASSERT_OK(txn0->Merge(handles[1], Slice("key-nonkey1"), Slice("bar1j"))); + ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0f"))); + ASSERT_OK(txn0->Merge(Slice("foo0"), Slice("bar0g"))); + ASSERT_OK(txn0->Prepare()); + delete txn0; + // This will check the asserts inside recovery code + db->FlushWAL(true); + // Flush only cf 1 + reinterpret_cast(db->GetRootDB()) + ->TEST_FlushMemTable(true, handles[1]); + reinterpret_cast(db)->TEST_Crash(); + ASSERT_OK(ReOpenNoDelete(cfds, &handles)); + txn0 = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn0 != nullptr); + ASSERT_OK(txn0->Commit()); + delete txn0; + pinnable_val.Reset(); + s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar0f,bar0g")); + pinnable_val.Reset(); + s = db->Get(ropt, handles[1], "key-nonkey2", &pinnable_val); + ASSERT_OK(s); + ASSERT_TRUE(pinnable_val == ("bar1i,bar1j")); + + for (auto h : handles) { + delete h; + } + delete db; + db = nullptr; + } } } // namespace rocksdb diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 950af6c13..beec0df40 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -101,6 +101,27 @@ class TransactionTestBase : public ::testing::Test { return s; } + Status ReOpenNoDelete(std::vector& cfs, + std::vector* handles) { + for (auto h : *handles) { + delete h; + } + handles->clear(); + delete db; + db = nullptr; + env->AssertNoOpenFile(); + env->DropUnsyncedFileData(); + env->ResetState(); + Status s; + if (use_stackable_db_ == false) { + s = TransactionDB::Open(options, txn_db_options, dbname, cfs, handles, + &db); + } else { + s = OpenWithStackableDB(cfs, handles); + } + 
return s; + } + Status ReOpen() { delete db; DestroyDB(dbname, options); @@ -113,6 +134,24 @@ class TransactionTestBase : public ::testing::Test { return s; } + Status OpenWithStackableDB(std::vector& cfs, + std::vector* handles) { + std::vector compaction_enabled_cf_indices; + TransactionDB::PrepareWrap(&options, &cfs, &compaction_enabled_cf_indices); + DB* root_db; + Options options_copy(options); + const bool use_seq_per_batch = + txn_db_options.write_policy == WRITE_PREPARED; + Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db, + use_seq_per_batch); + if (s.ok()) { + s = TransactionDB::WrapStackableDB( + new StackableDB(root_db), txn_db_options, + compaction_enabled_cf_indices, *handles, &db); + } + return s; + } + Status OpenWithStackableDB() { std::vector compaction_enabled_cf_indices; std::vector column_families{ColumnFamilyDescriptor( diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index d91bdccaa..accc75338 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -475,7 +475,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, CommitEntry64b evicted_64b; CommitEntry evicted; bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted); - if (to_be_evicted) { + if (LIKELY(to_be_evicted)) { + assert(evicted.prep_seq != prepare_seq); auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); ROCKS_LOG_DETAILS(info_log_, "Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64, @@ -491,7 +492,11 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq, } bool succ = ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq}); - if (!succ) { + if (UNLIKELY(!succ)) { + ROCKS_LOG_ERROR(info_log_, + "ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64 + ",%" PRIu64 " retrying...", + indexed_seq, prepare_seq, commit_seq); // A very rare 
event, in which the commit entry is updated before we do. // Here we apply a very simple solution of retrying. if (loop_cnt > 100) { @@ -783,16 +788,21 @@ WritePreparedTxnDB::~WritePreparedTxnDB() { db_impl_->CancelAllBackgroundWork(true /*wait*/); } +void SubBatchCounter::InitWithComp(const uint32_t cf) { + auto cmp = comparators_[cf]; + keys_[cf] = CFKeys(SetComparator(cmp)); +} + void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) { CFKeys& cf_keys = keys_[cf]; if (cf_keys.size() == 0) { // just inserted - auto cmp = comparators_[cf]; - keys_[cf] = CFKeys(SetComparator(cmp)); + InitWithComp(cf); } auto it = cf_keys.insert(key); if (it.second == false) { // second is false if a element already existed. batches_++; keys_.clear(); + InitWithComp(cf); keys_[cf].insert(key); } } diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 63b66a753..04f66bf48 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -541,6 +541,7 @@ struct SubBatchCounter : public WriteBatch::Handler { size_t batches_; size_t BatchCount() { return batches_; } void AddKey(const uint32_t cf, const Slice& key); + void InitWithComp(const uint32_t cf); Status MarkNoop(bool) override { return Status::OK(); } Status MarkEndPrepare(const Slice&) override { return Status::OK(); } Status MarkCommit(const Slice&) override { return Status::OK(); }