WritePrepared Txn: use TransactionDBWriteOptimizations (2nd attempt)

Summary:
TransactionDB::Write can receive some optimization hints from the user. One is to skip the concurrency control mechanism. WritePreparedTxnDB is currently ignoring such hints. This patch optimizes WritePreparedTxnDB::Write for skip_concurrency_control and skip_duplicate_key_check hints.
Closes https://github.com/facebook/rocksdb/pull/3496

Differential Revision: D6971784

Pulled By: maysamyabandeh

fbshipit-source-id: cbab10ad538fa2b8bcb47e37c77724afe6e30f03
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent ee1c802675
commit 8a04ee4fd1
  1. 4
      include/rocksdb/utilities/transaction_db.h
  2. 21
      utilities/transactions/transaction_test.cc
  3. 3
      utilities/transactions/transaction_test.h
  4. 3
      utilities/transactions/write_prepared_transaction_test.cc
  5. 130
      utilities/transactions/write_prepared_txn.cc
  6. 6
      utilities/transactions/write_prepared_txn.h
  7. 108
      utilities/transactions/write_prepared_txn_db.cc
  8. 61
      utilities/transactions/write_prepared_txn_db.h

@ -138,6 +138,10 @@ struct TransactionDBWriteOptimizations {
// and hence the concurrency control mechanism could be skipped for this // and hence the concurrency control mechanism could be skipped for this
// write. // write.
bool skip_concurrency_control = false; bool skip_concurrency_control = false;
// If true, the application guarantees that there is no duplicate <column
// family, key> in the write batch and any employed mechanism to hanlde
// duplicate keys could be skipped.
bool skip_duplicate_key_check = false;
}; };
struct KeyLockInfo { struct KeyLockInfo {

@ -4987,6 +4987,27 @@ TEST_P(TransactionTest, SeqAdvanceTest) {
} }
} }
// Verify that the optimization would not compromize the correctness
TEST_P(TransactionTest, Optimizations) {
size_t comb_cnt = size_t(1) << 2; // 2 is number of optimization vars
for (size_t new_comb = 0; new_comb < comb_cnt; new_comb++) {
TransactionDBWriteOptimizations optimizations;
optimizations.skip_concurrency_control = IsInCombination(0, new_comb);
optimizations.skip_duplicate_key_check = IsInCombination(1, new_comb);
ReOpen();
WriteOptions write_options;
WriteBatch batch;
batch.Put(Slice("k"), Slice("v1"));
ASSERT_OK(db->Write(write_options, &batch));
ReadOptions ropt;
PinnableSlice pinnable_val;
ASSERT_OK(db->Get(ropt, db->DefaultColumnFamily(), "k", &pinnable_val));
ASSERT_TRUE(pinnable_val == ("v1"));
}
}
// Test that the transactional db can handle duplicate keys in the write batch // Test that the transactional db can handle duplicate keys in the write batch
TEST_P(TransactionTest, DuplicateKeys) { TEST_P(TransactionTest, DuplicateKeys) {
ColumnFamilyOptions cf_options; ColumnFamilyOptions cf_options;

@ -36,6 +36,9 @@
namespace rocksdb { namespace rocksdb {
// Return true if the ith bit is set in combination represented by comb
bool IsInCombination(size_t i, size_t comb) { return comb & (size_t(1) << i); }
class TransactionTestBase : public ::testing::Test { class TransactionTestBase : public ::testing::Test {
public: public:
TransactionDB* db; TransactionDB* db;

@ -627,9 +627,6 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
} }
} }
// Return true if the ith bit is set in combination represented by comb
bool IsInCombination(size_t i, size_t comb) { return comb & (size_t(1) << i); }
// This test is too slow for travis // This test is too slow for travis
#ifndef TRAVIS #ifndef TRAVIS
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in // Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in

@ -62,69 +62,6 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
return write_batch_.NewIteratorWithBase(column_family, db_iter); return write_batch_.NewIteratorWithBase(column_family, db_iter);
} }
namespace {
// A wrapper around Comparator to make it usable in std::set
struct SetComparator {
explicit SetComparator() : user_comparator_(BytewiseComparator()) {}
explicit SetComparator(const Comparator* user_comparator)
: user_comparator_(user_comparator ? user_comparator
: BytewiseComparator()) {}
bool operator()(const Slice& lhs, const Slice& rhs) const {
return user_comparator_->Compare(lhs, rhs) < 0;
}
private:
const Comparator* user_comparator_;
};
// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
: comparators_(comparators), batches_(1) {}
std::map<uint32_t, const Comparator*>& comparators_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
size_t batches_;
size_t BatchCount() { return batches_; }
void AddKey(uint32_t cf, const Slice& key) {
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
auto cmp = comparators_[cf];
keys_[cf] = CFKeys(SetComparator(cmp));
}
auto it = cf_keys.insert(key);
if (it.second == false) { // second is false if a element already existed.
batches_++;
keys_.clear();
keys_[cf].insert(key);
}
}
Status MarkNoop(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
AddKey(cf, key);
return Status::OK();
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
AddKey(cf, key);
return Status::OK();
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
AddKey(cf, key);
return Status::OK();
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
AddKey(cf, key);
return Status::OK();
}
Status MarkBeginPrepare() override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
bool WriteAfterCommit() const override { return false; }
};
} // namespace
Status WritePreparedTxn::PrepareInternal() { Status WritePreparedTxn::PrepareInternal() {
WriteOptions write_options = write_options_; WriteOptions write_options = write_options_;
write_options.disableWAL = false; write_options.disableWAL = false;
@ -168,72 +105,7 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch, Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
size_t batch_cnt) { size_t batch_cnt) {
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log, return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
"CommitBatchInternal");
if (batch->Count() == 0) {
// Otherwise our 1 seq per batch logic will break since there is no seq
// increased for this batch.
return Status::OK();
}
if (batch_cnt == 0) { // not provided, then compute it
// TODO(myabandeh): add an option to allow user skipping this cost
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
auto s = batch->Iterate(&counter);
assert(s.ok());
batch_cnt = counter.BatchCount();
}
assert(batch_cnt);
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
bool sync = write_options_.sync;
if (!do_one_write) {
// No need to sync on the first write
write_options_.sync = false;
}
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch);
const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber;
const size_t ZERO_PREPARES = 0;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
auto s = db_impl_->WriteImpl(
write_options_, batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE,
&seq_used, batch_cnt, do_one_write ? &update_commit_map : nullptr);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used;
SetId(prepare_seq);
if (!s.ok()) {
return s;
}
if (do_one_write) {
return s;
} // else do the 2nd write for commit
// Set the original value of sync
write_options_.sync = sync;
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
prepare_seq);
// Note: we skip AddPrepared here. This could be further optimized by skip
// erasing prepare_seq from prepared_txn_ in the following callback.
// TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and
// readers assume the prepared data as committed? Almost zero probability.
// Commit the batch by writing an empty batch to the 2nd queue that will
// release the commit sequence number to readers.
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
wpt_db_, db_impl_, prepare_seq, batch_cnt);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
const size_t ONE_BATCH = 1;
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
return s;
} }
Status WritePreparedTxn::CommitInternal() { Status WritePreparedTxn::CommitInternal() {

@ -61,8 +61,14 @@ class WritePreparedTxn : public PessimisticTransaction {
virtual Iterator* GetIterator(const ReadOptions& options, virtual Iterator* GetIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override; ColumnFamilyHandle* column_family) override;
protected:
// Override the protected SetId to make it visible to the firend class
// WritePreparedTxnDB
inline void SetId(uint64_t id) override { Transaction::SetId(id); }
private: private:
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
friend class WritePreparedTxnDB;
Status PrepareInternal() override; Status PrepareInternal() override;

@ -73,6 +73,100 @@ Transaction* WritePreparedTxnDB::BeginTransaction(
} }
} }
Status WritePreparedTxnDB::Write(
const WriteOptions& opts,
const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
if (optimizations.skip_concurrency_control) {
// Skip locking the rows
const size_t UNKNOWN_BATCH_CNT = 0;
const size_t ONE_BATCH_CNT = 1;
const size_t batch_cnt = optimizations.skip_duplicate_key_check
? ONE_BATCH_CNT
: UNKNOWN_BATCH_CNT;
WritePreparedTxn* NO_TXN = nullptr;
return WriteInternal(opts, updates, batch_cnt, NO_TXN);
} else {
// TODO(myabandeh): Make use of skip_duplicate_key_check hint
// Fall back to unoptimized version
return PessimisticTransactionDB::Write(opts, updates);
}
}
Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
WriteBatch* batch, size_t batch_cnt,
WritePreparedTxn* txn) {
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitBatchInternal");
if (batch->Count() == 0) {
// Otherwise our 1 seq per batch logic will break since there is no seq
// increased for this batch.
return Status::OK();
}
if (batch_cnt == 0) { // not provided, then compute it
// TODO(myabandeh): add an option to allow user skipping this cost
SubBatchCounter counter(*GetCFComparatorMap());
auto s = batch->Iterate(&counter);
assert(s.ok());
batch_cnt = counter.BatchCount();
}
assert(batch_cnt);
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
WriteOptions write_options(write_options_orig);
bool sync = write_options.sync;
if (!do_one_write) {
// No need to sync on the first write
write_options.sync = false;
}
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch);
const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber;
const size_t ZERO_PREPARES = 0;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
auto s = db_impl_->WriteImpl(
write_options, batch, nullptr, nullptr, no_log_ref, !DISABLE_MEMTABLE,
&seq_used, batch_cnt, do_one_write ? &update_commit_map : nullptr);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used;
if (txn != nullptr) {
txn->SetId(prepare_seq);
}
if (!s.ok()) {
return s;
}
if (do_one_write) {
return s;
} // else do the 2nd write for commit
// Set the original value of sync
write_options.sync = sync;
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
prepare_seq);
// TODO(myabandeh): Note: we skip AddPrepared here. This could be further
// optimized by skip erasing prepare_seq from prepared_txn_ in the following
// callback.
// TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and
// readers assume the prepared data as committed? Almost zero probability.
// Commit the batch by writing an empty batch to the 2nd queue that will
// release the commit sequence number to readers.
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
this, db_impl_, prepare_seq, batch_cnt);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
const size_t ONE_BATCH = 1;
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
&update_commit_map_with_prepare);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
return s;
}
Status WritePreparedTxnDB::Get(const ReadOptions& options, Status WritePreparedTxnDB::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) { const Slice& key, PinnableSlice* value) {
@ -633,5 +727,19 @@ WritePreparedTxnDB::~WritePreparedTxnDB() {
db_impl_->CancelAllBackgroundWork(true /*wait*/); db_impl_->CancelAllBackgroundWork(true /*wait*/);
} }
void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
auto cmp = comparators_[cf];
keys_[cf] = CFKeys(SetComparator(cmp));
}
auto it = cf_keys.insert(key);
if (it.second == false) { // second is false if a element already existed.
batches_++;
keys_.clear();
keys_[cf].insert(key);
}
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -73,13 +73,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const TransactionOptions& txn_options, const TransactionOptions& txn_options,
Transaction* old_txn) override; Transaction* old_txn) override;
// TODO(myabandeh): Implement this
// Optimized version of ::Write that receives more optimization request such // Optimized version of ::Write that receives more optimization request such
// as skip_concurrency_control. // as skip_concurrency_control.
// using PessimisticTransactionDB::Write; using PessimisticTransactionDB::Write;
// Status Write(const WriteOptions& opts, const Status Write(const WriteOptions& opts, const TransactionDBWriteOptimizations&,
// TransactionDBWriteOptimizations&, WriteBatch* updates) override;
// WriteBatch* updates) override;
// Write the batch to the underlying DB and mark it as committed. Could be
// used by both directly from TxnDB or through a transaction.
Status WriteInternal(const WriteOptions& write_options, WriteBatch* batch,
size_t batch_cnt, WritePreparedTxn* txn);
using DB::Get; using DB::Get;
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
@ -473,5 +476,53 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
bool includes_data_; bool includes_data_;
}; };
// A wrapper around Comparator to make it usable in std::set
struct SetComparator {
explicit SetComparator() : user_comparator_(BytewiseComparator()) {}
explicit SetComparator(const Comparator* user_comparator)
: user_comparator_(user_comparator ? user_comparator
: BytewiseComparator()) {}
bool operator()(const Slice& lhs, const Slice& rhs) const {
return user_comparator_->Compare(lhs, rhs) < 0;
}
private:
const Comparator* user_comparator_;
};
// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {
explicit SubBatchCounter(std::map<uint32_t, const Comparator*>& comparators)
: comparators_(comparators), batches_(1) {}
std::map<uint32_t, const Comparator*>& comparators_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
size_t batches_;
size_t BatchCount() { return batches_; }
void AddKey(const uint32_t cf, const Slice& key);
Status MarkNoop(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
AddKey(cf, key);
return Status::OK();
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
AddKey(cf, key);
return Status::OK();
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
AddKey(cf, key);
return Status::OK();
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
AddKey(cf, key);
return Status::OK();
}
Status MarkBeginPrepare() override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
bool WriteAfterCommit() const override { return false; }
};
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

Loading…
Cancel
Save