From c5f3707d42285fd625546ad5fdb51e627f589b78 Mon Sep 17 00:00:00 2001 From: agiardullo Date: Fri, 9 Oct 2015 13:31:10 -0700 Subject: [PATCH] DisableIndexing() for Transactions Summary: MyRocks reported some performance issues when inserting many keys into a transaction due to the cost of inserting new keys into WriteBatchWithIndex. Frequently, they don't even need the keys to be indexed as they don't need to read them back. DisableIndexing() can be used to avoid the cost of indexing. I also plan on eventually investigating if we can improve WriteBatchWithIndex performance. But even if we improved the perf here, it is still beneficial to be able to disable the indexing altogether for large transactions. Test Plan: unit test Reviewers: igor, rven, yoshinorim, spetrunia, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D48471 --- include/rocksdb/utilities/transaction.h | 15 +++++ utilities/transactions/transaction_base.cc | 38 ++++++++---- utilities/transactions/transaction_base.h | 12 ++++ utilities/transactions/transaction_db_impl.cc | 4 ++ utilities/transactions/transaction_test.cc | 62 +++++++++++++++++++ 5 files changed, 119 insertions(+), 12 deletions(-) diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 6c2640a8e..a625f5000 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -263,6 +263,21 @@ class Transaction { // Similar to WriteBatch::PutLogData virtual void PutLogData(const Slice& blob) = 0; + // By default, all Put/Merge/Delete operations will be indexed in the + // transaction so that Get/GetForUpdate/GetIterator can search for these + // keys. + // + // If the caller does not want to fetch the keys about to be written, + // they may want to avoid indexing as a performance optimization. 
+ // Calling DisableIndexing() will turn off indexing for all future + // Put/Merge/Delete operations until EnableIndexing() is called. + // + // If a key is Put/Merge/Deleted after DisableIndexing is called and then + // is fetched via Get/GetForUpdate/GetIterator, the result of the fetch is + // undefined. + virtual void DisableIndexing() = 0; + virtual void EnableIndexing() = 0; + // Returns the number of distinct Keys being tracked by this transaction. // If this transaction was created by a TransactinDB, this is the number of // keys that are currently locked by this transaction. diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index dc9167735..37f8c75db 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -179,7 +179,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Put(column_family, key, value); + GetBatchForWrite()->Put(column_family, key, value); num_puts_++; } @@ -192,7 +192,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Put(column_family, key, value); + GetBatchForWrite()->Put(column_family, key, value); num_puts_++; } @@ -204,7 +204,7 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Merge(column_family, key, value); + GetBatchForWrite()->Merge(column_family, key, value); num_merges_++; } @@ -216,7 +216,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Delete(column_family, key); + GetBatchForWrite()->Delete(column_family, key); num_deletes_++; } @@ -228,7 +228,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if 
(s.ok()) { - write_batch_->Delete(column_family, key); + GetBatchForWrite()->Delete(column_family, key); num_deletes_++; } @@ -240,7 +240,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->SingleDelete(column_family, key); + GetBatchForWrite()->SingleDelete(column_family, key); num_deletes_++; } @@ -252,7 +252,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->SingleDelete(column_family, key); + GetBatchForWrite()->SingleDelete(column_family, key); num_deletes_++; } @@ -265,7 +265,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Put(column_family, key, value); + GetBatchForWrite()->Put(column_family, key, value); num_puts_++; } @@ -279,7 +279,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Put(column_family, key, value); + GetBatchForWrite()->Put(column_family, key, value); num_puts_++; } @@ -293,7 +293,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Merge(column_family, key, value); + GetBatchForWrite()->Merge(column_family, key, value); num_merges_++; } @@ -306,7 +306,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Delete(column_family, key); + GetBatchForWrite()->Delete(column_family, key); num_deletes_++; } @@ -319,7 +319,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Delete(column_family, key); + 
GetBatchForWrite()->Delete(column_family, key); num_deletes_++; } @@ -380,6 +380,20 @@ const TransactionKeyMap* TransactionBaseImpl::GetTrackedKeysSinceSavePoint() { return nullptr; } +// Gets the write batch that should be used for Put/Merge/Deletes. +// +// Returns either a WriteBatch or WriteBatchWithIndex depending on whether +// DisableIndexing() has been called. +WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() { + if (indexing_enabled_) { + // Use WriteBatchWithIndex + return write_batch_.get(); + } else { + // Don't use WriteBatchWithIndex. Return base WriteBatch. + return write_batch_->GetWriteBatch(); + } +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 54ea56771..aac882eef 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -170,6 +170,10 @@ class TransactionBaseImpl : public Transaction { void SetSnapshot() override; + void DisableIndexing() override { indexing_enabled_ = false; } + + void EnableIndexing() override { indexing_enabled_ = true; } + uint64_t GetElapsedTime() const override; uint64_t GetNumPuts() const override; @@ -241,8 +245,16 @@ class TransactionBaseImpl : public Transaction { // Optimistic Transactions will wait till commit time to do conflict checking. TransactionKeyMap tracked_keys_; + // If true, future Put/Merge/Deletes will be indexed in the + // WriteBatchWithIndex. + // If false, future Put/Merge/Deletes will be inserted directly into the + // underlying WriteBatch and not indexed in the WriteBatchWithIndex. 
+ bool indexing_enabled_ = true; + Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, bool untracked = false); + + WriteBatchBase* GetBatchForWrite(); }; } // namespace rocksdb diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index edf15e3bb..42ded1576 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -181,6 +181,7 @@ Status TransactionDBImpl::Put(const WriteOptions& options, Status s; Transaction* txn = BeginInternalTransaction(options); + txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do PutUntracked(). @@ -201,6 +202,7 @@ Status TransactionDBImpl::Delete(const WriteOptions& wopts, Status s; Transaction* txn = BeginInternalTransaction(wopts); + txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do @@ -222,6 +224,7 @@ Status TransactionDBImpl::Merge(const WriteOptions& options, Status s; Transaction* txn = BeginInternalTransaction(options); + txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do @@ -241,6 +244,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { // Need to lock all keys in this batch to prevent write conflicts with // concurrent transactions. 
Transaction* txn = BeginInternalTransaction(opts); + txn->DisableIndexing(); assert(dynamic_cast<TransactionImpl*>(txn) != nullptr); auto txn_impl = reinterpret_cast<TransactionImpl*>(txn); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index dedc94c2e..fd6bb2a33 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -1301,6 +1301,68 @@ TEST_F(TransactionTest, IteratorTest) { delete txn; } +TEST_F(TransactionTest, DisableIndexingTest) { + WriteOptions write_options; + ReadOptions read_options; + string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + txn->DisableIndexing(); + + s = txn->Put("B", "b"); + ASSERT_OK(s); + + s = txn->Get(read_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); + + Iterator* iter = txn->GetIterator(read_options); + ASSERT_OK(iter->status()); + + iter->Seek("B"); + ASSERT_OK(iter->status()); + ASSERT_FALSE(iter->Valid()); + + s = txn->Delete("A"); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + txn->EnableIndexing(); + + s = txn->Put("B", "bb"); + ASSERT_OK(s); + + iter->Seek("B"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bb", iter->value().ToString()); + + s = txn->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("bb", value); + + s = txn->Put("A", "aa"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("aa", value); + + delete iter; + delete txn; +} + TEST_F(TransactionTest, SavepointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options;