WritePrepared Txn: optimize SubBatchCnt

Summary:
Make use of the index in WriteBatchWithIndex to also count the number of sub-batches. This eliminates the need to separately scan the batch to count the number of sub-batches once a duplicate key is detected.
Closes https://github.com/facebook/rocksdb/pull/3529

Differential Revision: D7049947

Pulled By: maysamyabandeh

fbshipit-source-id: 81cbf12c4e662541c772c7265a8f91631e25c7cd
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 243220d08a
commit 90eca1e616
  1. 7
      include/rocksdb/utilities/write_batch_with_index.h
  2. 96
      utilities/transactions/write_prepared_transaction_test.cc
  3. 25
      utilities/transactions/write_prepared_txn.cc
  4. 27
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -228,8 +228,11 @@ class WriteBatchWithIndex : public WriteBatchBase {
private: private:
friend class WritePreparedTxn; friend class WritePreparedTxn;
// Returns true if there has been duplicate keys in the batch. friend class WriteBatchWithIndex_SubBatchCnt_Test;
bool HasDuplicateKeys(); // Returns the number of sub-batches inside the write batch. A sub-batch
// starts right before inserting a key that is a duplicate of a key in the
// last sub-batch.
size_t SubBatchCnt();
Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,

@ -179,6 +179,102 @@ TEST(PreparedHeap, Concurrent) {
} }
} }
// Test that WriteBatchWithIndex correctly counts the number of sub-batches
TEST(WriteBatchWithIndex, SubBatchCnt) {
ColumnFamilyOptions cf_options;
std::string cf_name = "two";
DB* db;
Options options;
options.create_if_missing = true;
const std::string dbname = test::TmpDir() + "/transaction_testdb";
DestroyDB(dbname, options);
ASSERT_OK(DB::Open(options, dbname, &db));
ColumnFamilyHandle* cf_handle = nullptr;
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
WriteOptions write_options;
size_t batch_cnt = 1;
size_t save_points = 0;
std::vector<size_t> batch_cnt_at;
WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
0);
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key"), Slice("value"));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key2"), Slice("value2"));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the keys
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key"), Slice("value3"));
batch_cnt++;
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the 2nd key. It should not be counted duplicate since a
// sub-patch is cut after the last duplicate.
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key2"), Slice("value4"));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the keys but in a different cf. It should not be counted as
// duplicate keys
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(cf_handle, Slice("key"), Slice("value5"));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// Test that the number of sub-batches matches what we count with
// SubBatchCounter
std::map<uint32_t, const Comparator*> comparators;
comparators[0] = db->DefaultColumnFamily()->GetComparator();
comparators[cf_handle->GetID()] = cf_handle->GetComparator();
SubBatchCounter counter(comparators);
ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
ASSERT_EQ(batch_cnt, counter.BatchCount());
// Test that RollbackToSavePoint will properly resets the number of
// sub-bathces
for (size_t i = save_points; i > 0; i--) {
batch.RollbackToSavePoint();
ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
}
// Test the count is right with random batches
{
const size_t TOTAL_KEYS = 20; // 20 ~= 10 to cause a few randoms
Random rnd(1131);
std::string keys[TOTAL_KEYS];
for (size_t k = 0; k < TOTAL_KEYS; k++) {
int len = static_cast<int>(rnd.Uniform(50));
keys[k] = test::RandomKey(&rnd, len);
}
for (size_t i = 0; i < 1000; i++) { // 1000 random batches
WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
0, true, 0);
for (size_t k = 0; k < 10; k++) { // 10 key per batch
size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
Slice key = Slice(keys[ki]);
std::string buffer;
Slice value = Slice(test::RandomString(&rnd, 16, &buffer));
rndbatch.Put(key, value);
}
SubBatchCounter batch_counter(comparators);
ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
}
}
delete cf_handle;
delete db;
}
TEST(CommitEntry64b, BasicTest) { TEST(CommitEntry64b, BasicTest) {
const size_t INDEX_BITS = static_cast<size_t>(21); const size_t INDEX_BITS = static_cast<size_t>(21);
const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS); const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);

