WritePrepared Txn: optimizations for sysbench update_noindex

Summary:
These are optimizations that we applied to improve sysbench's update_noindex performance.
1. Make use of the LIKELY compiler hint (see the macro sketch after this list).
2. Move the std::atomic members to the subclass (WritePreparedTxnDB) that actually uses them.
3. Make use of skip_prepared in non-2pc transactions.
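
For reference, LIKELY/UNLIKELY are RocksDB's branch-prediction macros (port/likely.h); a minimal sketch of what they expand to:

// On GCC/Clang the hints lower to __builtin_expect, so the compiler lays
// out the expected side of the branch as the fall-through path.
#if defined(__GNUC__) && __GNUC__ >= 4
#define LIKELY(x) (__builtin_expect((x), 1))
#define UNLIKELY(x) (__builtin_expect((x), 0))
#else
// On other compilers the hints compile away to the bare expression.
#define LIKELY(x) (x)
#define UNLIKELY(x) (x)
#endif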
Closes https://github.com/facebook/rocksdb/pull/3512

Differential Revision: D7000075

Pulled By: maysamyabandeh

fbshipit-source-id: 1ab8292584df1f6305a4992973fb1b7933632181
Author: Maysam Yabandeh (committed by Facebook Github Bot)
parent 97307d888f
commit c178da053b
7 changed files:
1. db/write_batch.cc (25 changed lines)
2. utilities/transactions/pessimistic_transaction_db.cc (26 changed lines)
3. utilities/transactions/pessimistic_transaction_db.h (15 changed lines)
4. utilities/transactions/write_prepared_txn.cc (16 changed lines)
5. utilities/transactions/write_prepared_txn.h (2 changed lines)
6. utilities/transactions/write_prepared_txn_db.cc (34 changed lines)
7. utilities/transactions/write_prepared_txn_db.h (20 changed lines)

--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -404,8 +404,9 @@ Status WriteBatch::Iterate(Handler* handler) const {
   Status s;
   char tag = 0;
   uint32_t column_family = 0;  // default
-  while ((s.ok() || s.IsTryAgain()) && !input.empty() && handler->Continue()) {
-    if (!s.IsTryAgain()) {
+  while ((s.ok() || UNLIKELY(s.IsTryAgain())) && !input.empty() &&
+         handler->Continue()) {
+    if (LIKELY(!s.IsTryAgain())) {
       tag = 0;
       column_family = 0;  // default
@@ -425,7 +426,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
         s = handler->PutCF(column_family, key, value);
-        if (s.ok()) {
+        if (LIKELY(s.ok())) {
           empty_batch = false;
           found++;
         }
@@ -435,7 +436,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
         s = handler->DeleteCF(column_family, key);
-        if (s.ok()) {
+        if (LIKELY(s.ok())) {
           empty_batch = false;
           found++;
         }
@@ -445,7 +446,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
         s = handler->SingleDeleteCF(column_family, key);
-        if (s.ok()) {
+        if (LIKELY(s.ok())) {
           empty_batch = false;
           found++;
         }
@@ -455,7 +456,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
         s = handler->DeleteRangeCF(column_family, key, value);
-        if (s.ok()) {
+        if (LIKELY(s.ok())) {
           empty_batch = false;
           found++;
         }
@@ -465,7 +466,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
         s = handler->MergeCF(column_family, key, value);
-        if (s.ok()) {
+        if (LIKELY(s.ok())) {
           empty_batch = false;
           found++;
         }
@@ -475,7 +476,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
         assert(content_flags_.load(std::memory_order_relaxed) &
                (ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
         s = handler->PutBlobIndexCF(column_family, key, value);
-        if (s.ok()) {
+        if (LIKELY(s.ok())) {
           found++;
         }
         break;
@@ -1158,7 +1159,7 @@ class MemTableInserter : public WriteBatch::Handler {
       bool mem_res =
           mem->Add(sequence_, value_type, key, value,
                    concurrent_memtable_writes_, get_post_process_info(mem));
-      if (!mem_res) {
+      if (UNLIKELY(!mem_res)) {
         assert(seq_per_batch_);
         ret_status = Status::TryAgain("key+seq exists");
         const bool BATCH_BOUNDRY = true;
@@ -1234,7 +1235,7 @@ class MemTableInserter : public WriteBatch::Handler {
       bool mem_res =
           mem->Add(sequence_, delete_type, key, value,
                    concurrent_memtable_writes_, get_post_process_info(mem));
-      if (!mem_res) {
+      if (UNLIKELY(!mem_res)) {
         assert(seq_per_batch_);
         ret_status = Status::TryAgain("key+seq exists");
         const bool BATCH_BOUNDRY = true;
@@ -1391,7 +1392,7 @@ class MemTableInserter : public WriteBatch::Handler {
     } else {
       // 3) Add value to memtable
       bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value);
-      if (!mem_res) {
+      if (UNLIKELY(!mem_res)) {
         assert(seq_per_batch_);
         ret_status = Status::TryAgain("key+seq exists");
         const bool BATCH_BOUNDRY = true;
@@ -1403,7 +1404,7 @@ class MemTableInserter : public WriteBatch::Handler {
     if (!perform_merge) {
       // Add merge operator to memtable
       bool mem_res = mem->Add(sequence_, kTypeMerge, key, value);
-      if (!mem_res) {
+      if (UNLIKELY(!mem_res)) {
         assert(seq_per_batch_);
         ret_status = Status::TryAgain("key+seq exists");
         const bool BATCH_BOUNDRY = true;

--- a/utilities/transactions/pessimistic_transaction_db.cc
+++ b/utilities/transactions/pessimistic_transaction_db.cc
@@ -175,30 +175,6 @@ TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
   return validated;
 }

-void PessimisticTransactionDB::UpdateCFComparatorMap(
-    const std::vector<ColumnFamilyHandle*>& handles) {
-  auto cf_map = new std::map<uint32_t, const Comparator*>();
-  for (auto h : handles) {
-    auto id = h->GetID();
-    const Comparator* comparator = h->GetComparator();
-    (*cf_map)[id] = comparator;
-  }
-  cf_map_.store(cf_map);
-  cf_map_gc_.reset(cf_map);
-}
-
-void PessimisticTransactionDB::UpdateCFComparatorMap(
-    const ColumnFamilyHandle* h) {
-  auto old_cf_map_ptr = cf_map_.load();
-  assert(old_cf_map_ptr);
-  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
-  auto id = h->GetID();
-  const Comparator* comparator = h->GetComparator();
-  (*cf_map)[id] = comparator;
-  cf_map_.store(cf_map);
-  cf_map_gc_.reset(cf_map);
-}
-
 Status TransactionDB::Open(const Options& options,
                            const TransactionDBOptions& txn_db_options,
                            const std::string& dbname, TransactionDB** dbptr) {

@@ -228,7 +204,7 @@ Status TransactionDB::Open(
   Status s;
   DB* db;
-  ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is " PRId32,
+  ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32,
                  static_cast<int>(txn_db_options.write_policy));
   std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
   std::vector<size_t> compaction_enabled_cf_indices;

--- a/utilities/transactions/pessimistic_transaction_db.h
+++ b/utilities/transactions/pessimistic_transaction_db.h
@@ -113,20 +113,17 @@ class PessimisticTransactionDB : public TransactionDB {
   std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
   void SetDeadlockInfoBufferSize(uint32_t target_size) override;

-  void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>& handles);
-  void UpdateCFComparatorMap(const ColumnFamilyHandle* handle);
-  std::map<uint32_t, const Comparator*>* GetCFComparatorMap() {
-    return cf_map_.load();
-  }
+  // The default implementation does nothing. The actual implementation is
+  // moved to the child classes that actually need this information. This is
+  // due to an odd performance drop we observed when we added the std::atomic
+  // member to the base class, even when the subclasses do not read it in the
+  // fast path.
+  virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
+  virtual void UpdateCFComparatorMap(const ColumnFamilyHandle*) {}

  protected:
   DBImpl* db_impl_;
   std::shared_ptr<Logger> info_log_;
   const TransactionDBOptions txn_db_options_;
-  // A cache of the cf comparators
-  std::atomic<std::map<uint32_t, const Comparator*>*> cf_map_;
-  // GC of the object above
-  std::unique_ptr<std::map<uint32_t, const Comparator*>> cf_map_gc_;

   void ReinitializeTransaction(
       Transaction* txn, const WriteOptions& write_options,
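
This hunk is optimization 2: the comparator-map cache leaves the base class (whose virtuals become no-ops) and moves into WritePreparedTxnDB, the only write policy that reads it. A minimal standalone sketch of the pattern, with illustrative names (TxnDBBase and WritePreparedLikeDB are not RocksDB classes):

#include <atomic>
#include <cstdint>
#include <map>
#include <memory>

struct Comparator {};  // stand-in for rocksdb::Comparator

// The base class holds no cache state: the virtuals default to no-ops, so
// subclasses that never consult the cache pay nothing on their fast path.
class TxnDBBase {
 public:
  virtual ~TxnDBBase() = default;
  virtual void UpdateCFComparatorMap(uint32_t /*cf_id*/,
                                     const Comparator* /*cmp*/) {}
};

// Only the subclass that reads the cache owns the std::atomic pointer, plus
// a unique_ptr that reclaims the previously published map (the "GC" member).
class WritePreparedLikeDB : public TxnDBBase {
 public:
  void UpdateCFComparatorMap(uint32_t cf_id, const Comparator* cmp) override {
    auto old_map = cf_map_.load();
    // Copy-on-write: clone the current map, insert, then publish atomically
    // so concurrent readers always see a complete map.
    auto new_map = old_map ? new std::map<uint32_t, const Comparator*>(*old_map)
                           : new std::map<uint32_t, const Comparator*>();
    (*new_map)[cf_id] = cmp;
    cf_map_.store(new_map);
    cf_map_gc_.reset(new_map);  // also frees the map published before this one
  }
  std::map<uint32_t, const Comparator*>* GetCFComparatorMap() {
    return cf_map_.load();
  }

 private:
  std::atomic<std::map<uint32_t, const Comparator*>*> cf_map_{nullptr};
  std::unique_ptr<std::map<uint32_t, const Comparator*>> cf_map_gc_;
};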

--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -72,7 +72,9 @@ Status WritePreparedTxn::PrepareInternal() {
   uint64_t seq_used = kMaxSequenceNumber;
   // For each duplicate key we account for a new sub-batch
   prepare_batch_cnt_ = 1;
-  if (GetWriteBatch()->HasDuplicateKeys()) {
+  if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) {
+    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
+                   "Duplicate key overhead");
     SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
     auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
     assert(s.ok());
@@ -97,7 +99,7 @@ Status WritePreparedTxn::PrepareInternal() {
 Status WritePreparedTxn::CommitWithoutPrepareInternal() {
   // For each duplicate key we account for a new sub-batch
   size_t batch_cnt = 1;
-  if (GetWriteBatch()->HasDuplicateKeys()) {
+  if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) {
     batch_cnt = 0;  // this will trigger a batch cnt compute
   }
   return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
@@ -129,7 +131,9 @@ Status WritePreparedTxn::CommitInternal() {
   const bool includes_data = !empty && !for_recovery;
   assert(prepare_batch_cnt_);
   size_t commit_batch_cnt = 0;
-  if (includes_data) {
+  if (UNLIKELY(includes_data)) {
+    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
+                   "Duplicate key overhead");
     SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
     auto s = working_batch->Iterate(&counter);
     assert(s.ok());
@@ -143,7 +147,7 @@ Status WritePreparedTxn::CommitInternal() {
   // a connection between the memtable and its WAL, so there is no need to
   // redundantly reference the log that contains the prepared data.
   const uint64_t zero_log_number = 0ull;
-  size_t batch_cnt = commit_batch_cnt ? commit_batch_cnt : 1;
+  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
   auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                                zero_log_number, disable_memtable, &seq_used,
                                batch_cnt, &update_commit_map);
@@ -320,7 +324,9 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
 Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
   auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
   prepare_batch_cnt_ = 1;
-  if (GetWriteBatch()->HasDuplicateKeys()) {
+  if (UNLIKELY(GetWriteBatch()->HasDuplicateKeys())) {
+    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
+                   "Duplicate key overhead");
     SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
     auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&counter);
     assert(s.ok());

--- a/utilities/transactions/write_prepared_txn.h
+++ b/utilities/transactions/write_prepared_txn.h
@@ -62,7 +62,7 @@ class WritePreparedTxn : public PessimisticTransaction {
       ColumnFamilyHandle* column_family) override;

  protected:
-  // Override the protected SetId to make it visible to the firend class
+  // Override the protected SetId to make it visible to the friend class
   // WritePreparedTxnDB
   inline void SetId(uint64_t id) override { Transaction::SetId(id); }

--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -104,6 +104,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
   }
   if (batch_cnt == 0) {  // not provided, then compute it
     // TODO(myabandeh): add an option to allow user skipping this cost
+    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
+                   "Duplicate key overhead");
     SubBatchCounter counter(*GetCFComparatorMap());
     auto s = batch->Iterate(&counter);
     assert(s.ok());
@@ -145,16 +147,15 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                     "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                     prepare_seq);
-  // TODO(myabandeh): Note: we skip AddPrepared here. This could be further
-  // optimized by skip erasing prepare_seq from prepared_txn_ in the following
-  // callback.
-  // TODO(myabandeh): What if max advances the prepare_seq_ in the meanwhile and
-  // readers assume the prepared data as committed? Almost zero probability.
+  // Commit the batch by writing an empty batch to the 2nd queue that will
+  // release the commit sequence number to readers.
+  const size_t ZERO_COMMITS = 0;
+  const bool PREP_HEAP_SKIPPED = true;
   WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
-      this, db_impl_, prepare_seq, batch_cnt);
+      this, db_impl_, prepare_seq, batch_cnt, ZERO_COMMITS, PREP_HEAP_SKIPPED);
   WriteBatch empty_batch;
   empty_batch.PutLogData(Slice());
   const size_t ONE_BATCH = 1;
@@ -184,6 +185,31 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options,
                       &callback);
 }

+void WritePreparedTxnDB::UpdateCFComparatorMap(
+    const std::vector<ColumnFamilyHandle*>& handles) {
+  auto cf_map = new std::map<uint32_t, const Comparator*>();
+  for (auto h : handles) {
+    auto id = h->GetID();
+    const Comparator* comparator = h->GetComparator();
+    (*cf_map)[id] = comparator;
+  }
+  cf_map_.store(cf_map);
+  cf_map_gc_.reset(cf_map);
+}
+
+void WritePreparedTxnDB::UpdateCFComparatorMap(
+    const ColumnFamilyHandle* h) {
+  auto old_cf_map_ptr = cf_map_.load();
+  assert(old_cf_map_ptr);
+  auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
+  auto id = h->GetID();
+  const Comparator* comparator = h->GetComparator();
+  (*cf_map)[id] = comparator;
+  cf_map_.store(cf_map);
+  cf_map_gc_.reset(cf_map);
+}
+
 std::vector<Status> WritePreparedTxnDB::MultiGet(
     const ReadOptions& options,
     const std::vector<ColumnFamilyHandle*>& column_family,

--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -216,6 +216,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   // Struct to hold ownership of snapshot and read callback for cleanup.
   struct IteratorState;

+  std::map<uint32_t, const Comparator*>* GetCFComparatorMap() {
+    return cf_map_.load();
+  }
+  void UpdateCFComparatorMap(
+      const std::vector<ColumnFamilyHandle*>& handles) override;
+  void UpdateCFComparatorMap(const ColumnFamilyHandle* handle) override;
+
  protected:
   virtual Status VerifyCFOptions(
       const ColumnFamilyOptions& cf_options) override;
@@ -394,6 +401,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   mutable port::RWMutex old_commit_map_mutex_;
   mutable port::RWMutex commit_cache_mutex_;
   mutable port::RWMutex snapshots_mutex_;
+  // A cache of the cf comparators
+  std::atomic<std::map<uint32_t, const Comparator*>*> cf_map_;
+  // GC of the object above
+  std::unique_ptr<std::map<uint32_t, const Comparator*>> cf_map_gc_;
 };

 class WritePreparedTxnReadCallback : public ReadCallback {
@@ -420,12 +431,14 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
                                              DBImpl* db_impl,
                                              SequenceNumber prep_seq,
                                              size_t prep_batch_cnt,
-                                             size_t data_batch_cnt = 0)
+                                             size_t data_batch_cnt = 0,
+                                             bool prep_heap_skipped = false)
       : db_(db),
         db_impl_(db_impl),
         prep_seq_(prep_seq),
         prep_batch_cnt_(prep_batch_cnt),
         data_batch_cnt_(data_batch_cnt),
+        prep_heap_skipped_(prep_heap_skipped),
         includes_data_(data_batch_cnt_ > 0) {
     assert((prep_batch_cnt_ > 0) != (prep_seq == kMaxSequenceNumber));  // xor
     assert(prep_batch_cnt_ > 0 || data_batch_cnt_ > 0);
@@ -438,7 +451,7 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
                               : commit_seq + data_batch_cnt_ - 1;
     if (prep_seq_ != kMaxSequenceNumber) {
       for (size_t i = 0; i < prep_batch_cnt_; i++) {
-        db_->AddCommitted(prep_seq_ + i, last_commit_seq);
+        db_->AddCommitted(prep_seq_ + i, last_commit_seq, prep_heap_skipped_);
       }
     }  // else there was no prepare phase
     if (includes_data_) {
@@ -471,6 +484,9 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
   SequenceNumber prep_seq_;
   size_t prep_batch_cnt_;
   size_t data_batch_cnt_;
+  // An optimization that indicates that there is no need to update the prepare
+  // heap since the prepare sequence number was not added to it.
+  bool prep_heap_skipped_;
   // Either because it is commit without prepare or it has a
   // CommitTimeWriteBatch
   bool includes_data_;
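
These hunks are optimization 3: a non-2pc write never calls AddPrepared, so its sequence number is never in the prepared heap, and forwarding prep_heap_skipped_ to AddCommitted lets the callback skip the erase. A simplified, hypothetical sketch of the idea (CommitTracker and its members are illustrative, not the RocksDB implementation):

#include <cstdint>
#include <mutex>
#include <set>

class CommitTracker {
 public:
  // 2PC transactions enter the prepared set at prepare time.
  void AddPrepared(uint64_t seq) {
    std::lock_guard<std::mutex> lock(mutex_);
    prepared_.insert(seq);
  }
  // prepare_skipped = true means the sequence number was never added to
  // prepared_, so the erase (and the lock it needs) is skipped entirely.
  void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                    bool prepare_skipped = false) {
    // ... publish <prepare_seq, commit_seq> to the commit cache here ...
    (void)commit_seq;
    if (!prepare_skipped) {
      std::lock_guard<std::mutex> lock(mutex_);
      prepared_.erase(prepare_seq);
    }
  }

 private:
  std::mutex mutex_;
  std::set<uint64_t> prepared_;  // stand-in for the prepared_txns_ heap
};

A non-2pc commit would then call AddCommitted(seq, commit_seq, true) with no prior AddPrepared(seq), which is what passing PREP_HEAP_SKIPPED through the pre-release callback achieves above.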
