DisableIndexing() for Transactions

Summary:
MyRocks reported some performance issues when inserting many keys into a transaction due to the cost of inserting new keys into WriteBatchWithIndex.  Frequently, they don't even need the keys to be indexed as they don't need to read them back.  DisableIndexing() can be used to avoid the cost of indexing.

I also plan on eventually investigating if we can improve WriteBatchWithIndex performance.  But even if we improved the perf here, it is still beneficial to be able to disable indexing altogether for large transactions.

Test Plan: unit test

Reviewers: igor, rven, yoshinorim, spetrunia, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D48471
main
agiardullo 9 years ago
parent 776bd8d5eb
commit c5f3707d42
  1. 15
      include/rocksdb/utilities/transaction.h
  2. 38
      utilities/transactions/transaction_base.cc
  3. 12
      utilities/transactions/transaction_base.h
  4. 4
      utilities/transactions/transaction_db_impl.cc
  5. 62
      utilities/transactions/transaction_test.cc

@ -263,6 +263,21 @@ class Transaction {
// Similar to WriteBatch::PutLogData // Similar to WriteBatch::PutLogData
virtual void PutLogData(const Slice& blob) = 0; virtual void PutLogData(const Slice& blob) = 0;
// By default, all Put/Merge/Delete operations will be indexed in the
// transaction so that Get/GetForUpdate/GetIterator can search for these
// keys.
//
// If the caller does not want to fetch the keys about to be written,
// they may want to avoid indexing as a performance optimization.
// Calling DisableIndexing() will turn off indexing for all future
// Put/Merge/Delete operations until EnableIndexing() is called.
//
// If a key is Put/Merge/Deleted after DisableIndexing() is called and then
// is fetched via Get/GetForUpdate/GetIterator, the result of the fetch is
// undefined.
virtual void DisableIndexing() = 0;
// Re-enables indexing for Put/Merge/Delete operations issued after this
// call.
virtual void EnableIndexing() = 0;
// Returns the number of distinct Keys being tracked by this transaction. // Returns the number of distinct Keys being tracked by this transaction.
// If this transaction was created by a TransactinDB, this is the number of // If this transaction was created by a TransactinDB, this is the number of
// keys that are currently locked by this transaction. // keys that are currently locked by this transaction.

@ -179,7 +179,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key); Status s = TryLock(column_family, key);
if (s.ok()) { if (s.ok()) {
write_batch_->Put(column_family, key, value); GetBatchForWrite()->Put(column_family, key, value);
num_puts_++; num_puts_++;
} }
@ -192,7 +192,7 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key); Status s = TryLock(column_family, key);
if (s.ok()) { if (s.ok()) {
write_batch_->Put(column_family, key, value); GetBatchForWrite()->Put(column_family, key, value);
num_puts_++; num_puts_++;
} }
@ -204,7 +204,7 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key); Status s = TryLock(column_family, key);
if (s.ok()) { if (s.ok()) {
write_batch_->Merge(column_family, key, value); GetBatchForWrite()->Merge(column_family, key, value);
num_merges_++; num_merges_++;
} }
@ -216,7 +216,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key); Status s = TryLock(column_family, key);
if (s.ok()) { if (s.ok()) {
write_batch_->Delete(column_family, key); GetBatchForWrite()->Delete(column_family, key);
num_deletes_++; num_deletes_++;
} }
@ -228,7 +228,7 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key); Status s = TryLock(column_family, key);
if (s.ok()) { if (s.ok()) {
write_batch_->Delete(column_family, key); GetBatchForWrite()->Delete(column_family, key);
num_deletes_++; num_deletes_++;
} }
@ -240,7 +240,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key); Status s = TryLock(column_family, key);
if (s.ok()) { if (s.ok()) {
write_batch_->SingleDelete(column_family, key); GetBatchForWrite()->SingleDelete(column_family, key);
num_deletes_++; num_deletes_++;
} }
@ -252,7 +252,7 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key); Status s = TryLock(column_family, key);
if (s.ok()) { if (s.ok()) {
write_batch_->SingleDelete(column_family, key); GetBatchForWrite()->SingleDelete(column_family, key);
num_deletes_++; num_deletes_++;
} }
@ -265,7 +265,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key, untracked); Status s = TryLock(column_family, key, untracked);
if (s.ok()) { if (s.ok()) {
write_batch_->Put(column_family, key, value); GetBatchForWrite()->Put(column_family, key, value);
num_puts_++; num_puts_++;
} }
@ -279,7 +279,7 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key, untracked); Status s = TryLock(column_family, key, untracked);
if (s.ok()) { if (s.ok()) {
write_batch_->Put(column_family, key, value); GetBatchForWrite()->Put(column_family, key, value);
num_puts_++; num_puts_++;
} }
@ -293,7 +293,7 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key, untracked); Status s = TryLock(column_family, key, untracked);
if (s.ok()) { if (s.ok()) {
write_batch_->Merge(column_family, key, value); GetBatchForWrite()->Merge(column_family, key, value);
num_merges_++; num_merges_++;
} }
@ -306,7 +306,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key, untracked); Status s = TryLock(column_family, key, untracked);
if (s.ok()) { if (s.ok()) {
write_batch_->Delete(column_family, key); GetBatchForWrite()->Delete(column_family, key);
num_deletes_++; num_deletes_++;
} }
@ -319,7 +319,7 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
Status s = TryLock(column_family, key, untracked); Status s = TryLock(column_family, key, untracked);
if (s.ok()) { if (s.ok()) {
write_batch_->Delete(column_family, key); GetBatchForWrite()->Delete(column_family, key);
num_deletes_++; num_deletes_++;
} }
@ -380,6 +380,20 @@ const TransactionKeyMap* TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
return nullptr; return nullptr;
} }
// Selects the batch that Put/Merge/Delete operations should write into.
//
// While indexing is enabled this is the WriteBatchWithIndex itself. Once
// DisableIndexing() has been called it is the plain WriteBatch underneath
// it, so the write is recorded without being added to the index.
WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
  if (!indexing_enabled_) {
    // Indexing is off: bypass WriteBatchWithIndex and hand back the base
    // WriteBatch.
    return write_batch_->GetWriteBatch();
  }

  // Indexing is on: writes go through the WriteBatchWithIndex.
  return write_batch_.get();
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -170,6 +170,10 @@ class TransactionBaseImpl : public Transaction {
void SetSnapshot() override; void SetSnapshot() override;
// Clears indexing_enabled_ so that later writes are not indexed.
void DisableIndexing() override { indexing_enabled_ = false; }
// Sets indexing_enabled_ so that later writes are indexed again.
void EnableIndexing() override { indexing_enabled_ = true; }
uint64_t GetElapsedTime() const override; uint64_t GetElapsedTime() const override;
uint64_t GetNumPuts() const override; uint64_t GetNumPuts() const override;
@ -241,8 +245,16 @@ class TransactionBaseImpl : public Transaction {
// Optimistic Transactions will wait till commit time to do conflict checking. // Optimistic Transactions will wait till commit time to do conflict checking.
TransactionKeyMap tracked_keys_; TransactionKeyMap tracked_keys_;
// If true, future Put/Merge/Deletes will be indexed in the
// WriteBatchWithIndex.
// If false, future Put/Merge/Deletes will be inserted directly into the
// underlying WriteBatch and not indexed in the WriteBatchWithIndex.
bool indexing_enabled_ = true;
Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key, Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
bool untracked = false); bool untracked = false);
WriteBatchBase* GetBatchForWrite();
}; };
} // namespace rocksdb } // namespace rocksdb