@ -71,15 +71,7 @@ Status WritePreparedTxn::PrepareInternal() {
const bool DISABLE_MEMTABLE = true; const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
// For each duplicate key we account for a new sub-batch // For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = 1; prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Duplicate key overhead");
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
assert(s.ok());
prepare_batch_cnt_ = counter.BatchCount();
}
Status s = Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0, /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
@ -98,10 +90,7 @@ Status WritePreparedTxn::PrepareInternal() {
Status WritePreparedTxn::CommitWithoutPrepareInternal() { Status WritePreparedTxn::CommitWithoutPrepareInternal() {
// For each duplicate key we account for a new sub-batch // For each duplicate key we account for a new sub-batch
size_t batch_cnt = 1; const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) {
batch_cnt = 0; // this will trigger a batch cnt compute
}
return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt); return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
} }
@ -326,15 +315,7 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) { Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch); auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
prepare_batch_cnt_ = 1; prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Duplicate key overhead");
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
assert(s.ok());
prepare_batch_cnt_ = counter.BatchCount();
}
return ret; return ret;
} }

@ -8,7 +8,6 @@
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include <memory> #include <memory>
#include <vector>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_impl.h" #include "db/db_impl.h"
@ -393,14 +392,21 @@ struct WriteBatchWithIndex::Rep {
comparator(index_comparator, &write_batch), comparator(index_comparator, &write_batch),
skip_list(comparator, &arena), skip_list(comparator, &arena),
overwrite_key(_overwrite_key), overwrite_key(_overwrite_key),
last_entry_offset(0) {} last_entry_offset(0),
last_sub_batch_offset(0),
sub_batch_cnt(1) {}
ReadableWriteBatch write_batch; ReadableWriteBatch write_batch;
WriteBatchEntryComparator comparator; WriteBatchEntryComparator comparator;
Arena arena; Arena arena;
WriteBatchEntrySkipList skip_list; WriteBatchEntrySkipList skip_list;
bool overwrite_key; bool overwrite_key;
size_t last_entry_offset; size_t last_entry_offset;
std::vector<size_t> obsolete_offsets; // The starting offset of the last sub-batch. A sub-batch starts right before
// inserting a key that is a duplicate of a key in the last sub-batch. Zero,
// the default, means that no duplicate key is detected so far.
size_t last_sub_batch_offset;
// Total number of sub-batches in the write batch. Default is 1.
size_t sub_batch_cnt;
// Remember current offset of internal write batch, which is used as // Remember current offset of internal write batch, which is used as
// the starting offset of the next record. // the starting offset of the next record.
@ -452,7 +458,10 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
} }
WriteBatchIndexEntry* non_const_entry = WriteBatchIndexEntry* non_const_entry =
const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry()); const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
obsolete_offsets.push_back(non_const_entry->offset); if (LIKELY(last_sub_batch_offset <= non_const_entry->offset)) {
last_sub_batch_offset = last_entry_offset;
sub_batch_cnt++;
}
non_const_entry->offset = last_entry_offset; non_const_entry->offset = last_entry_offset;
return true; return true;
} }
@ -504,6 +513,8 @@ void WriteBatchWithIndex::Rep::ClearIndex() {
new (&arena) Arena(); new (&arena) Arena();
new (&skip_list) WriteBatchEntrySkipList(comparator, &arena); new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
last_entry_offset = 0; last_entry_offset = 0;
last_sub_batch_offset = 0;
sub_batch_cnt = 1;
} }
Status WriteBatchWithIndex::Rep::ReBuildIndex() { Status WriteBatchWithIndex::Rep::ReBuildIndex() {
@ -582,9 +593,7 @@ WriteBatchWithIndex::~WriteBatchWithIndex() {}
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
bool WriteBatchWithIndex::HasDuplicateKeys() { size_t WriteBatchWithIndex::SubBatchCnt() { return rep->sub_batch_cnt; }
return rep->obsolete_offsets.size() > 0;
}
WBWIIterator* WriteBatchWithIndex::NewIterator() { WBWIIterator* WriteBatchWithIndex::NewIterator() {
return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
@ -883,8 +892,8 @@ Status WriteBatchWithIndex::RollbackToSavePoint() {
Status s = rep->write_batch.RollbackToSavePoint(); Status s = rep->write_batch.RollbackToSavePoint();
if (s.ok()) { if (s.ok()) {
// obsolete_offsets will be rebuilt by ReBuildIndex rep->sub_batch_cnt = 1;
rep->obsolete_offsets.clear(); rep->last_sub_batch_offset = 0;
s = rep->ReBuildIndex(); s = rep->ReBuildIndex();
} }

Loading…
Cancel
Save