From 4e3c3d8c6a69ab3ebf91b7a7292dbb53cd49b9f9 Mon Sep 17 00:00:00 2001
From: Maysam Yabandeh
Date: Thu, 5 Oct 2017 07:28:37 -0700
Subject: [PATCH] WritePrepared Txn: duplicate keys

Summary:
With WriteCommitted, when the write batch has duplicate keys, the txn db
simply inserts them into the db with different seq numbers and lets the db
ignore/merge the duplicate values at read time. With WritePrepared, all the
entries of the batch are inserted with the same seq number, which prevents us
from benefiting from this simple solution.

This patch applies a hackish solution to unblock end-to-end testing. The hack
is to be replaced with a proper solution soon. The patch simply detects
duplicate key insertions and marks the previous entry as obsolete; then,
before writing to the db, it rewrites the batch, eliminating the obsolete
keys. This incurs a memcpy cost. Furthermore, handling duplicate merges would
require doing a FullMerge instead of simply ignoring the previous value; that
case is not handled by this patch. (A simplified, standalone sketch of the
collapse idea follows the patch.)
Closes https://github.com/facebook/rocksdb/pull/2969

Differential Revision: D5976337

Pulled By: maysamyabandeh

fbshipit-source-id: 114e65b66f137d8454ff2d1d782b8c05da95f989
---
 .../utilities/write_batch_with_index.h       |  8 ++
 include/rocksdb/write_batch.h                |  4 +
 .../pessimistic_transaction_db.cc            |  2 +
 .../write_prepared_transaction_test.cc       | 66 +++++++++++++++
 utilities/transactions/write_prepared_txn.cc | 16 +++-
 .../write_batch_with_index.cc                | 80 ++++++++++++++++++-
 6 files changed, 173 insertions(+), 3 deletions(-)

diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h
index 96a9e5fb5..f49cb3888 100644
--- a/include/rocksdb/utilities/write_batch_with_index.h
+++ b/include/rocksdb/utilities/write_batch_with_index.h
@@ -228,6 +228,14 @@ class WriteBatchWithIndex : public WriteBatchBase {
  private:
  friend class WritePreparedTxn;
+  // TODO(myabandeh): this is a hackish, inefficient solution to enable the
+  // e2e unit tests. Replace it with a proper solution. Collapse the
+  // WriteBatch to remove the duplicate keys. The index will not be updated
+  // after this. Returns false if collapse was not necessary.
+  bool Collapse();
+  void DisableDuplicateMergeKeys() { allow_dup_merge_ = false; }
+  bool allow_dup_merge_ = true;
+
  Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           PinnableSlice* value, ReadCallback* callback);
diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h
index 52c26d09a..41f491b72 100644
--- a/include/rocksdb/write_batch.h
+++ b/include/rocksdb/write_batch.h
@@ -327,6 +327,10 @@ class WriteBatch : public WriteBatchBase {
  private:
  friend class WriteBatchInternal;
  friend class LocalSavePoint;
+  // TODO(myabandeh): this is needed for a hack to collapse the write batch
+  // and remove duplicate keys. Remove it when the hack is replaced with a
+  // proper solution.
+  friend class WriteBatchWithIndex;
  SavePoints* save_points_;

  // When sending a WriteBatch through WriteImpl we might want to
diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc
index 38191972d..95a2bc42e 100644
--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -465,6 +465,8 @@ Status PessimisticTransactionDB::Write(const WriteOptions& opts,
   // concurrent transactions.
   Transaction* txn = BeginInternalTransaction(opts);
   txn->DisableIndexing();
+  // TODO(myabandeh): with indexing disabled we need another mechanism to
+  // detect duplicates in the input batch
   auto txn_impl = static_cast_with_check<PessimisticTransaction, Transaction>(txn);
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index da74e90a7..bbea84666 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -1238,6 +1238,72 @@ TEST_P(WritePreparedTransactionTest, RollbackTest) {
   }
 }

+// TODO(myabandeh): move it to transaction_test when it is extended to
+// WRITE_PREPARED.
+
+// Test that the transactional db can handle duplicate keys in the write batch
+TEST_P(WritePreparedTransactionTest, DuplicateKeyTest) {
+  for (bool do_prepare : {true, false}) {
+    TransactionOptions txn_options;
+    WriteOptions write_options;
+    Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+    auto s = txn0->SetName("xid");
+    ASSERT_OK(s);
+    s = txn0->Put(Slice("foo0"), Slice("bar0a"));
+    ASSERT_OK(s);
+    s = txn0->Put(Slice("foo0"), Slice("bar0b"));
+    ASSERT_OK(s);
+    s = txn0->Put(Slice("foo1"), Slice("bar1"));
+    ASSERT_OK(s);
+    s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
+    ASSERT_OK(s);
+    // TODO(myabandeh): enable this after duplicate merge keys are supported
+    // s = txn0->Merge(Slice("foo2"), Slice("bar2a"));
+    // ASSERT_OK(s);
+    s = txn0->Put(Slice("foo2"), Slice("bar2b"));
+    ASSERT_OK(s);
+    s = txn0->Put(Slice("foo3"), Slice("bar3"));
+    ASSERT_OK(s);
+    // TODO(myabandeh): enable this after duplicate merge keys are supported
+    // s = txn0->Merge(Slice("foo3"), Slice("bar3"));
+    // ASSERT_OK(s);
+    s = txn0->Put(Slice("foo4"), Slice("bar4"));
+    ASSERT_OK(s);
+    s = txn0->Delete(Slice("foo4"));
+    ASSERT_OK(s);
+    s = txn0->SingleDelete(Slice("foo4"));
+    ASSERT_OK(s);
+    if (do_prepare) {
+      s = txn0->Prepare();
+      ASSERT_OK(s);
+    }
+    s = txn0->Commit();
+    ASSERT_OK(s);
+    if (!do_prepare) {
+      auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
+      pdb->UnregisterTransaction(txn0);
+    }
+    delete txn0;
+    ReadOptions ropt;
+    PinnableSlice pinnable_val;
+
+    s = db->Get(ropt, db->DefaultColumnFamily(), "foo0", &pinnable_val);
+    ASSERT_OK(s);
+    ASSERT_TRUE(pinnable_val == ("bar0b"));
+    s = db->Get(ropt, db->DefaultColumnFamily(), "foo1", &pinnable_val);
+    ASSERT_OK(s);
+    ASSERT_TRUE(pinnable_val == ("bar1"));
+    s = db->Get(ropt, db->DefaultColumnFamily(), "foo2", &pinnable_val);
+    ASSERT_OK(s);
+    ASSERT_TRUE(pinnable_val == ("bar2b"));
+    s = db->Get(ropt, db->DefaultColumnFamily(), "foo3", &pinnable_val);
+    ASSERT_OK(s);
+    ASSERT_TRUE(pinnable_val == ("bar3"));
+    s = db->Get(ropt, db->DefaultColumnFamily(), "foo4", &pinnable_val);
+    ASSERT_TRUE(s.IsNotFound());
+  }
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
index 86bc6fc62..3458ef5a8 100644
--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -27,6 +27,7 @@ WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
     : PessimisticTransaction(txn_db, write_options, txn_options),
       wpt_db_(txn_db) {
   PessimisticTransaction::Initialize(txn_options);
+  GetWriteBatch()->DisableDuplicateMergeKeys();
 }

 Status WritePreparedTxn::Get(const ReadOptions& read_options,
@@ -47,6 +48,11 @@ Status WritePreparedTxn::PrepareInternal() {
   WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
   const bool disable_memtable = true;
   uint64_t seq_used = kMaxSequenceNumber;
+  bool collapsed = GetWriteBatch()->Collapse();
+  if (collapsed) {
+    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
+                   "Collapse overhead due to duplicate keys");
+  }
   Status s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                                  /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
@@ -59,11 +65,17 @@ Status WritePreparedTxn::PrepareInternal() {
 }

 Status WritePreparedTxn::CommitWithoutPrepareInternal() {
+  bool collapsed = GetWriteBatch()->Collapse();
+  if (collapsed) {
+    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
+                   "Collapse overhead due to duplicate keys");
+  }
   return CommitBatchInternal(GetWriteBatch()->GetWriteBatch());
 }

 Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
-  // In the absense of Prepare markers, use Noop as a batch separator
+  // TODO(myabandeh): handle the duplicate keys in the batch
+  // In the absence of Prepare markers, use Noop as a batch separator
   WriteBatchInternal::InsertNoop(batch);
   const bool disable_memtable = true;
   const uint64_t no_log_ref = 0;
@@ -112,7 +124,7 @@ Status WritePreparedTxn::RollbackInternal() {
   WriteBatch rollback_batch;
   assert(GetId() != kMaxSequenceNumber);
   assert(GetId() > 0);
-  // In the absense of Prepare markers, use Noop as a batch separator
+  // In the absence of Prepare markers, use Noop as a batch separator
   WriteBatchInternal::InsertNoop(&rollback_batch);
   // In WritePrepared, the txn id is the same as the prepare seq
   auto last_visible_txn = GetId() - 1;
diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc
index 44df259d5..d63f5f7b0 100644
--- a/utilities/write_batch_with_index/write_batch_with_index.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index.cc
@@ -7,8 +7,8 @@
 #include "rocksdb/utilities/write_batch_with_index.h"

-#include
 #include
+#include

 #include "db/column_family.h"
 #include "db/db_impl.h"
@@ -399,6 +399,7 @@ struct WriteBatchWithIndex::Rep {
   WriteBatchEntrySkipList skip_list;
   bool overwrite_key;
   size_t last_entry_offset;
+  std::vector<size_t> obsolete_offsets;

   // Remember current offset of internal write batch, which is used as
   // the starting offset of the next record.
@@ -450,6 +451,7 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
   }
   WriteBatchIndexEntry* non_const_entry =
       const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
+  obsolete_offsets.push_back(non_const_entry->offset);
   non_const_entry->offset = last_entry_offset;
   return true;
 }
@@ -576,6 +578,66 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {

 WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }

+bool WriteBatchWithIndex::Collapse() {
+  if (rep->obsolete_offsets.size() == 0) {
+    return false;
+  }
+  WriteBatch& write_batch = rep->write_batch;
+  assert(write_batch.Count() != 0);
+  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);
+  Slice input(write_batch.Data());
+  input.remove_prefix(offset);
+  std::string collapsed_buf;
+  collapsed_buf.resize(WriteBatchInternal::kHeader);
+
+  size_t count = 0;
+  Status s;
+  // Loop through all entries in the write batch and keep them only if they
+  // are not made obsolete by a newer entry.
+  while (s.ok() && !input.empty()) {
+    Slice key, value, blob, xid;
+    uint32_t column_family_id = 0;  // default
+    char tag = 0;
+    // set offset of current entry for call to AddNewEntry()
+    size_t last_entry_offset = input.data() - write_batch.Data().data();
+    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
+                                 &value, &blob, &xid);
+    // Skip the entry if a later write to the same key made it obsolete.
+    if (!rep->obsolete_offsets.empty() &&
+        rep->obsolete_offsets.front() == last_entry_offset) {
+      rep->obsolete_offsets.erase(rep->obsolete_offsets.begin());
+      continue;
+    }
+    switch (tag) {
+      case kTypeColumnFamilyValue:
+      case kTypeValue:
+      case kTypeColumnFamilyDeletion:
+      case kTypeDeletion:
+      case kTypeColumnFamilySingleDeletion:
+      case kTypeSingleDeletion:
+      case kTypeColumnFamilyMerge:
+      case kTypeMerge:
+        count++;
+        break;
+      case kTypeLogData:
+      case kTypeBeginPrepareXID:
+      case kTypeEndPrepareXID:
+      case kTypeCommitXID:
+      case kTypeRollbackXID:
+      case kTypeNoop:
+        break;
+      default:
+        assert(0);
+    }
+    size_t entry_offset = input.data() - write_batch.Data().data();
+    const std::string& wb_data = write_batch.Data();
+    Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
+                            entry_offset - last_entry_offset);
+    collapsed_buf.append(entry_ptr.data(), entry_ptr.size());
+  }
+  write_batch.rep_ = std::move(collapsed_buf);
+  WriteBatchInternal::SetCount(&write_batch, static_cast<int>(count));
+  return true;
+}
+
 WBWIIterator* WriteBatchWithIndex::NewIterator() {
   return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
 }
@@ -689,7 +751,15 @@ Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
   rep->SetLastEntryOffset();
   auto s = rep->write_batch.Merge(column_family, key, value);
   if (s.ok()) {
+    auto size_before = rep->obsolete_offsets.size();
     rep->AddOrUpdateIndex(column_family, key);
+    auto size_after = rep->obsolete_offsets.size();
+    bool duplicate_key = size_before != size_after;
+    if (!allow_dup_merge_ && duplicate_key) {
+      assert(0);
+      return Status::NotSupported(
+          "Duplicate key with merge value is not supported yet");
+    }
   }
   return s;
 }
@@ -698,7 +768,15 @@ Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
   rep->SetLastEntryOffset();
   auto s = rep->write_batch.Merge(key, value);
   if (s.ok()) {
+    auto size_before = rep->obsolete_offsets.size();
     rep->AddOrUpdateIndex(key);
+    auto size_after = rep->obsolete_offsets.size();
+    bool duplicate_key = size_before != size_after;
+    if (!allow_dup_merge_ && duplicate_key) {
+      assert(0);
+      return Status::NotSupported(
+          "Duplicate key with merge value is not supported yet");
+    }
   }
   return s;
 }
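
To make the collapse idea from the summary concrete without depending on RocksDB internals, here is a minimal, self-contained C++ sketch. The Record struct and CollapseRecords helper are hypothetical stand-ins introduced only for illustration; the actual patch operates on the serialized WriteBatch contents and the obsolete_offsets list kept by WriteBatchWithIndex::Rep, but the keep-only-the-newest-entry logic is the same.

#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy stand-in for one write batch record. The real code works on the
// serialized WriteBatch representation, not on parsed structs.
struct Record {
  std::string key;
  std::string value;
};

// Mimics the spirit of WriteBatchWithIndex::Collapse(): entries whose indices
// appear in `obsolete` (sorted ascending) were superseded by a later write to
// the same key, so they are dropped while everything else is copied in order.
std::vector<Record> CollapseRecords(const std::vector<Record>& batch,
                                    const std::vector<std::size_t>& obsolete) {
  std::vector<Record> collapsed;
  collapsed.reserve(batch.size());
  std::size_t next_obsolete = 0;
  for (std::size_t i = 0; i < batch.size(); i++) {
    if (next_obsolete < obsolete.size() && obsolete[next_obsolete] == i) {
      next_obsolete++;  // superseded by a newer entry; skip it
      continue;
    }
    collapsed.push_back(batch[i]);
  }
  return collapsed;
}

int main() {
  // Two writes to "foo0"; the index marks entry 0 obsolete when entry 2 is
  // added, so only "bar0b" survives the collapse.
  std::vector<Record> batch = {{"foo0", "bar0a"}, {"foo1", "bar1"},
                               {"foo0", "bar0b"}};
  std::vector<std::size_t> obsolete = {0};
  auto collapsed = CollapseRecords(batch, obsolete);
  assert(collapsed.size() == 2);
  assert(collapsed[0].key == "foo1");
  assert(collapsed[1].value == "bar0b");
  return 0;
}

As in the patch, collapsing costs a full copy of the surviving entries, which is why Collapse() only runs when duplicates were actually detected and logs a warning about the overhead.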