@ -181,6 +181,7 @@ Status TransactionDBImpl::Put(const WriteOptions& options,
Status s; Status s;
Transaction* txn = BeginInternalTransaction(options); Transaction* txn = BeginInternalTransaction(options);
txn->DisableIndexing();
// Since the client didn't create a transaction, they don't care about // Since the client didn't create a transaction, they don't care about
// conflict checking for this write. So we just need to do PutUntracked(). // conflict checking for this write. So we just need to do PutUntracked().
@ -201,6 +202,7 @@ Status TransactionDBImpl::Delete(const WriteOptions& wopts,
Status s; Status s;
Transaction* txn = BeginInternalTransaction(wopts); Transaction* txn = BeginInternalTransaction(wopts);
txn->DisableIndexing();
// Since the client didn't create a transaction, they don't care about // Since the client didn't create a transaction, they don't care about
// conflict checking for this write. So we just need to do // conflict checking for this write. So we just need to do
@ -222,6 +224,7 @@ Status TransactionDBImpl::Merge(const WriteOptions& options,
Status s; Status s;
Transaction* txn = BeginInternalTransaction(options); Transaction* txn = BeginInternalTransaction(options);
txn->DisableIndexing();
// Since the client didn't create a transaction, they don't care about // Since the client didn't create a transaction, they don't care about
// conflict checking for this write. So we just need to do // conflict checking for this write. So we just need to do
@ -241,6 +244,7 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
// Need to lock all keys in this batch to prevent write conflicts with // Need to lock all keys in this batch to prevent write conflicts with
// concurrent transactions. // concurrent transactions.
Transaction* txn = BeginInternalTransaction(opts); Transaction* txn = BeginInternalTransaction(opts);
txn->DisableIndexing();
assert(dynamic_cast<TransactionImpl*>(txn) != nullptr); assert(dynamic_cast<TransactionImpl*>(txn) != nullptr);
auto txn_impl = reinterpret_cast<TransactionImpl*>(txn); auto txn_impl = reinterpret_cast<TransactionImpl*>(txn);

@ -1301,6 +1301,68 @@ TEST_F(TransactionTest, IteratorTest) {
delete txn; delete txn;
} }
// Verifies that writes issued between DisableIndexing() and EnableIndexing()
// are not visible to reads made through the same transaction, and that
// writes issued after EnableIndexing() are visible again.
//
// The public API documents reads of unindexed keys as undefined; the
// expectations below pin the behavior of the current implementation.
TEST_F(TransactionTest, DisableIndexingTest) {
  WriteOptions write_options;
  ReadOptions read_options;
  string value;
  Status s;

  Transaction* txn = db->BeginTransaction(write_options);
  ASSERT_TRUE(txn);

  // Indexing is on by default: a Put is immediately readable.
  s = txn->Put("A", "a");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  txn->DisableIndexing();

  // With indexing off, the Put succeeds but the key cannot be found via Get
  // or via the transaction's iterator.
  s = txn->Put("B", "b");
  ASSERT_OK(s);

  s = txn->Get(read_options, "B", &value);
  ASSERT_TRUE(s.IsNotFound());

  Iterator* iter = txn->GetIterator(read_options);
  ASSERT_OK(iter->status());

  iter->Seek("B");
  ASSERT_OK(iter->status());
  ASSERT_FALSE(iter->Valid());

  // The Delete itself must succeed; previously its status was silently
  // overwritten by the following Get.
  s = txn->Delete("A");
  ASSERT_OK(s);

  // The unindexed Delete is not reflected in reads: the earlier indexed
  // value of "A" is still returned.
  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("a", value);

  txn->EnableIndexing();

  // Writes made after re-enabling indexing are visible to both the existing
  // iterator and Get.
  s = txn->Put("B", "bb");
  ASSERT_OK(s);

  iter->Seek("B");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bb", iter->value().ToString());

  s = txn->Get(read_options, "B", &value);
  ASSERT_OK(s);
  ASSERT_EQ("bb", value);

  s = txn->Put("A", "aa");
  ASSERT_OK(s);

  s = txn->Get(read_options, "A", &value);
  ASSERT_OK(s);
  ASSERT_EQ("aa", value);

  delete iter;
  delete txn;
}
TEST_F(TransactionTest, SavepointTest) { TEST_F(TransactionTest, SavepointTest) {
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options, snapshot_read_options; ReadOptions read_options, snapshot_read_options;

Loading…
Cancel
Save