diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 6c2640a8e..a625f5000 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -263,6 +263,21 @@ class Transaction { // Similar to WriteBatch::PutLogData virtual void PutLogData(const Slice& blob) = 0; + // By default, all Put/Merge/Delete operations will be indexed in the + // transaction so that Get/GetForUpdate/GetIterator can search for these + // keys. + // + // If the caller does not want to fetch the keys about to be written, + // they may want to avoid indexing as a performance optimization. + // Calling DisableIndexing() will turn off indexing for all future + // Put/Merge/Delete operations until EnableIndexing() is called. + // + // If a key is Put/Merge/Deleted after DisableIndexing is called and then + // is fetched via Get/GetForUpdate/GetIterator, the result of the fetch is + // undefined. + virtual void DisableIndexing() = 0; + virtual void EnableIndexing() = 0; + // Returns the number of distinct Keys being tracked by this transaction. // If this transaction was created by a TransactinDB, this is the number of // keys that are currently locked by this transaction. diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index dc9167735..37f8c75db 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -179,7 +179,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Put(column_family, key, value); + GetBatchForWrite()->Put(column_family, key, value); num_puts_++; } @@ -192,7 +192,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Put(column_family, key, value); + GetBatchForWrite()->Put(column_family, key, value); num_puts_++; } @@ -204,7 +204,7 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Merge(column_family, key, value); + GetBatchForWrite()->Merge(column_family, key, value); num_merges_++; } @@ -216,7 +216,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Delete(column_family, key); + GetBatchForWrite()->Delete(column_family, key); num_deletes_++; } @@ -228,7 +228,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->Delete(column_family, key); + GetBatchForWrite()->Delete(column_family, key); num_deletes_++; } @@ -240,7 +240,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->SingleDelete(column_family, key); + GetBatchForWrite()->SingleDelete(column_family, key); num_deletes_++; } @@ -252,7 +252,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key); if (s.ok()) { - write_batch_->SingleDelete(column_family, key); + GetBatchForWrite()->SingleDelete(column_family, key); num_deletes_++; } @@ -265,7 +265,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Put(column_family, key, value); + GetBatchForWrite()->Put(column_family, key, value); num_puts_++; } @@ -279,7 +279,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Put(column_family, key, value); + GetBatchForWrite()->Put(column_family, key, value); num_puts_++; } @@ -293,7 +293,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Merge(column_family, key, value); + GetBatchForWrite()->Merge(column_family, key, value); num_merges_++; } @@ -306,7 +306,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Delete(column_family, key); + GetBatchForWrite()->Delete(column_family, key); num_deletes_++; } @@ -319,7 +319,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, Status s = TryLock(column_family, key, untracked); if (s.ok()) { - write_batch_->Delete(column_family, key); + GetBatchForWrite()->Delete(column_family, key); num_deletes_++; } @@ -380,6 +380,20 @@ const TransactionKeyMap* TransactionBaseImpl::GetTrackedKeysSinceSavePoint() { return nullptr; } +// Gets the write batch that should be used for Put/Merge/Deletes. +// +// Returns either a WriteBatch or WriteBatchWithIndex depending on whether +// DisableIndexing() has been called. +WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() { + if (indexing_enabled_) { + // Use WriteBatchWithIndex + return write_batch_.get(); + } else { + // Don't use WriteBatchWithIndex. Return base WriteBatch. + return write_batch_->GetWriteBatch(); + } +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 54ea56771..aac882eef 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -170,6 +170,10 @@ class TransactionBaseImpl : public Transaction { void SetSnapshot() override; + void DisableIndexing() override { indexing_enabled_ = false; } + + void EnableIndexing() override { indexing_enabled_ = true; } + uint64_t GetElapsedTime() const override; uint64_t GetNumPuts() const override; @@ -241,8 +245,16 @@ class TransactionBaseImpl : public Transaction { // Optimistic Transactions will wait till commit time to do conflict checking. TransactionKeyMap tracked_keys_; + // If true, future Put/Merge/Deletes will be indexed in the + // WriteBatchWithIndex. + // If false, future Put/Merge/Deletes will be inserted directly into the + // underlying WriteBatch and not indexed in the WriteBatchWithIndex. + bool indexing_enabled_ = true; + Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, bool untracked = false); + + WriteBatchBase* GetBatchForWrite(); }; } // namespace rocksdb diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index edf15e3bb..42ded1576 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -181,6 +181,7 @@ Status TransactionDBImpl::Put(const WriteOptions& options, Status s; Transaction* txn = BeginInternalTransaction(options); + txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do PutUntracked(). @@ -201,6 +202,7 @@ Status TransactionDBImpl::Delete(const WriteOptions& wopts, Status s; Transaction* txn = BeginInternalTransaction(wopts); + txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do @@ -222,6 +224,7 @@ Status TransactionDBImpl::Merge(const WriteOptions& options, Status s; Transaction* txn = BeginInternalTransaction(options); + txn->DisableIndexing(); // Since the client didn't create a transaction, they don't care about // conflict checking for this write. So we just need to do @@ -241,6 +244,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { // Need to lock all keys in this batch to prevent write conflicts with // concurrent transactions. Transaction* txn = BeginInternalTransaction(opts); + txn->DisableIndexing(); assert(dynamic_cast(txn) != nullptr); auto txn_impl = reinterpret_cast(txn); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index dedc94c2e..fd6bb2a33 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -1301,6 +1301,68 @@ TEST_F(TransactionTest, IteratorTest) { delete txn; } +TEST_F(TransactionTest, DisableIndexingTest) { + WriteOptions write_options; + ReadOptions read_options; + string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options); + ASSERT_TRUE(txn); + + s = txn->Put("A", "a"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + txn->DisableIndexing(); + + s = txn->Put("B", "b"); + ASSERT_OK(s); + + s = txn->Get(read_options, "B", &value); + ASSERT_TRUE(s.IsNotFound()); + + Iterator* iter = txn->GetIterator(read_options); + ASSERT_OK(iter->status()); + + iter->Seek("B"); + ASSERT_OK(iter->status()); + ASSERT_FALSE(iter->Valid()); + + s = txn->Delete("A"); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("a", value); + + txn->EnableIndexing(); + + s = txn->Put("B", "bb"); + ASSERT_OK(s); + + iter->Seek("B"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bb", iter->value().ToString()); + + s = txn->Get(read_options, "B", &value); + ASSERT_OK(s); + ASSERT_EQ("bb", value); + + s = txn->Put("A", "aa"); + ASSERT_OK(s); + + s = txn->Get(read_options, "A", &value); + ASSERT_OK(s); + ASSERT_EQ("aa", value); + + delete iter; + delete txn; +} + TEST_F(TransactionTest, SavepointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options;